ab_farmer/node_client/
caching_proxy_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
8use ab_farmer_rpc_primitives::{
9 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
10 MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
11};
12use async_lock::{
13 Mutex as AsyncMutex, RwLock as AsyncRwLock,
14 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
15 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
16};
17use async_trait::async_trait;
18use futures::{FutureExt, Stream, StreamExt, select};
19use std::pin::Pin;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::watch;
23use tokio_stream::wrappers::WatchStream;
24use tracing::{info, trace, warn};
25
26const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
27const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
28
29#[derive(Debug, Default)]
30struct SegmentHeaders {
31 segment_headers: Vec<SegmentHeader>,
32 last_synced: Option<Instant>,
33}
34
35impl SegmentHeaders {
36 fn push(&mut self, archived_segment_header: SegmentHeader) {
39 if self.segment_headers.len()
40 == u64::from(archived_segment_header.local_segment_index()) as usize
41 {
42 self.segment_headers.push(archived_segment_header);
43 }
44 }
45
46 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
50 segment_indices
51 .iter()
52 .map(|segment_index| {
53 self.segment_headers
54 .get(u64::from(*segment_index) as usize)
55 .copied()
56 })
57 .collect::<Vec<_>>()
58 }
59
60 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
62 self.segment_headers
63 .iter()
64 .rev()
65 .take(limit as usize)
66 .rev()
67 .copied()
68 .map(Some)
69 .collect()
70 }
71
72 async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
79 where
80 NC: NodeClient,
81 {
82 if let Some(last_synced) = &self.last_synced
84 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
85 {
86 return Ok(Vec::new());
87 }
88
89 let mut extra_segment_headers = Vec::new();
90 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
91 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
92
93 'outer: loop {
94 let from = segment_index_offset;
95 let to = segment_index_offset + segment_index_step;
96 trace!(%from, %to, "Requesting segment headers");
97
98 for maybe_segment_header in client
99 .segment_headers((from..to).collect::<Vec<_>>())
100 .await
101 .map_err(|error| {
102 anyhow::anyhow!(
103 "Failed to download segment headers {from}..{to} from node: {error}"
104 )
105 })?
106 {
107 let Some(segment_header) = maybe_segment_header else {
108 break 'outer;
110 };
111
112 extra_segment_headers.push(segment_header);
113 }
114
115 segment_index_offset += segment_index_step;
116 }
117
118 Ok(extra_segment_headers)
119 }
120
121 fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
123 for segment_header in extra_segment_headers {
124 self.push(segment_header);
125 }
126 self.last_synced.replace(Instant::now());
127 }
128}
129
130#[derive(Debug, Clone)]
138pub struct CachingProxyNodeClient<NC> {
139 inner: NC,
140 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
141 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
142 block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
143 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
144 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
145 _background_task: Arc<AsyncJoinOnDrop<()>>,
146}
147
148impl<NC> CachingProxyNodeClient<NC>
149where
150 NC: NodeClient + Clone,
151{
152 pub async fn new(client: NC) -> anyhow::Result<Self> {
154 let mut segment_headers = SegmentHeaders::default();
155 let mut archived_segments_notifications =
156 client.subscribe_archived_segment_headers().await?;
157
158 info!("Downloading all segment headers from node...");
159 let headers = segment_headers.request_uncached_headers(&client).await?;
161 segment_headers.write_cache(headers);
162 info!("Downloaded all segment headers from node successfully");
163
164 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
165
166 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
167 let slot_info_proxy_fut = {
168 let mut slot_info_subscription = client.subscribe_slot_info().await?;
169
170 async move {
171 let mut last_slot_number = None;
172 while let Some(slot_info) = slot_info_subscription.next().await {
173 if let Some(last_slot_number) = last_slot_number
174 && last_slot_number >= slot_info.slot
175 {
176 continue;
177 }
178 last_slot_number.replace(slot_info.slot);
179
180 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
181 warn!(%error, "Failed to proxy slot info notification");
182 return;
183 }
184 }
185 }
186 };
187
188 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
189 watch::channel(None::<SegmentHeader>);
190 let segment_headers_maintenance_fut = {
191 let client = client.clone();
192 let segment_headers = Arc::clone(&segment_headers);
193
194 async move {
195 let mut last_archived_segment_index = None;
196 while let Some(archived_segment_header) =
197 archived_segments_notifications.next().await
198 {
199 let segment_index =
200 SegmentIndex::from(archived_segment_header.local_segment_index());
201 trace!(
202 ?archived_segment_header,
203 "New archived archived segment header notification"
204 );
205
206 while let Err(error) = client
207 .acknowledge_archived_segment_header(segment_index)
208 .await
209 {
210 warn!(
211 %error,
212 "Failed to acknowledge archived segment header, trying again"
213 );
214 }
215
216 if let Some(last_archived_segment_index) = last_archived_segment_index
217 && last_archived_segment_index >= segment_index
218 {
219 continue;
220 }
221 last_archived_segment_index.replace(segment_index);
222
223 segment_headers.write().await.push(archived_segment_header);
224
225 if let Err(error) =
226 archived_segment_headers_sender.send(Some(archived_segment_header))
227 {
228 warn!(%error, "Failed to proxy archived segment header notification");
229 return;
230 }
231 }
232 }
233 };
234
235 let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
236 let block_sealing_proxy_fut = {
237 let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
238
239 async move {
240 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
241 if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
242 warn!(%error, "Failed to proxy block sealing notification");
243 return;
244 }
245 }
246 }
247 };
248
249 let farmer_app_info = client
250 .farmer_app_info()
251 .await
252 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
253 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
254
255 let background_task = tokio::spawn(async move {
256 select! {
257 _ = slot_info_proxy_fut.fuse() => {},
258 _ = segment_headers_maintenance_fut.fuse() => {},
259 _ = block_sealing_proxy_fut.fuse() => {},
260 }
261 });
262
263 let node_client = Self {
264 inner: client,
265 slot_info_receiver,
266 archived_segment_headers_receiver,
267 block_sealing_receiver,
268 segment_headers,
269 last_farmer_app_info,
270 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
271 };
272
273 Ok(node_client)
274 }
275}
276
277#[async_trait]
278impl<NC> NodeClient for CachingProxyNodeClient<NC>
279where
280 NC: NodeClient,
281{
282 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
283 let (last_farmer_app_info, last_farmer_app_info_request) =
284 &mut *self.last_farmer_app_info.lock().await;
285
286 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
287 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
288
289 *last_farmer_app_info = new_last_farmer_app_info;
290 *last_farmer_app_info_request = Instant::now();
291 }
292
293 Ok(last_farmer_app_info.clone())
294 }
295
296 async fn subscribe_slot_info(
297 &self,
298 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
299 Ok(Box::pin(
300 WatchStream::new(self.slot_info_receiver.clone())
301 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
302 ))
303 }
304
305 async fn submit_solution_response(
306 &self,
307 solution_response: SolutionResponse,
308 ) -> anyhow::Result<()> {
309 self.inner.submit_solution_response(solution_response).await
310 }
311
312 async fn subscribe_block_sealing(
313 &self,
314 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
315 Ok(Box::pin(
316 WatchStream::new(self.block_sealing_receiver.clone())
317 .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
318 ))
319 }
320
321 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
323 self.inner.submit_block_seal(block_seal).await
324 }
325
326 async fn subscribe_archived_segment_headers(
327 &self,
328 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
329 Ok(Box::pin(
330 WatchStream::new(self.archived_segment_headers_receiver.clone())
331 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
332 ))
333 }
334
335 async fn segment_headers(
340 &self,
341 segment_indices: Vec<SegmentIndex>,
342 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
343 let retrieved_segment_headers = self
344 .segment_headers
345 .read()
346 .await
347 .get_segment_headers(&segment_indices);
348
349 if retrieved_segment_headers.iter().all(Option::is_some) {
350 Ok(retrieved_segment_headers)
351 } else {
352 let segment_headers = self.segment_headers.upgradable_read_arc().await;
360
361 let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
364 if retrieved_segment_headers.iter().all(Option::is_some) {
365 return Ok(retrieved_segment_headers);
366 }
367
368 let extra_segment_headers = segment_headers
370 .request_uncached_headers(&self.inner)
371 .await?;
372
373 if extra_segment_headers.is_empty() {
374 return Ok(retrieved_segment_headers);
377 }
378
379 let mut segment_headers =
381 AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
382 segment_headers.write_cache(extra_segment_headers);
383
384 Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
387 .get_segment_headers(&segment_indices))
388 }
389 }
390
391 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
392 self.inner.piece(piece_index).await
393 }
394
395 async fn acknowledge_archived_segment_header(
396 &self,
397 _segment_index: SegmentIndex,
398 ) -> anyhow::Result<()> {
399 Ok(())
401 }
402
403 async fn update_shard_membership_info(
404 &self,
405 info: FarmerShardMembershipInfo,
406 ) -> anyhow::Result<()> {
407 self.inner.update_shard_membership_info(info).await
408 }
409}
410
411#[async_trait]
412impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
413where
414 NC: NodeClientExt,
415{
416 async fn cached_segment_headers(
417 &self,
418 segment_indices: Vec<SegmentIndex>,
419 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
420 Ok(self
423 .segment_headers
424 .read()
425 .await
426 .get_segment_headers(&segment_indices))
427 }
428
429 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
430 Ok(self
431 .segment_headers
432 .read()
433 .await
434 .last_segment_headers(limit))
435 }
436}