ab_farmer/node_client/
caching_proxy_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
8use ab_farmer_rpc_primitives::{
9 BlockSealInfo, BlockSealResponse, FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo,
10 SolutionResponse,
11};
12use async_lock::{
13 Mutex as AsyncMutex, RwLock as AsyncRwLock,
14 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
15 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
16};
17use async_trait::async_trait;
18use futures::{FutureExt, Stream, StreamExt, select};
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{info, trace, warn};
25
26const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
27const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
28
29#[derive(Debug, Default)]
30struct SegmentHeaders {
31 segment_headers: Vec<SegmentHeader>,
32 last_synced: Option<Instant>,
33}
34
35impl SegmentHeaders {
36 fn push(&mut self, archived_segment_header: SegmentHeader) {
39 if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
40 {
41 self.segment_headers.push(archived_segment_header);
42 }
43 }
44
45 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
49 segment_indices
50 .iter()
51 .map(|segment_index| {
52 self.segment_headers
53 .get(u64::from(*segment_index) as usize)
54 .copied()
55 })
56 .collect::<Vec<_>>()
57 }
58
59 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
61 self.segment_headers
62 .iter()
63 .rev()
64 .take(limit as usize)
65 .rev()
66 .copied()
67 .map(Some)
68 .collect()
69 }
70
71 async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
78 where
79 NC: NodeClient,
80 {
81 if let Some(last_synced) = &self.last_synced
83 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
84 {
85 return Ok(Vec::new());
86 }
87
88 let mut extra_segment_headers = Vec::new();
89 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
90 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
91
92 'outer: loop {
93 let from = segment_index_offset;
94 let to = segment_index_offset + segment_index_step;
95 trace!(%from, %to, "Requesting segment headers");
96
97 for maybe_segment_header in client
98 .segment_headers((from..to).collect::<Vec<_>>())
99 .await
100 .map_err(|error| {
101 anyhow::anyhow!(
102 "Failed to download segment headers {from}..{to} from node: {error}"
103 )
104 })?
105 {
106 let Some(segment_header) = maybe_segment_header else {
107 break 'outer;
109 };
110
111 extra_segment_headers.push(segment_header);
112 }
113
114 segment_index_offset += segment_index_step;
115 }
116
117 Ok(extra_segment_headers)
118 }
119
120 fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
122 for segment_header in extra_segment_headers {
123 self.push(segment_header);
124 }
125 self.last_synced.replace(Instant::now());
126 }
127}
128
129#[derive(Debug, Clone)]
137pub struct CachingProxyNodeClient<NC> {
138 inner: NC,
139 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
140 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
141 block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
142 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
143 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
144 _background_task: Arc<AsyncJoinOnDrop<()>>,
145}
146
147impl<NC> CachingProxyNodeClient<NC>
148where
149 NC: NodeClient + Clone,
150{
151 pub async fn new(client: NC) -> anyhow::Result<Self> {
153 let mut segment_headers = SegmentHeaders::default();
154 let mut archived_segments_notifications =
155 client.subscribe_archived_segment_headers().await?;
156
157 info!("Downloading all segment headers from node...");
158 let headers = segment_headers.request_uncached_headers(&client).await?;
160 segment_headers.write_cache(headers);
161 info!("Downloaded all segment headers from node successfully");
162
163 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
164
165 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
166 let slot_info_proxy_fut = {
167 let mut slot_info_subscription = client.subscribe_slot_info().await?;
168
169 async move {
170 let mut last_slot_number = None;
171 while let Some(slot_info) = slot_info_subscription.next().await {
172 if let Some(last_slot_number) = last_slot_number
173 && last_slot_number >= slot_info.slot_number
174 {
175 continue;
176 }
177 last_slot_number.replace(slot_info.slot_number);
178
179 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
180 warn!(%error, "Failed to proxy slot info notification");
181 return;
182 }
183 }
184 }
185 };
186
187 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
188 watch::channel(None::<SegmentHeader>);
189 let segment_headers_maintenance_fut = {
190 let client = client.clone();
191 let segment_headers = Arc::clone(&segment_headers);
192
193 async move {
194 let mut last_archived_segment_index = None;
195 while let Some(archived_segment_header) =
196 archived_segments_notifications.next().await
197 {
198 let segment_index = archived_segment_header.segment_index();
199 trace!(
200 ?archived_segment_header,
201 "New archived archived segment header notification"
202 );
203
204 while let Err(error) = client
205 .acknowledge_archived_segment_header(segment_index)
206 .await
207 {
208 warn!(
209 %error,
210 "Failed to acknowledge archived segment header, trying again"
211 );
212 }
213
214 if let Some(last_archived_segment_index) = last_archived_segment_index
215 && last_archived_segment_index >= segment_index
216 {
217 continue;
218 }
219 last_archived_segment_index.replace(segment_index);
220
221 segment_headers.write().await.push(archived_segment_header);
222
223 if let Err(error) =
224 archived_segment_headers_sender.send(Some(archived_segment_header))
225 {
226 warn!(%error, "Failed to proxy archived segment header notification");
227 return;
228 }
229 }
230 }
231 };
232
233 let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
234 let block_sealing_proxy_fut = {
235 let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
236
237 async move {
238 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
239 if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
240 warn!(%error, "Failed to proxy block sealing notification");
241 return;
242 }
243 }
244 }
245 };
246
247 let farmer_app_info = client
248 .farmer_app_info()
249 .await
250 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
251 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
252
253 let background_task = tokio::spawn(async move {
254 select! {
255 _ = slot_info_proxy_fut.fuse() => {},
256 _ = segment_headers_maintenance_fut.fuse() => {},
257 _ = block_sealing_proxy_fut.fuse() => {},
258 }
259 });
260
261 let node_client = Self {
262 inner: client,
263 slot_info_receiver,
264 archived_segment_headers_receiver,
265 block_sealing_receiver,
266 segment_headers,
267 last_farmer_app_info,
268 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
269 };
270
271 Ok(node_client)
272 }
273}
274
275#[async_trait]
276impl<NC> NodeClient for CachingProxyNodeClient<NC>
277where
278 NC: NodeClient,
279{
280 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
281 let (last_farmer_app_info, last_farmer_app_info_request) =
282 &mut *self.last_farmer_app_info.lock().await;
283
284 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
285 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
286
287 *last_farmer_app_info = new_last_farmer_app_info;
288 *last_farmer_app_info_request = Instant::now();
289 }
290
291 Ok(last_farmer_app_info.clone())
292 }
293
294 async fn subscribe_slot_info(
295 &self,
296 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
297 Ok(Box::pin(
298 WatchStream::new(self.slot_info_receiver.clone())
299 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
300 ))
301 }
302
303 async fn submit_solution_response(
304 &self,
305 solution_response: SolutionResponse,
306 ) -> anyhow::Result<()> {
307 self.inner.submit_solution_response(solution_response).await
308 }
309
310 async fn subscribe_block_sealing(
311 &self,
312 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
313 Ok(Box::pin(
314 WatchStream::new(self.block_sealing_receiver.clone())
315 .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
316 ))
317 }
318
319 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
321 self.inner.submit_block_seal(block_seal).await
322 }
323
324 async fn subscribe_archived_segment_headers(
325 &self,
326 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
327 Ok(Box::pin(
328 WatchStream::new(self.archived_segment_headers_receiver.clone())
329 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
330 ))
331 }
332
333 async fn segment_headers(
338 &self,
339 segment_indices: Vec<SegmentIndex>,
340 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
341 let retrieved_segment_headers = self
342 .segment_headers
343 .read()
344 .await
345 .get_segment_headers(&segment_indices);
346
347 if retrieved_segment_headers.iter().all(Option::is_some) {
348 Ok(retrieved_segment_headers)
349 } else {
350 let segment_headers = self.segment_headers.upgradable_read_arc().await;
357
358 let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
361 if retrieved_segment_headers.iter().all(Option::is_some) {
362 return Ok(retrieved_segment_headers);
363 }
364
365 let extra_segment_headers = segment_headers
367 .request_uncached_headers(&self.inner)
368 .await?;
369
370 if extra_segment_headers.is_empty() {
371 return Ok(retrieved_segment_headers);
374 }
375
376 let mut segment_headers =
378 AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
379 segment_headers.write_cache(extra_segment_headers);
380
381 Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
384 .get_segment_headers(&segment_indices))
385 }
386 }
387
388 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
389 self.inner.piece(piece_index).await
390 }
391
392 async fn acknowledge_archived_segment_header(
393 &self,
394 _segment_index: SegmentIndex,
395 ) -> anyhow::Result<()> {
396 Ok(())
398 }
399}
400
401#[async_trait]
402impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
403where
404 NC: NodeClientExt,
405{
406 async fn cached_segment_headers(
407 &self,
408 segment_indices: Vec<SegmentIndex>,
409 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
410 Ok(self
413 .segment_headers
414 .read()
415 .await
416 .get_segment_headers(&segment_indices))
417 }
418
419 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
420 Ok(self
421 .segment_headers
422 .read()
423 .await
424 .last_segment_headers(limit))
425 }
426}