1use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{
8 SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
9};
10use ab_farmer_rpc_primitives::{
11 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
12 MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
13};
14use async_lock::{
15 Mutex as AsyncMutex, RwLock as AsyncRwLock,
16 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
17 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
18};
19use async_trait::async_trait;
20use futures::{FutureExt, Stream, StreamExt, select};
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25use tokio_stream::wrappers::WatchStream;
26use tracing::{info, trace, warn};
27
28const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
29const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
30
31#[derive(Debug, Default)]
32struct SegmentHeaders {
33 segment_headers: Vec<SegmentHeader>,
34 last_synced: Option<Instant>,
35}
36
37impl SegmentHeaders {
38 fn push(&mut self, archived_segment_header: SegmentHeader) {
41 if self.segment_headers.len()
42 == u64::from(archived_segment_header.local_segment_index()) as usize
43 {
44 self.segment_headers.push(archived_segment_header);
45 }
46 }
47
48 fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
52 segment_indices
53 .iter()
54 .map(|segment_index| {
55 self.segment_headers
56 .get(u64::from(*segment_index) as usize)
57 .copied()
58 })
59 .collect::<Vec<_>>()
60 }
61
62 fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
64 self.segment_headers
65 .iter()
66 .rev()
67 .take(limit as usize)
68 .rev()
69 .copied()
70 .map(Some)
71 .collect()
72 }
73
74 async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
81 where
82 NC: NodeClient,
83 {
84 if let Some(last_synced) = &self.last_synced
86 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
87 {
88 return Ok(Vec::new());
89 }
90
91 let mut extra_segment_headers = Vec::new();
92 let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
93 let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
94
95 'outer: loop {
96 let from = segment_index_offset;
97 let to = segment_index_offset + segment_index_step;
98 trace!(%from, %to, "Requesting segment headers");
99
100 for maybe_segment_header in client
101 .segment_headers((from..to).collect::<Vec<_>>())
102 .await
103 .map_err(|error| {
104 anyhow::anyhow!(
105 "Failed to download segment headers {from}..{to} from node: {error}"
106 )
107 })?
108 {
109 let Some(segment_header) = maybe_segment_header else {
110 break 'outer;
112 };
113
114 extra_segment_headers.push(segment_header);
115 }
116
117 segment_index_offset += segment_index_step;
118 }
119
120 Ok(extra_segment_headers)
121 }
122
123 fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
125 for segment_header in extra_segment_headers {
126 self.push(segment_header);
127 }
128 self.last_synced.replace(Instant::now());
129 }
130}
131
132#[derive(Debug, Clone)]
140pub struct CachingProxyNodeClient<NC> {
141 inner: NC,
142 slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
143 archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
144 block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
145 segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
146 last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
147 _background_task: Arc<AsyncJoinOnDrop<()>>,
148}
149
150impl<NC> CachingProxyNodeClient<NC>
151where
152 NC: NodeClient + Clone,
153{
154 pub async fn new(client: NC) -> anyhow::Result<Self> {
156 let mut segment_headers = SegmentHeaders::default();
157 let mut archived_segments_notifications =
158 client.subscribe_archived_segment_headers().await?;
159
160 info!("Downloading all segment headers from node...");
161 let headers = segment_headers.request_uncached_headers(&client).await?;
163 segment_headers.write_cache(headers);
164 info!("Downloaded all segment headers from node successfully");
165
166 let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
167
168 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
169 let slot_info_proxy_fut = {
170 let mut slot_info_subscription = client.subscribe_slot_info().await?;
171
172 async move {
173 let mut last_slot_number = None;
174 while let Some(slot_info) = slot_info_subscription.next().await {
175 if let Some(last_slot_number) = last_slot_number
176 && last_slot_number >= slot_info.slot
177 {
178 continue;
179 }
180 last_slot_number.replace(slot_info.slot);
181
182 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
183 warn!(%error, "Failed to proxy slot info notification");
184 return;
185 }
186 }
187 }
188 };
189
190 let (archived_segment_headers_sender, archived_segment_headers_receiver) =
191 watch::channel(None::<SegmentHeader>);
192 let segment_headers_maintenance_fut = {
193 let client = client.clone();
194 let segment_headers = Arc::clone(&segment_headers);
195
196 async move {
197 let mut last_archived_segment_index = None;
198 while let Some(archived_segment_header) =
199 archived_segments_notifications.next().await
200 {
201 let segment_index =
202 SegmentIndex::from(archived_segment_header.local_segment_index());
203 trace!(
204 ?archived_segment_header,
205 "New archived archived segment header notification"
206 );
207
208 while let Err(error) = client
209 .acknowledge_archived_segment_header(segment_index)
210 .await
211 {
212 warn!(
213 %error,
214 "Failed to acknowledge archived segment header, trying again"
215 );
216 }
217
218 if let Some(last_archived_segment_index) = last_archived_segment_index
219 && last_archived_segment_index >= segment_index
220 {
221 continue;
222 }
223 last_archived_segment_index.replace(segment_index);
224
225 segment_headers.write().await.push(archived_segment_header);
226
227 if let Err(error) =
228 archived_segment_headers_sender.send(Some(archived_segment_header))
229 {
230 warn!(%error, "Failed to proxy archived segment header notification");
231 return;
232 }
233 }
234 }
235 };
236
237 let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
238 let block_sealing_proxy_fut = {
239 let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
240
241 async move {
242 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
243 if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
244 warn!(%error, "Failed to proxy block sealing notification");
245 return;
246 }
247 }
248 }
249 };
250
251 let farmer_app_info = client
252 .farmer_app_info()
253 .await
254 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
255 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
256
257 let background_task = tokio::spawn(async move {
258 select! {
259 _ = slot_info_proxy_fut.fuse() => {},
260 _ = segment_headers_maintenance_fut.fuse() => {},
261 _ = block_sealing_proxy_fut.fuse() => {},
262 }
263 });
264
265 let node_client = Self {
266 inner: client,
267 slot_info_receiver,
268 archived_segment_headers_receiver,
269 block_sealing_receiver,
270 segment_headers,
271 last_farmer_app_info,
272 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
273 };
274
275 Ok(node_client)
276 }
277}
278
279#[async_trait]
280impl<NC> NodeClient for CachingProxyNodeClient<NC>
281where
282 NC: NodeClient,
283{
284 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
285 let (last_farmer_app_info, last_farmer_app_info_request) =
286 &mut *self.last_farmer_app_info.lock().await;
287
288 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
289 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
290
291 *last_farmer_app_info = new_last_farmer_app_info;
292 *last_farmer_app_info_request = Instant::now();
293 }
294
295 Ok(last_farmer_app_info.clone())
296 }
297
298 async fn subscribe_slot_info(
299 &self,
300 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
301 Ok(Box::pin(
302 WatchStream::new(self.slot_info_receiver.clone())
303 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
304 ))
305 }
306
307 async fn submit_solution_response(
308 &self,
309 solution_response: SolutionResponse,
310 ) -> anyhow::Result<()> {
311 self.inner.submit_solution_response(solution_response).await
312 }
313
314 async fn subscribe_block_sealing(
315 &self,
316 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
317 Ok(Box::pin(
318 WatchStream::new(self.block_sealing_receiver.clone())
319 .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
320 ))
321 }
322
323 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
324 self.inner.submit_block_seal(block_seal).await
325 }
326
327 async fn subscribe_archived_segment_headers(
328 &self,
329 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
330 Ok(Box::pin(
331 WatchStream::new(self.archived_segment_headers_receiver.clone())
332 .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
333 ))
334 }
335
336 async fn super_segment_headers(
337 &self,
338 super_segment_indices: Vec<SuperSegmentIndex>,
339 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
340 self.inner
342 .super_segment_headers(super_segment_indices)
343 .await
344 }
345
346 async fn segment_headers(
351 &self,
352 segment_indices: Vec<SegmentIndex>,
353 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
354 let retrieved_segment_headers = self
355 .segment_headers
356 .read()
357 .await
358 .get_segment_headers(&segment_indices);
359
360 if retrieved_segment_headers.iter().all(Option::is_some) {
361 Ok(retrieved_segment_headers)
362 } else {
363 let segment_headers = self.segment_headers.upgradable_read_arc().await;
371
372 let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
375 if retrieved_segment_headers.iter().all(Option::is_some) {
376 return Ok(retrieved_segment_headers);
377 }
378
379 let extra_segment_headers = segment_headers
381 .request_uncached_headers(&self.inner)
382 .await?;
383
384 if extra_segment_headers.is_empty() {
385 return Ok(retrieved_segment_headers);
388 }
389
390 let mut segment_headers =
392 AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
393 segment_headers.write_cache(extra_segment_headers);
394
395 Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
398 .get_segment_headers(&segment_indices))
399 }
400 }
401
402 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
403 self.inner.piece(piece_index).await
404 }
405
406 async fn acknowledge_archived_segment_header(
407 &self,
408 _segment_index: SegmentIndex,
409 ) -> anyhow::Result<()> {
410 Ok(())
412 }
413
414 async fn update_shard_membership_info(
415 &self,
416 info: FarmerShardMembershipInfo,
417 ) -> anyhow::Result<()> {
418 self.inner.update_shard_membership_info(info).await
419 }
420}
421
422#[async_trait]
423impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
424where
425 NC: NodeClientExt,
426{
427 async fn cached_segment_headers(
428 &self,
429 segment_indices: Vec<SegmentIndex>,
430 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
431 Ok(self
434 .segment_headers
435 .read()
436 .await
437 .get_segment_headers(&segment_indices))
438 }
439
440 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
441 Ok(self
442 .segment_headers
443 .read()
444 .await
445 .last_segment_headers(limit))
446 }
447}