ab_farmer/node_client/caching_proxy_node_client.rs

//! Node client wrapper around another node client that caches some data for better performance and
//! proxies other requests through.

use crate::node_client::{NodeClient, NodeClientExt};
use crate::utils::AsyncJoinOnDrop;
use ab_core_primitives::pieces::{Piece, PieceIndex};
use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
use ab_farmer_rpc_primitives::{
    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
    MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo, SolutionResponse,
};
use async_lock::{
    Mutex as AsyncMutex, RwLock as AsyncRwLock,
    RwLockUpgradableReadGuardArc as AsyncRwLockUpgradableReadGuard,
    RwLockWriteGuardArc as AsyncRwLockWriteGuard,
};
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt, select};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{info, trace, warn};

/// Minimum time between segment header cache syncs from the node.
const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1);
/// How long a cached `FarmerAppInfo` response is reused before the node is queried again.
const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);

#[derive(Debug, Default)]
struct SegmentHeaders {
    segment_headers: Vec<SegmentHeader>,
    last_synced: Option<Instant>,
}

impl SegmentHeaders {
    /// Push a new segment header to the cache, but only if it is the next expected header.
    /// Duplicate or out-of-order headers are silently skipped.
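    /// This keeps the cache dense, so the header for segment `i` is always stored at index `i`.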
    fn push(&mut self, archived_segment_header: SegmentHeader) {
        if self.segment_headers.len()
            == u64::from(archived_segment_header.local_segment_index()) as usize
        {
            self.segment_headers.push(archived_segment_header);
        }
    }

    /// Get cached segment headers for the given segment indices.
    ///
    /// Returns `None` for segment indices that are not in the cache.
    fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
        segment_indices
            .iter()
            .map(|segment_index| {
                self.segment_headers
                    .get(u64::from(*segment_index) as usize)
                    .copied()
            })
            .collect::<Vec<_>>()
    }

    /// Get the last `limit` segment headers from the cache.
    fn last_segment_headers(&self, limit: u32) -> Vec<Option<SegmentHeader>> {
        self.segment_headers
            .iter()
            .rev()
            .take(limit as usize)
            .rev()
            .copied()
            .map(Some)
            .collect()
    }

    /// Get uncached headers from the node, if we're not rate-limited.
    /// This only requires a read lock.
    ///
    /// Returns any extra segment headers if the download succeeds, or an error if it fails.
    /// The caller must write the returned segment headers to the cache, and reset the sync
    /// rate-limit timer.
    async fn request_uncached_headers<NC>(&self, client: &NC) -> anyhow::Result<Vec<SegmentHeader>>
    where
        NC: NodeClient,
    {
        // Skip the sync if we're still within the sync rate limit.
        if let Some(last_synced) = &self.last_synced
            && last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL
        {
            return Ok(Vec::new());
        }

        let mut extra_segment_headers = Vec::new();
        let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64);
        let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);

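        // Page through headers in batches of `MAX_SEGMENT_HEADERS_PER_REQUEST`, starting at the
        // first index missing from the cache, until the node returns a non-existent header.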
        'outer: loop {
            let from = segment_index_offset;
            let to = segment_index_offset + segment_index_step;
            trace!(%from, %to, "Requesting segment headers");

            for maybe_segment_header in client
                .segment_headers((from..to).collect::<Vec<_>>())
                .await
                .map_err(|error| {
                    anyhow::anyhow!(
                        "Failed to download segment headers {from}..{to} from node: {error}"
                    )
                })?
            {
                let Some(segment_header) = maybe_segment_header else {
                    // Reached non-existent segment header
                    break 'outer;
                };

                extra_segment_headers.push(segment_header);
            }

            segment_index_offset += segment_index_step;
        }

        Ok(extra_segment_headers)
    }

    /// Write the sync results to the cache, and reset the sync rate-limit timer.
    fn write_cache(&mut self, extra_segment_headers: Vec<SegmentHeader>) {
        for segment_header in extra_segment_headers {
            self.push(segment_header);
        }
        self.last_synced.replace(Instant::now());
    }
}

/// Node client wrapper around another node client that caches some data for better performance and
/// proxies other requests through.
///
/// NOTE: Archived segment acknowledgement is ignored in this client; all subscriptions are
/// acknowledged implicitly and immediately.
/// NOTE: Subscription messages that are not processed in time will be skipped for performance
/// reasons!
#[derive(Debug, Clone)]
pub struct CachingProxyNodeClient<NC> {
    inner: NC,
    slot_info_receiver: watch::Receiver<Option<SlotInfo>>,
    archived_segment_headers_receiver: watch::Receiver<Option<SegmentHeader>>,
    block_sealing_receiver: watch::Receiver<Option<BlockSealInfo>>,
    segment_headers: Arc<AsyncRwLock<SegmentHeaders>>,
    last_farmer_app_info: Arc<AsyncMutex<(FarmerAppInfo, Instant)>>,
    _background_task: Arc<AsyncJoinOnDrop<()>>,
}

impl<NC> CachingProxyNodeClient<NC>
where
    NC: NodeClient + Clone,
{
    /// Create a new instance, downloading all current segment headers from the node.
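    ///
    /// # Example
    ///
    /// A minimal usage sketch; `connect_to_node()` is a hypothetical stand-in for however an
    /// inner [`NodeClient`] implementation is obtained:
    ///
    /// ```ignore
    /// let inner = connect_to_node().await?;
    /// let node_client = CachingProxyNodeClient::new(inner).await?;
    /// // Served from the local cache, which is synced from the node at most once per second:
    /// let headers = node_client.segment_headers(vec![SegmentIndex::from(0_u64)]).await?;
    /// // Deduplicated: repeated calls within a one-second window reuse the cached response.
    /// let info = node_client.farmer_app_info().await?;
    /// ```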
    pub async fn new(client: NC) -> anyhow::Result<Self> {
        let mut segment_headers = SegmentHeaders::default();
        let mut archived_segments_notifications =
            client.subscribe_archived_segment_headers().await?;

        info!("Downloading all segment headers from node...");
        // No locking is needed: we are the first and only instance right now.
        let headers = segment_headers.request_uncached_headers(&client).await?;
        segment_headers.write_cache(headers);
        info!("Downloaded all segment headers from node successfully");

        let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));

        let (slot_info_sender, slot_info_receiver) = watch::channel(None::<SlotInfo>);
        let slot_info_proxy_fut = {
            let mut slot_info_subscription = client.subscribe_slot_info().await?;

            async move {
                let mut last_slot_number = None;
                while let Some(slot_info) = slot_info_subscription.next().await {
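                    // Skip duplicate or out-of-order slots, so subscribers only ever see
                    // increasing slot numbers.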
                    if let Some(last_slot_number) = last_slot_number
                        && last_slot_number >= slot_info.slot
                    {
                        continue;
                    }
                    last_slot_number.replace(slot_info.slot);

                    if let Err(error) = slot_info_sender.send(Some(slot_info)) {
                        warn!(%error, "Failed to proxy slot info notification");
                        return;
                    }
                }
            }
        };

        let (archived_segment_headers_sender, archived_segment_headers_receiver) =
            watch::channel(None::<SegmentHeader>);
        let segment_headers_maintenance_fut = {
            let client = client.clone();
            let segment_headers = Arc::clone(&segment_headers);

            async move {
                let mut last_archived_segment_index = None;
                while let Some(archived_segment_header) =
                    archived_segments_notifications.next().await
                {
                    let segment_index =
                        SegmentIndex::from(archived_segment_header.local_segment_index());
                    trace!(
                        ?archived_segment_header,
                        "New archived segment header notification"
                    );

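                    // Keep retrying until the node acknowledges the segment header; subscribers
                    // of this client are acknowledged implicitly (see the NOTE on the struct).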
                    while let Err(error) = client
                        .acknowledge_archived_segment_header(segment_index)
                        .await
                    {
                        warn!(
                            %error,
                            "Failed to acknowledge archived segment header, trying again"
                        );
                    }

                    if let Some(last_archived_segment_index) = last_archived_segment_index
                        && last_archived_segment_index >= segment_index
                    {
                        continue;
                    }
                    last_archived_segment_index.replace(segment_index);

                    segment_headers.write().await.push(archived_segment_header);

                    if let Err(error) =
                        archived_segment_headers_sender.send(Some(archived_segment_header))
                    {
                        warn!(%error, "Failed to proxy archived segment header notification");
                        return;
                    }
                }
            }
        };

        let (block_sealing_sender, block_sealing_receiver) = watch::channel(None::<BlockSealInfo>);
        let block_sealing_proxy_fut = {
            let mut block_sealing_subscription = client.subscribe_block_sealing().await?;

            async move {
                while let Some(block_sealing_info) = block_sealing_subscription.next().await {
                    if let Err(error) = block_sealing_sender.send(Some(block_sealing_info)) {
                        warn!(%error, "Failed to proxy block sealing notification");
                        return;
                    }
                }
            }
        };

        let farmer_app_info = client
            .farmer_app_info()
            .await
            .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?;
        let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now())));

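        // Run all three proxy futures in a single background task; if any of them exits (e.g. a
        // node subscription ends), the whole task ends.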
        let background_task = tokio::spawn(async move {
            select! {
                _ = slot_info_proxy_fut.fuse() => {},
                _ = segment_headers_maintenance_fut.fuse() => {},
                _ = block_sealing_proxy_fut.fuse() => {},
            }
        });

        let node_client = Self {
            inner: client,
            slot_info_receiver,
            archived_segment_headers_receiver,
            block_sealing_receiver,
            segment_headers,
            last_farmer_app_info,
            _background_task: Arc::new(AsyncJoinOnDrop::new(background_task, true)),
        };

        Ok(node_client)
    }
}

#[async_trait]
impl<NC> NodeClient for CachingProxyNodeClient<NC>
where
    NC: NodeClient,
{
    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
        let (last_farmer_app_info, last_farmer_app_info_request) =
            &mut *self.last_farmer_app_info.lock().await;

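        // Holding the mutex across the request also serializes concurrent callers, so at most
        // one request per deduplication window reaches the node.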
        if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
            let new_last_farmer_app_info = self.inner.farmer_app_info().await?;

            *last_farmer_app_info = new_last_farmer_app_info;
            *last_farmer_app_info_request = Instant::now();
        }

        Ok(last_farmer_app_info.clone())
    }

    async fn subscribe_slot_info(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
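        // A `watch` channel only keeps the most recent value, so slow subscribers skip slots
        // instead of building a backlog. `filter_map` drops the initial `None` placeholder.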
        Ok(Box::pin(
            WatchStream::new(self.slot_info_receiver.clone())
                .filter_map(|maybe_slot_info| async move { maybe_slot_info }),
        ))
    }

    async fn submit_solution_response(
        &self,
        solution_response: SolutionResponse,
    ) -> anyhow::Result<()> {
        self.inner.submit_solution_response(solution_response).await
    }

    async fn subscribe_block_sealing(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
        Ok(Box::pin(
            WatchStream::new(self.block_sealing_receiver.clone())
                .filter_map(|maybe_block_sealing_info| async move { maybe_block_sealing_info }),
        ))
    }

    /// Submit a block seal
    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
        self.inner.submit_block_seal(block_seal).await
    }

    async fn subscribe_archived_segment_headers(
        &self,
    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
        Ok(Box::pin(
            WatchStream::new(self.archived_segment_headers_receiver.clone())
                .filter_map(|maybe_segment_header| async move { maybe_segment_header }),
        ))
    }

    /// Get segment headers for the given segment indices, updating the cache from the node if
    /// needed.
    ///
    /// Returns `None` for segment indices that are not in the cache.
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        let retrieved_segment_headers = self
            .segment_headers
            .read()
            .await
            .get_segment_headers(&segment_indices);

        if retrieved_segment_headers.iter().all(Option::is_some) {
            Ok(retrieved_segment_headers)
        } else {
            // We might be missing a requested segment header.
            // Sync the cache with the node, applying a rate limit, and return cached segment
            // headers.

            // If we took a write lock here, a queue of writers could starve all the readers, even
            // if those writers would be rate-limited. So we take an upgradable read lock for the
            // rate limit check.
            let segment_headers = self.segment_headers.upgradable_read_arc().await;

            // Try again after acquiring the upgradable read lock, in case another caller already
            // synced the headers.
            let retrieved_segment_headers = segment_headers.get_segment_headers(&segment_indices);
            if retrieved_segment_headers.iter().all(Option::is_some) {
                return Ok(retrieved_segment_headers);
            }

            // Try to sync the cache with the node.
            let extra_segment_headers = segment_headers
                .request_uncached_headers(&self.inner)
                .await?;

            if extra_segment_headers.is_empty() {
                // No extra segment headers on the node, or we are rate-limited.
                // So just return what we have in the cache.
                return Ok(retrieved_segment_headers);
            }

            // We need to update the cached segment headers, so take the write lock.
            let mut segment_headers =
                AsyncRwLockUpgradableReadGuard::upgrade(segment_headers).await;
            segment_headers.write_cache(extra_segment_headers);

            // Downgrade the write lock to a read lock to get the updated segment headers for the
            // query.
            Ok(AsyncRwLockWriteGuard::downgrade(segment_headers)
                .get_segment_headers(&segment_indices))
        }
    }

    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        self.inner.piece(piece_index).await
    }

    async fn acknowledge_archived_segment_header(
        &self,
        _segment_index: SegmentIndex,
    ) -> anyhow::Result<()> {
        // Not supported: this client acknowledges segment headers to the node itself, so
        // subscriber acknowledgements are ignored (see the NOTE on the struct).
        Ok(())
    }

    async fn update_shard_membership_info(
        &self,
        info: FarmerShardMembershipInfo,
    ) -> anyhow::Result<()> {
        self.inner.update_shard_membership_info(info).await
    }
}

#[async_trait]
impl<NC> NodeClientExt for CachingProxyNodeClient<NC>
where
    NC: NodeClientExt,
{
    async fn cached_segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        // This method is called from network code, so we don't update the cache here, to avoid
        // a remote denial of service.
        Ok(self
            .segment_headers
            .read()
            .await
            .get_segment_headers(&segment_indices))
    }

    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
        Ok(self
            .segment_headers
            .read()
            .await
            .last_segment_headers(limit))
    }
}