Skip to main content

ab_farmer/node_client/
caching_proxy_node_client.rs

1//! Node client wrapper around another node client that caches some data for better performance and
2//! proxies other requests through
3
4use crate::node_client::{NodeClient, NodeClientExt};
5use crate::utils::AsyncJoinOnDrop;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_core_primitives::segments::{
8    SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
9};
10use ab_farmer_rpc_primitives::{
11    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
12    MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
13};
14use async_lock::{
15    Mutex as AsyncMutex, RwLock as AsyncRwLock,
16    RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
17    RwLockWriteGuardArc as AsyncRwLockWriteGuard,
18};
19use async_trait::async_trait;
20use futures::{FutureExt, Stream, StreamExt, select};
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::watch;
25use tokio_stream::wrappers::WatchStream;
26use tracing::{info, trace, warn};
27
/// Minimum interval between segment header sync requests to the node (rate limit for
/// `SegmentHeaders::request_uncached_headers`).
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// Farmer app info fetched within this window is served from the cache instead of
/// querying the node again.
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);
30
/// In-memory cache of sequential segment headers plus the time of the last sync with the node
/// (used to rate-limit sync requests).
#[derive(Debug, Default)]
struct SegmentHeaders {
    // Contiguous headers; the vector position corresponds to the local segment index
    // (see `push`, which only accepts the next expected header).
    segment_headers: Vec<SegmentHeader>,
    // When the cache was last synced with the node; `None` until the first `write_cache` call.
    last_synced: Option<Instant>,
}
36
37impl SegmentHeaders {
38    /// Push a new segment header to the cache, if it is the next segment header.
39    /// Otherwise, skip the push.
40    fn push(&mut self, archived_segment_header: SegmentHeader) {
41        if self.segment_headers.len()
42            == u64::from(archived_segment_header.local_segment_index()) as usize
43        {
44            self.segment_headers.push(archived_segment_header);
45        }
46    }
47
48    /// Get cached segment headers for the given segment indices.
49    ///
50    /// Returns `None` for segment indices that are not in the cache.
51    fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
52        segment_indices
53            .iter()
54            .map(|segment_index| {
55                self.segment_headers
56                    .get(u64::from(*segment_index) as usize)
57                    .copied()
58            })
59            .collect::<Vec<_>>()
60    }
61
62    /// Get the last `limit` segment headers from the cache.
63    fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
64        self.segment_headers
65            .iter()
66            .rev()
67            .take(limit as usize)
68            .rev()
69            .copied()
70            .map(Some)
71            .collect()
72    }
73
74    /// Get uncached headers from the node, if we're not rate-limited.
75    /// This only requires a read lock.
76    ///
77    /// Returns any extra segment headers if the download succeeds, or an error if it fails.
78    /// The caller must write the returned segment headers to the cache, and reset the sync
79    /// rate-limit timer.
80    async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
81    where
82        NC: NodeClient,
83    {
84        // Skip the sync if we're still within the sync rate limit.
85        if let Some(last_synced) = &self.last_synced
86            && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
87        {
88            return Ok(Vec::new());
89        }
90
91        let mut extra_segment_headers = Vec::new();
92        let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
93        let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);
94
95        'outer: loop {
96            let from = segment_index_offset;
97            let to = segment_index_offset + segment_index_step;
98            trace!(%from, %to, "Requesting segment headers");
99
100            for maybe_segment_header in client
101                .segment_headers((from..to).collect::<Vec<_>>())
102                .await
103                .map_err(|error| {
104                    anyhow::anyhow!(
105                        "Failed to download segment headers {from}..{to} from node: {error}"
106                    )
107                })?
108            {
109                let Some(segment_header) = maybe_segment_header else {
110                    // Reached non-existent segment header
111                    break 'outer;
112                };
113
114                extra_segment_headers.push(segment_header);
115            }
116
117            segment_index_offset += segment_index_step;
118        }
119
120        Ok(extra_segment_headers)
121    }
122
123    /// Write the sync results to the cache, and reset the sync rate-limit timer.
124    fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
125        for segment_header in extra_segment_headers {
126            self.push(segment_header);
127        }
128        self.last_synced.replace(Instant::now());
129    }
130}
131
/// Node client wrapper around another node client that caches some data for better performance and
/// proxies other requests through.
///
/// NOTE: Archived segment acknowledgement is ignored in this client, all subscriptions are
/// acknowledged implicitly and immediately.
/// NOTE: Subscription messages that are not processed in time will be skipped for performance
/// reasons!
#[derive(Debug, Clone)]
pub struct CachingProxyNodeClient<NC> {
    // Underlying node client that non-cached requests are proxied to
    inner: NC,
    // Latest slot info forwarded by the background task (`None` until the first notification)
    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
    // Latest archived segment header forwarded by the background task
    archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
    // Latest block sealing info forwarded by the background task
    block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
    // Shared segment header cache, updated by the background task and on-demand syncs
    segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
    // Last farmer app info together with the instant it was fetched, for request deduplication
    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
    // Keeps the background proxy/maintenance task alive; aborted when the last clone is dropped
    _background_task: Arc<AsyncJoinOnDrop<()>>,
}
149
150impl<NC> CachingProxyNodeClient<NC>
151where
152    NC: NodeClient + Clone,
153{
154    /// Create a new instance
155    pub async fn new(client: NC) -> anyhow::Result<Self> {
156        let mut segment_headers = SegmentHeaders::default();
157        let mut archived_segments_notifications =
158            client.subscribe_archived_segment_headers().await?;
159
160        info!("Downloading all segment headers from node...");
161        // No locking is needed, we are the first and only instance right now.
162        let headers = segment_headers.request_uncached_headers(&client).await?;
163        segment_headers.write_cache(headers);
164        info!("Downloaded all segment headers from node successfully");
165
166        let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
167
168        let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
169        let slot_info_proxy_fut = {
170            let mut slot_info_subscription = client.subscribe_slot_info().await?;
171
172            async move {
173                let mut last_slot_number = None;
174                while let Some(slot_info) = slot_info_subscription.next().await {
175                    if let Some(last_slot_number) = last_slot_number
176                        && last_slot_number >= slot_info.slot
177                    {
178                        continue;
179                    }
180                    last_slot_number.replace(slot_info.slot);
181
182                    if let Err(error) = slot_info_sender.send(Some(slot_info)) {
183                        warn!(%error, "Failed to proxy slot info notification");
184                        return;
185                    }
186                }
187            }
188        };
189
190        let (archived_segment_headers_sender, archived_segment_headers_receiver) =
191            watch::channel(None::<SegmentHeader>);
192        let segment_headers_maintenance_fut = {
193            let client = client.clone();
194            let segment_headers = Arc::clone(&segment_headers);
195
196            async move {
197                let mut last_archived_segment_index = None;
198                while let Some(archived_segment_header) =
199                    archived_segments_notifications.next().await
200                {
201                    let segment_index =
202                        SegmentIndex::from(archived_segment_header.local_segment_index());
203                    trace!(
204                        ?archived_segment_header,
205                        "New archived archived segment header notification"
206                    );
207
208                    while let Err(error) = client
209                        .acknowledge_archived_segment_header(segment_index)
210                        .await
211                    {
212                        warn!(
213                            %error,
214                            "Failed to acknowledge archived segment header, trying again"
215                        );
216                    }
217
218                    if let Some(last_archived_segment_index) = last_archived_segment_index
219                        && last_archived_segment_index >= segment_index
220                    {
221                        continue;
222                    }
223                    last_archived_segment_index.replace(segment_index);
224
225                    segment_headers.write().await.push(archived_segment_header);
226
227                    if let Err(error) =
228                        archived_segment_headers_sender.send(Some(archived_segment_header))
229                    {
230                        warn!(%error, "Failed to proxy archived segment header notification");
231                        return;
232                    }
233                }
234            }
235        };
236
237        let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
238        let block_sealing_proxy_fut = {
239            let mut block_sealing_subscription = client.subscribe_block_sealing().await?;
240
241            async move {
242                while let Some(block_sealing_info) = block_sealing_subscription.next().await {
243                    if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
244                        warn!(%error, "Failed to proxy block sealing notification");
245                        return;
246                    }
247                }
248            }
249        };
250
251        let farmer_app_info = client
252            .farmer_app_info()
253            .await
254            .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
255        let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));
256
257        let background_task = tokio::spawn(async move {
258            select! {
259                _ = slot_info_proxy_fut.fuse() => {},
260                _ = segment_headers_maintenance_fut.fuse() => {},
261                _ = block_sealing_proxy_fut.fuse() => {},
262            }
263        });
264
265        let node_client = Self {
266            inner: client,
267            slot_info_receiver,
268            archived_segment_headers_receiver,
269            block_sealing_receiver,
270            segment_headers,
271            last_farmer_app_info,
272            _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
273        };
274
275        Ok(node_client)
276    }
277}
278
#[async_trait]
impl<NC> NodeClient for CachingProxyNodeClient<NC>
where
    NC: NodeClient,
{
    /// Return cached farmer app info, refreshing it from the node at most once per
    /// `FARMER_APP_INFO_DEDUPLICATION_WINDOW`.
    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
        // The async mutex guard is held across the node request below, so concurrent callers
        // are serialized — this is what deduplicates refresh requests within the window
        let (last_farmer_app_info, last_farmer_app_info_request) =
            &mut *self.last_farmer_app_info.lock().await;

        if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
            let new_last_farmer_app_info = self.inner.farmer_app_info().await?;

            *last_farmer_app_info = new_last_farmer_app_info;
            *last_farmer_app_info_request = Instant::now();
        }

        Ok(last_farmer_app_info.clone())
    }

    /// Subscribe to slot info via the local watch channel (only the latest value is kept;
    /// slow consumers skip intermediate notifications).
    async fn subscribe_slot_info(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
        Ok(Box::pin(
            // The channel starts with `None`; filter it out so subscribers only see real values
            WatchStream::new(self.slot_info_receiver.clone())
                .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
        ))
    }

    /// Proxy solution responses straight to the node.
    async fn submit_solution_response(
        &self,
        solution_response: SolutionResponse,
    ) -> anyhow::Result<()> {
        self.inner.submit_solution_response(solution_response).await
    }

    /// Subscribe to block sealing info via the local watch channel (latest value only).
    async fn subscribe_block_sealing(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
        Ok(Box::pin(
            // Filter out the channel's initial `None` placeholder
            WatchStream::new(self.block_sealing_receiver.clone())
                .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
        ))
    }

    /// Proxy block seals straight to the node.
    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
        self.inner.submit_block_seal(block_seal).await
    }

    /// Subscribe to archived segment headers via the local watch channel (latest value only).
    /// Acknowledgement is handled by the background task, not the subscriber.
    async fn subscribe_archived_segment_headers(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
        Ok(Box::pin(
            // Filter out the channel's initial `None` placeholder
            WatchStream::new(self.archived_segment_headers_receiver.clone())
                .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
        ))
    }

    /// Proxy super segment header requests straight to the node (no local cache yet).
    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
        // TODO: Local caching for super segment headers like for regular segment headers
        self.inner
            .super_segment_headers(super_segment_indices)
            .await
    }

    /// Gets segment headers for the given segment indices, updating the cache from the node if
    /// needed.
    ///
    /// Returns `None` for segment indices that are not in the cache.
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        // Fast path: everything is already cached, only a plain read lock is needed
        let retrieved_segment_headers = self
            .segment_headers
            .read()
            .await
            .get_segment_headers(&segment_indices);

        if retrieved_segment_headers.iter().all(Option::is_some) {
            Ok(retrieved_segment_headers)
        } else {
            // We might be missing a requested segment header.
            // Sync the cache with the node, applying a rate limit, and return cached segment
            // headers.

            // If we took a write lock here, a queue of writers could starve all the readers, even
            // if those writers would be rate-limited. So we take an upgradable read lock for the
            // rate limit check.
            let segment_headers = self.segment_headers.upgradable_read_arc().await;

            // Try again after acquiring the upgradeable read lock, in case another caller already
            // synced the headers.
            let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
            if retrieved_segment_headers.iter().all(Option::is_some) {
                return Ok(retrieved_segment_headers);
            }

            // Try to sync the cache with the node.
            let extra_segment_headers = segment_headers
                .request_uncached_headers(&self.inner)
                .await?;

            if extra_segment_headers.is_empty() {
                // No extra segment headers on the node, or we are rate-limited.
                // So just return what we have in the cache.
                // NOTE(review): in the "no extra headers" case the rate-limit timer is not
                // reset (that only happens in `write_cache` below), so repeated misses for a
                // not-yet-archived segment each hit the node once the previous interval has
                // elapsed — confirm this is the intended rate-limiting behavior
                return Ok(retrieved_segment_headers);
            }

            // We need to update the cached segment headers, so take the write lock.
            let mut segment_headers =
                AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
            segment_headers.write_cache(extra_segment_headers);

            // Downgrade the write lock to a read lock to get the updated segment headers for the
            // query.
            Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
                .get_segment_headers(&segment_indices))
        }
    }

    /// Proxy piece requests straight to the node.
    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        self.inner.piece(piece_index).await
    }

    /// No-op: acknowledgement is performed implicitly by the background task
    /// (see the struct-level NOTE).
    async fn acknowledge_archived_segment_header(
        &self,
        _segment_index: SegmentIndex,
    ) -> anyhow::Result<()> {
        // Not supported
        Ok(())
    }

    /// Proxy shard membership updates straight to the node.
    async fn update_shard_membership_info(
        &self,
        info: FarmerShardMembershipInfo,
    ) -> anyhow::Result<()> {
        self.inner.update_shard_membership_info(info).await
    }
}
421
422#[async_trait]
423impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
424where
425    NC: NodeClientExt,
426{
427    async fn cached_segment_headers(
428        &self,
429        segment_indices: Vec<SegmentIndex>,
430    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
431        // To avoid remote denial of service, we don't update the cache here, because it is called
432        // from network code.
433        Ok(self
434            .segment_headers
435            .read()
436            .await
437            .get_segment_headers(&segment_indices))
438    }
439
440    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
441        Ok(self
442            .segment_headers
443            .read()
444            .await
445            .last_segment_headers(limit))
446    }
447}