1use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{
8 SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
9};
10use ab_farmer_rpc_primitives::{
11 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
12 MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
13};
14use async_lock::{
15 Mutex as AsyncMutex, RwLock as AsyncRwLock,
16 RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
17 RwLockWriteGuardArc as AsyncRwLockWriteGuard,
18};
19use async_trait::async_trait;
20use futures::{FutureExt, Stream, StreamExt, select};
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25use tokio_stream::wrappers::WatchStream;
26use tracing::{info, trace, warn};
27
/// Minimum time between two attempts to download super segment headers that are missing from the
/// local cache; compared against the time of the last cache write
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// Time during which the most recently fetched farmer app info is served from the cache instead of
/// being requested from the node again
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
30
/// Local cache of super segment headers together with the time the cache was last written
#[derive(Debug, Default)]
struct SuperSegmentHeaders {
    /// Cached headers. `push` only appends a header whose index equals the current length, so the
    /// position of a header in this vector is equal to its super segment index.
    super_segment_headers: Vec<SuperSegmentHeader>,
    /// Time of the last `write_cache` call, `None` if the cache was never written
    last_synced: Option<Instant>,
}
36
37impl SuperSegmentHeaders {
38 fn push(&mut self, new_segment_header: SuperSegmentHeader) {
41 if self.super_segment_headers.len() as u64 == u64::from(new_segment_header.index.as_inner())
42 {
43 self.super_segment_headers.push(new_segment_header);
44 }
45 }
46
47 fn get_super_segment_headers(
51 &self,
52 super_segment_indices: &[SuperSegmentIndex],
53 ) -> Vec<Option<SuperSegmentHeader>> {
54 super_segment_indices
55 .iter()
56 .map(|super_segment_index| {
57 self.super_segment_headers
58 .get(u64::from(*super_segment_index) as usize)
59 .copied()
60 })
61 .collect::<Vec<_>>()
62 }
63
64 fn last_super_segment_headers(&self, limit: u32) -> Vec<Option<SuperSegmentHeader>> {
66 self.super_segment_headers
67 .iter()
68 .rev()
69 .take(limit as usize)
70 .rev()
71 .copied()
72 .map(Some)
73 .collect()
74 }
75
76 fn super_segment_root_for_segment_index(
79 &self,
80 segment_index: SegmentIndex,
81 ) -> Option<SuperSegmentRoot> {
82 let index = self
83 .super_segment_headers
84 .binary_search_by_key(&segment_index, |super_segment_header| {
85 super_segment_header.max_segment_index.as_inner()
86 })
87 .unwrap_or_else(|insert_index| insert_index);
88
89 let super_segment_header = self.super_segment_headers.get(index).copied()?;
90
91 let max_segment_index = super_segment_header.max_segment_index.as_inner();
92 let first_segment_index = max_segment_index
93 - SegmentIndex::from(u64::from(super_segment_header.num_segments))
94 + SegmentIndex::ONE;
95
96 (first_segment_index..=max_segment_index)
97 .contains(&segment_index)
98 .then_some(super_segment_header.root)
99 }
100
101 async fn request_uncached_headers<NC>(
108 &self,
109 client: &NC,
110 ) -> anyhow::Result<Vec<SuperSegmentHeader>>
111 where
112 NC: NodeClient,
113 {
114 if let Some(last_synced) = &self.last_synced
116 && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
117 {
118 return Ok(Vec::new());
119 }
120
121 let mut extra_super_segment_headers = Vec::new();
122 let mut super_segment_index_offset =
123 SuperSegmentIndex::from(self.super_segment_headers.len() as u64);
124 let segment_index_step =
125 SuperSegmentIndex::from(MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST as u64);
126
127 'outer: loop {
128 let from = super_segment_index_offset;
129 let to = super_segment_index_offset + segment_index_step;
130 trace!(%from, %to, "Requesting super segment headers");
131
132 for maybe_super_segment_header in client
133 .super_segment_headers((from..to).collect::<Vec<_>>())
134 .await
135 .map_err(|error| {
136 anyhow::anyhow!(
137 "Failed to download super segment headers {from}..{to} from node: {error}"
138 )
139 })?
140 {
141 let Some(super_segment_header) = maybe_super_segment_header else {
142 break 'outer;
144 };
145
146 extra_super_segment_headers.push(super_segment_header);
147 }
148
149 super_segment_index_offset += segment_index_step;
150 }
151
152 Ok(extra_super_segment_headers)
153 }
154
155 fn write_cache(&mut self, extra_super_segment_headers: Vec<SuperSegmentHeader>) {
157 for super_segment_header in extra_super_segment_headers {
158 self.push(super_segment_header);
159 }
160 self.last_synced.replace(Instant::now());
161 }
162}
163
/// Node client wrapper that caches super segment headers and farmer app info, and re-broadcasts
/// notifications of the inner client through watch channels.
///
/// Clones share the same cache, the same notification channels and the same background task.
#[derive(Debug, Clone)]
pub struct CachingProxyNodeClient<NC> {
    /// Wrapped node client that uncached requests are forwarded to
    inner: NC,
    /// Receiver of the latest slot info forwarded by the background task, `None` until the first
    /// notification arrives
    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
    /// Receiver of the latest new super segment header forwarded by the background task, `None`
    /// until the first notification arrives
    new_super_segment_headers_receiver: watch::Receiver<Option<SuperSegmentHeader>>,
    /// Receiver of the latest block sealing request forwarded by the background task, `None` until
    /// the first notification arrives
    block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
    /// Cache of super segment headers shared with the background task
    super_segment_headers: Arc<AsyncRwLock<SuperSegmentHeaders>>,
    /// Most recently fetched farmer app info together with the time it was fetched
    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
    /// Handle of the spawned task that drives all notification proxies.
    // NOTE(review): presumably `AsyncJoinOnDrop` stops the task once the last clone is dropped;
    // confirm against its definition in `crate::utils`
    _background_task: Arc<AsyncJoinOnDrop<()>>,
}
181
182impl<NC> CachingProxyNodeClient<NC>
183where
184 NC: NodeClient + Clone,
185{
186 pub async fn new(client: NC) -> anyhow::Result<Self> {
188 let mut super_segment_headers = SuperSegmentHeaders::default();
189 let mut new_super_segments_notifications =
190 client.subscribe_new_super_segment_headers().await?;
191
192 info!("Downloading all super segment headers from node...");
193 let headers = super_segment_headers
195 .request_uncached_headers(&client)
196 .await?;
197 super_segment_headers.write_cache(headers);
198 info!("Downloaded all super segment headers from node successfully");
199
200 let super_segment_headers = Arc::new(AsyncRwLock::new(super_segment_headers));
201
202 let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
203 let slot_info_proxy_fut = {
204 let mut slot_info_subscription = client.subscribe_slot_info().await?;
205
206 async move {
207 let mut last_slot_number = None;
208 while let Some(slot_info) = slot_info_subscription.next().await {
209 if let Some(last_slot_number) = last_slot_number
210 && last_slot_number >= slot_info.slot
211 {
212 continue;
213 }
214 last_slot_number.replace(slot_info.slot);
215
216 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
217 warn!(%error, "Failed to proxy slot info notification");
218 return;
219 }
220 }
221 }
222 };
223
224 let (new_super_segment_headers_sender, new_super_segment_headers_receiver) =
225 watch::channel(None::<SuperSegmentHeader>);
226 let super_segment_headers_maintenance_fut = {
227 let super_segment_headers = Arc::clone(&super_segment_headers);
228
229 async move {
230 let mut last_super_segment_index = None;
231 while let Some(new_segment_header) = new_super_segments_notifications.next().await {
232 let super_segment_index = new_segment_header.index;
233 trace!(?new_segment_header, "New super segment header notification");
234
235 if let Some(last_super_segment_index) = last_super_segment_index
236 && last_super_segment_index >= super_segment_index
237 {
238 continue;
239 }
240 last_super_segment_index.replace(super_segment_index);
241
242 super_segment_headers.write().await.push(new_segment_header);
243
244 if let Err(error) =
245 new_super_segment_headers_sender.send(Some(new_segment_header))
246 {
247 warn!(%error, "Failed to proxy new super segment header notification");
248 return;
249 }
250 }
251 }
252 };
253
254 let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
255 let block_sealing_proxy_fut = {
256 let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
257
258 async move {
259 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
260 if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
261 warn!(%error, "Failed to proxy block sealing notification");
262 return;
263 }
264 }
265 }
266 };
267
268 let farmer_app_info = client
269 .farmer_app_info()
270 .await
271 .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
272 let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
273
274 let background_task = tokio::spawn(async move {
275 select! {
276 _ = slot_info_proxy_fut.fuse() => {},
277 _ = super_segment_headers_maintenance_fut.fuse() => {},
278 _ = block_sealing_proxy_fut.fuse() => {},
279 }
280 });
281
282 let node_client = Self {
283 inner: client,
284 slot_info_receiver,
285 new_super_segment_headers_receiver,
286 block_sealing_receiver,
287 super_segment_headers,
288 last_farmer_app_info,
289 _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
290 };
291
292 Ok(node_client)
293 }
294}
295
296#[async_trait]
297impl<NC> NodeClient for CachingProxyNodeClient<NC>
298where
299 NC: NodeClient,
300{
301 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
302 let (last_farmer_app_info, last_farmer_app_info_request) =
303 &mut *self.last_farmer_app_info.lock().await;
304
305 if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
306 let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
307
308 *last_farmer_app_info = new_last_farmer_app_info;
309 *last_farmer_app_info_request = Instant::now();
310 }
311
312 Ok(last_farmer_app_info.clone())
313 }
314
315 async fn subscribe_slot_info(
316 &self,
317 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
318 Ok(Box::pin(
319 WatchStream::new(self.slot_info_receiver.clone())
320 .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
321 ))
322 }
323
324 async fn submit_solution_response(
325 &self,
326 solution_response: SolutionResponse,
327 ) -> anyhow::Result<()> {
328 self.inner.submit_solution_response(solution_response).await
329 }
330
331 async fn subscribe_block_sealing(
332 &self,
333 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
334 Ok(Box::pin(
335 WatchStream::new(self.block_sealing_receiver.clone())
336 .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
337 ))
338 }
339
340 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
341 self.inner.submit_block_seal(block_seal).await
342 }
343
344 async fn subscribe_new_super_segment_headers(
345 &self,
346 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
347 Ok(Box::pin(
348 WatchStream::new(self.new_super_segment_headers_receiver.clone())
349 .filter_map(|maybe_super_segment_header| async move { maybe_super_segment_header }),
350 ))
351 }
352
353 async fn super_segment_headers(
354 &self,
355 super_segment_indices: Vec<SuperSegmentIndex>,
356 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
357 let retrieved_super_segment_headers = self
358 .super_segment_headers
359 .read()
360 .await
361 .get_super_segment_headers(&super_segment_indices);
362
363 if retrieved_super_segment_headers.iter().all(Option::is_some) {
364 return Ok(retrieved_super_segment_headers);
365 }
366
367 let super_segment_headers = self.super_segment_headers.upgradable_read_arc().await;
375
376 let retrieved_super_segment_headers =
379 super_segment_headers.get_super_segment_headers(&super_segment_indices);
380 if retrieved_super_segment_headers.iter().all(Option::is_some) {
381 return Ok(retrieved_super_segment_headers);
382 }
383
384 let extra_super_segment_headers = super_segment_headers
386 .request_uncached_headers(&self.inner)
387 .await?;
388
389 if extra_super_segment_headers.is_empty() {
390 return Ok(retrieved_super_segment_headers);
393 }
394
395 let mut super_segment_headers =
397 AsyncRwLockUpgradableReadGuard::upgrade(super_segment_headers).await;
398 super_segment_headers.write_cache(extra_super_segment_headers);
399
400 Ok(AsyncRwLockWriteGuard::downgrade(super_segment_headers)
403 .get_super_segment_headers(&super_segment_indices))
404 }
405
406 async fn super_segment_root_for_segment_index(
407 &self,
408 segment_index: SegmentIndex,
409 ) -> anyhow::Result<Option<SuperSegmentRoot>> {
410 Ok(self
411 .super_segment_headers
412 .read()
413 .await
414 .super_segment_root_for_segment_index(segment_index))
415 }
416
417 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
418 self.inner.piece(piece_index).await
419 }
420
421 async fn update_shard_membership_info(
422 &self,
423 info: FarmerShardMembershipInfo,
424 ) -> anyhow::Result<()> {
425 self.inner.update_shard_membership_info(info).await
426 }
427}
428
429#[async_trait]
430impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
431where
432 NC: NodeClientExt,
433{
434 async fn cached_super_segment_headers(
435 &self,
436 super_segment_indices: Vec<SuperSegmentIndex>,
437 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
438 Ok(self
441 .super_segment_headers
442 .read()
443 .await
444 .get_super_segment_headers(&super_segment_indices))
445 }
446
447 async fn last_super_segment_headers(
448 &self,
449 limit: u32,
450 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
451 Ok(self
452 .super_segment_headers
453 .read()
454 .await
455 .last_super_segment_headers(limit))
456 }
457}