Skip to main content

ab_farmer/node_client/
caching_proxy_node_client.rs

1//! Node client wrapper around another node client that caches some data for better performance and
2//! proxies other requests through
3
4use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{
8    SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
9};
10use ab_farmer_rpc_primitives::{
11    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
12    MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
13};
14use async_lock::{
15    Mutex as AsyncMutex, RwLock as AsyncRwLock,
16    RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
17    RwLockWriteGuardArc as AsyncRwLockWriteGuard,
18};
19use async_trait::async_trait;
20use futures::{FutureExt, Stream, StreamExt, select};
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25use tokio_stream::wrappers::WatchStream;
26use tracing::{info, trace, warn};
27
/// Minimum time that must pass after the cache was last written before the node is asked for
/// uncached super segment headers again; within this window lookups are served from the cache.
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// How long a cached `FarmerAppInfo` is reused before a fresh copy is requested from the wrapped
/// node client.
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
30
31#[derive(Debug, Default)]
32struct SuperSegmentHeaders {
33    super_segment_headers: Vec<SuperSegmentHeader>,
34    last_synced: Option<Instant>,
35}
36
37impl SuperSegmentHeaders {
38    /// Push a new super segment header to the cache if it is the next super segment header.
39    /// Otherwise, skip the push.
40    fn push(&mut self, new_segment_header: SuperSegmentHeader) {
41        if self.super_segment_headers.len() as u64 == u64::from(new_segment_header.index.as_inner())
42        {
43            self.super_segment_headers.push(new_segment_header);
44        }
45    }
46
47    /// Get cached super segment headers for the given super segment indices.
48    ///
49    /// Returns `None` for super segment indices that are not in the cache.
50    fn get_super_segment_headers(
51        &self,
52        super_segment_indices: &[SuperSegmentIndex],
53    ) -> Vec<Option<SuperSegmentHeader>> {
54        super_segment_indices
55            .iter()
56            .map(|super_segment_index| {
57                self.super_segment_headers
58                    .get(u64::from(*super_segment_index) as usize)
59                    .copied()
60            })
61            .collect::<Vec<_>>()
62    }
63
64    /// Get the last `limit` super segment headers from the cache
65    fn last_super_segment_headers(&self, limit: u32) -> Vec<Option<SuperSegmentHeader>> {
66        self.super_segment_headers
67            .iter()
68            .rev()
69            .take(limit as usize)
70            .rev()
71            .copied()
72            .map(Some)
73            .collect()
74    }
75
76    // TODO: Maybe caching or more compact storage that points segment indices to super segments
77    //  when this is called thousands of times during replotting?
78    fn super_segment_root_for_segment_index(
79        &self,
80        segment_index: SegmentIndex,
81    ) -> Option<SuperSegmentRoot> {
82        let index = self
83            .super_segment_headers
84            .binary_search_by_key(&segment_index, |super_segment_header| {
85                super_segment_header.max_segment_index.as_inner()
86            })
87            .unwrap_or_else(|insert_index| insert_index);
88
89        let super_segment_header = self.super_segment_headers.get(index).copied()?;
90
91        let max_segment_index = super_segment_header.max_segment_index.as_inner();
92        let first_segment_index = max_segment_index
93            - SegmentIndex::from(u64::from(super_segment_header.num_segments))
94            + SegmentIndex::ONE;
95
96        (first_segment_index..=max_segment_index)
97            .contains(&segment_index)
98            .then_some(super_segment_header.root)
99    }
100
101    /// Get uncached headers from the node if we're not rate-limited.
102    /// This only requires a read lock.
103    ///
104    /// Returns any extra super segment headers if the download succeeds, or an error if it fails.
105    /// The caller must write the returned super segment headers to the cache and reset the sync
106    /// rate-limit timer.
107    async fn request_uncached_headers<NC>(
108        &self,
109        client: &NC,
110    ) -> anyhow::Result<Vec<SuperSegmentHeader>>
111    where
112        NC: NodeClient,
113    {
114        // Skip the sync if we're still within the sync rate limit.
115        if let Some(last_synced) = &self.last_synced
116            && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
117        {
118            return Ok(Vec::new());
119        }
120
121        let mut extra_super_segment_headers = Vec::new();
122        let mut super_segment_index_offset =
123            SuperSegmentIndex::from(self.super_segment_headers.len() as u64);
124        let segment_index_step =
125            SuperSegmentIndex::from(MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST as u64);
126
127        'outer: loop {
128            let from = super_segment_index_offset;
129            let to = super_segment_index_offset + segment_index_step;
130            trace!(%from, %to, "Requesting super segment headers");
131
132            for maybe_super_segment_header in client
133                .super_segment_headers((from..to).collect::<Vec<_>>())
134                .await
135                .map_err(|error| {
136                    anyhow::anyhow!(
137                        "Failed to download super segment headers {from}..{to} from node: {error}"
138                    )
139                })?
140            {
141                let Some(super_segment_header) = maybe_super_segment_header else {
142                    // Reached non-existent super segment header
143                    break 'outer;
144                };
145
146                extra_super_segment_headers.push(super_segment_header);
147            }
148
149            super_segment_index_offset += segment_index_step;
150        }
151
152        Ok(extra_super_segment_headers)
153    }
154
155    /// Write the sync results to the cache, and reset the sync rate-limit timer.
156    fn write_cache(&mut self, extra_super_segment_headers: Vec<SuperSegmentHeader>) {
157        for super_segment_header in extra_super_segment_headers {
158            self.push(super_segment_header);
159        }
160        self.last_synced.replace(Instant::now());
161    }
162}
163
164/// Node client wrapper around another node client that caches some data for better performance and
165/// proxies other requests through.
166///
167/// NOTE: Archived segment acknowledgement is ignored in this client, all subscriptions are
168/// acknowledged implicitly and immediately.
169/// NOTE: Subscription messages that are not processed in time will be skipped for performance
170/// reasons!
171#[derive(Debug, Clone)]
172pub struct CachingProxyNodeClient<NC> {
173    inner: NC,
174    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
175    new_super_segment_headers_receiver: watch::Receiver<Option<SuperSegmentHeader>>,
176    block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
177    super_segment_headers: Arc<AsyncRwLock<SuperSegmentHeaders>>,
178    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
179    _background_task: Arc<AsyncJoinOnDrop<()>>,
180}
181
182impl<NC> CachingProxyNodeClient<NC>
183where
184    NC: NodeClient + Clone,
185{
186    /// Create a new instance
187    pub async fn new(client: NC) -> anyhow::Result<Self> {
188        let mut super_segment_headers = SuperSegmentHeaders::default();
189        let mut new_super_segments_notifications =
190            client.subscribe_new_super_segment_headers().await?;
191
192        info!("Downloading all super segment headers from node...");
193        // No locking is needed, we are the first and only instance right now.
194        let headers = super_segment_headers
195            .request_uncached_headers(&client)
196            .await?;
197        super_segment_headers.write_cache(headers);
198        info!("Downloaded all super segment headers from node successfully");
199
200        let super_segment_headers = Arc::new(AsyncRwLock::new(super_segment_headers));
201
202        let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
203        let slot_info_proxy_fut = {
204            let mut slot_info_subscription = client.subscribe_slot_info().await?;
205
206            async move {
207                let mut last_slot_number = None;
208                while let Some(slot_info) = slot_info_subscription.next().await {
209                    if let Some(last_slot_number) = last_slot_number
210                        && last_slot_number >= slot_info.slot
211                    {
212                        continue;
213                    }
214                    last_slot_number.replace(slot_info.slot);
215
216                    if let Err(error) = slot_info_sender.send(Some(slot_info)) {
217                        warn!(%error, "Failed to proxy slot info notification");
218                        return;
219                    }
220                }
221            }
222        };
223
224        let (new_super_segment_headers_sender, new_super_segment_headers_receiver) =
225            watch::channel(None::<SuperSegmentHeader>);
226        let super_segment_headers_maintenance_fut = {
227            let super_segment_headers = Arc::clone(&super_segment_headers);
228
229            async move {
230                let mut last_super_segment_index = None;
231                while let Some(new_segment_header) = new_super_segments_notifications.next().await {
232                    let super_segment_index = new_segment_header.index;
233                    trace!(?new_segment_header, "New super segment header notification");
234
235                    if let Some(last_super_segment_index) = last_super_segment_index
236                        && last_super_segment_index >= super_segment_index
237                    {
238                        continue;
239                    }
240                    last_super_segment_index.replace(super_segment_index);
241
242                    super_segment_headers.write().await.push(new_segment_header);
243
244                    if let Err(error) =
245                        new_super_segment_headers_sender.send(Some(new_segment_header))
246                    {
247                        warn!(%error, "Failed to proxy new super segment header notification");
248                        return;
249                    }
250                }
251            }
252        };
253
254        let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
255        let block_sealing_proxy_fut = {
256            let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
257
258            async move {
259                while let Some(block_sealing_info) = block_sealing_subscription.next().await {
260                    if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
261                        warn!(%error, "Failed to proxy block sealing notification");
262                        return;
263                    }
264                }
265            }
266        };
267
268        let farmer_app_info = client
269            .farmer_app_info()
270            .await
271            .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
272        let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
273
274        let background_task = tokio::spawn(async move {
275            select! {
276                _ = slot_info_proxy_fut.fuse() => {},
277                _ = super_segment_headers_maintenance_fut.fuse() => {},
278                _ = block_sealing_proxy_fut.fuse() => {},
279            }
280        });
281
282        let node_client = Self {
283            inner: client,
284            slot_info_receiver,
285            new_super_segment_headers_receiver,
286            block_sealing_receiver,
287            super_segment_headers,
288            last_farmer_app_info,
289            _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
290        };
291
292        Ok(node_client)
293    }
294}
295
296#[async_trait]
297impl<NC> NodeClient for CachingProxyNodeClient<NC>
298where
299    NC: NodeClient,
300{
301    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
302        let (last_farmer_app_info, last_farmer_app_info_request) =
303            &mut *self.last_farmer_app_info.lock().await;
304
305        if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
306            let new_last_farmer_app_info = self.inner.farmer_app_info().await?;
307
308            *last_farmer_app_info = new_last_farmer_app_info;
309            *last_farmer_app_info_request = Instant::now();
310        }
311
312        Ok(last_farmer_app_info.clone())
313    }
314
315    async fn subscribe_slot_info(
316        &self,
317    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
318        Ok(Box::pin(
319            WatchStream::new(self.slot_info_receiver.clone())
320                .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
321        ))
322    }
323
324    async fn submit_solution_response(
325        &self,
326        solution_response: SolutionResponse,
327    ) -> anyhow::Result<()> {
328        self.inner.submit_solution_response(solution_response).await
329    }
330
331    async fn subscribe_block_sealing(
332        &self,
333    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
334        Ok(Box::pin(
335            WatchStream::new(self.block_sealing_receiver.clone())
336                .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
337        ))
338    }
339
340    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
341        self.inner.submit_block_seal(block_seal).await
342    }
343
344    async fn subscribe_new_super_segment_headers(
345        &self,
346    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
347        Ok(Box::pin(
348            WatchStream::new(self.new_super_segment_headers_receiver.clone())
349                .filter_map(|maybe_super_segment_header| async move { maybe_super_segment_header }),
350        ))
351    }
352
353    async fn super_segment_headers(
354        &self,
355        super_segment_indices: Vec<SuperSegmentIndex>,
356    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
357        let retrieved_super_segment_headers = self
358            .super_segment_headers
359            .read()
360            .await
361            .get_super_segment_headers(&super_segment_indices);
362
363        if retrieved_super_segment_headers.iter().all(Option::is_some) {
364            return Ok(retrieved_super_segment_headers);
365        }
366
367        // We might be missing a requested super segment header.
368        // Sync the cache with the node, apply a rate limit, and return cached super segment
369        // headers.
370
371        // If we took a write lock here, a queue of writers could starve all the readers, even if
372        // those writers are rate-limited. So we take an upgradable read lock for the rate limit
373        // check.
374        let super_segment_headers = self.super_segment_headers.upgradable_read_arc().await;
375
376        // Try again after acquiring the upgradeable read lock, in case another caller already
377        // synced the headers
378        let retrieved_super_segment_headers =
379            super_segment_headers.get_super_segment_headers(&super_segment_indices);
380        if retrieved_super_segment_headers.iter().all(Option::is_some) {
381            return Ok(retrieved_super_segment_headers);
382        }
383
384        // Try to sync the cache with the node
385        let extra_super_segment_headers = super_segment_headers
386            .request_uncached_headers(&self.inner)
387            .await?;
388
389        if extra_super_segment_headers.is_empty() {
390            // No extra super segment headers on the node, or we are rate-limited, so just return
391            // what is in the cache
392            return Ok(retrieved_super_segment_headers);
393        }
394
395        // Need to update the cached super segment headers, so take the write lock
396        let mut super_segment_headers =
397            AsyncRwLockUpgradableReadGuard::upgrade(super_segment_headers).await;
398        super_segment_headers.write_cache(extra_super_segment_headers);
399
400        // Downgrade the write lock to a read lock to get the updated super segment headers for the
401        // query
402        Ok(AsyncRwLockWriteGuard::downgrade(super_segment_headers)
403            .get_super_segment_headers(&super_segment_indices))
404    }
405
406    async fn super_segment_root_for_segment_index(
407        &self,
408        segment_index: SegmentIndex,
409    ) -> anyhow::Result<Option<SuperSegmentRoot>> {
410        Ok(self
411            .super_segment_headers
412            .read()
413            .await
414            .super_segment_root_for_segment_index(segment_index))
415    }
416
417    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
418        self.inner.piece(piece_index).await
419    }
420
421    async fn update_shard_membership_info(
422        &self,
423        info: FarmerShardMembershipInfo,
424    ) -> anyhow::Result<()> {
425        self.inner.update_shard_membership_info(info).await
426    }
427}
428
429#[async_trait]
430impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
431where
432    NC: NodeClientExt,
433{
434    async fn cached_super_segment_headers(
435        &self,
436        super_segment_indices: Vec<SuperSegmentIndex>,
437    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
438        // To avoid remote denial of service, we don't update the cache here because it is called
439        // from network code
440        Ok(self
441            .super_segment_headers
442            .read()
443            .await
444            .get_super_segment_headers(&super_segment_indices))
445    }
446
447    async fn last_super_segment_headers(
448        &self,
449        limit: u32,
450    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
451        Ok(self
452            .super_segment_headers
453            .read()
454            .await
455            .last_super_segment_headers(limit))
456    }
457}