Skip to main content

ab_farmer/
farmer_piece_getter.rs

1//! Farmer-specific piece getter
2
3use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_data_retrieval::piece_getter::PieceGetter;
8use ab_networking::utils::multihash::ToMultihash;
9use ab_networking::utils::piece_provider::{PieceProvider, PieceValidator};
10use async_lock::RwLock as AsyncRwLock;
11use async_trait::async_trait;
12use backon::{ExponentialBuilder, Retryable};
13use futures::channel::mpsc;
14use futures::future::FusedFuture;
15use futures::stream::FuturesUnordered;
16use futures::{FutureExt, Stream, StreamExt, stream};
17use std::fmt;
18use std::hash::Hash;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use tracing::{debug, error, trace};
24
25pub mod piece_validator;
26
/// Upper bound on random-walk rounds passed to
/// `PieceProvider::get_piece_from_archival_storage` when searching DSN L1 (archival storage).
const MAX_RANDOM_WALK_ROUNDS: usize = 15;
28
29/// Retry policy for getting pieces from DSN cache
30#[derive(Debug)]
31pub struct DsnCacheRetryPolicy {
32    /// Max number of retries when trying to get piece from DSN cache
33    pub max_retries: u16,
34    /// Exponential backoff between retries
35    pub backoff: ExponentialBuilder,
36}
37
38struct Inner<FarmIndex, PV, NC> {
39    piece_provider: PieceProvider<PV>,
40    farmer_caches: FarmerCaches,
41    node_client: NC,
42    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
43    dsn_cache_retry_policy: DsnCacheRetryPolicy,
44}
45
46/// Farmer-specific piece getter.
47///
48/// Implements [`PieceGetter`] for plotting purposes, but useful outside of that as well.
49pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
50    inner: Arc<Inner<FarmIndex, PV, NC>>,
51}
52
53impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
54    #[inline]
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
57    }
58}
59
60impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
61    #[inline]
62    fn clone(&self) -> Self {
63        Self {
64            inner: Arc::clone(&self.inner),
65        }
66    }
67}
68
69impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
70where
71    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
72    usize: From<FarmIndex>,
73    PV: PieceValidator + Send + 'static,
74    NC: NodeClient,
75{
76    /// Create a new instance
77    pub fn new(
78        piece_provider: PieceProvider<PV>,
79        farmer_caches: FarmerCaches,
80        node_client: NC,
81        plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
82        dsn_cache_retry_policy: DsnCacheRetryPolicy,
83    ) -> Self {
84        Self {
85            inner: Arc::new(Inner {
86                piece_provider,
87                farmer_caches,
88                node_client,
89                plotted_pieces,
90                dsn_cache_retry_policy,
91            }),
92        }
93    }
94
95    /// Fast way to get piece using various caches
96    pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
97        self.get_piece_fast_internal(piece_index).await
98    }
99
100    async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
101        let inner = &self.inner;
102
103        trace!(%piece_index, "Getting piece from farmer cache");
104        if let Some(piece) = inner
105            .farmer_caches
106            .get_piece(piece_index.to_multihash())
107            .await
108        {
109            trace!(%piece_index, "Got piece from farmer cache successfully");
110            return Some(piece);
111        }
112
113        // L2 piece acquisition
114        trace!(%piece_index, "Getting piece from DSN L2 cache");
115        if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
116            let added_to_cache = inner
117                .farmer_caches
118                .maybe_store_additional_piece(piece_index, &piece)
119                .await;
120            trace!(%piece_index, %added_to_cache, "Got piece from DSN L2 cache");
121            return Some(piece);
122        }
123
124        // Try node's RPC before reaching to L1 (archival storage on DSN)
125        trace!(%piece_index, "Getting piece from node");
126        match inner.node_client.piece(piece_index).await {
127            Ok(Some(piece)) => {
128                let added_to_cache = inner
129                    .farmer_caches
130                    .maybe_store_additional_piece(piece_index, &piece)
131                    .await;
132                trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
133                return Some(piece);
134            }
135            Ok(None) => {
136                // Nothing to do
137            }
138            Err(error) => {
139                error!(
140                    %error,
141                    %piece_index,
142                    "Failed to retrieve first segment piece from node"
143                );
144            }
145        }
146
147        None
148    }
149
150    /// Slow way to get piece using archival storage
151    pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
152        self.get_piece_slow_internal(piece_index).await
153    }
154
155    /// Slow way to get piece using archival storage
156    async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
157        let inner = &self.inner;
158
159        trace!(%piece_index, "Getting piece from local plot");
160        let maybe_read_piece_fut = inner
161            .plotted_pieces
162            .try_read()
163            .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
164
165        if let Some(read_piece_fut) = maybe_read_piece_fut
166            && let Some(piece) = read_piece_fut.await
167        {
168            let added_to_cache = inner
169                .farmer_caches
170                .maybe_store_additional_piece(piece_index, &piece)
171                .await;
172            trace!(%piece_index, %added_to_cache, "Got piece from local plot successfully");
173            return Some(piece);
174        }
175
176        // L1 piece acquisition
177        trace!(%piece_index, "Getting piece from DSN L1.");
178
179        let archival_storage_search_result = inner
180            .piece_provider
181            .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
182            .await;
183
184        if let Some(piece) = archival_storage_search_result {
185            let added_to_cache = inner
186                .farmer_caches
187                .maybe_store_additional_piece(piece_index, &piece)
188                .await;
189            trace!(%piece_index, %added_to_cache, "DSN L1 lookup succeeded");
190            return Some(piece);
191        }
192
193        None
194    }
195
196    /// Downgrade to [`WeakFarmerPieceGetter`] in order to break reference cycles with internally
197    /// used [`Arc`]
198    pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
199        WeakFarmerPieceGetter {
200            inner: Arc::downgrade(&self.inner),
201        }
202    }
203}
204
205#[async_trait]
206impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
207where
208    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
209    usize: From<FarmIndex>,
210    PV: PieceValidator + Send + 'static,
211    NC: NodeClient,
212{
213    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
214        {
215            let retries = AtomicU32::new(0);
216            let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
217            let maybe_piece_fut = (|| async {
218                let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
219
220                if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
221                    trace!(%piece_index, current_attempt, "Got piece fast");
222                    return Ok(Some(piece));
223                }
224                if current_attempt >= max_retries {
225                    if max_retries > 0 {
226                        debug!(
227                            %piece_index,
228                            current_attempt,
229                            max_retries,
230                            "Couldn't get a piece fast. No retries left"
231                        );
232                    }
233                    return Ok(None);
234                }
235
236                trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
237
238                Err("Couldn't get piece fast")
239            })
240            .retry(self.inner.dsn_cache_retry_policy.backoff);
241
242            if let Ok(Some(piece)) = maybe_piece_fut.await {
243                trace!(%piece_index, "Got piece from cache successfully");
244                return Ok(Some(piece));
245            }
246        };
247
248        if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
249            return Ok(Some(piece));
250        }
251
252        debug!(
253            %piece_index,
254            "Cannot acquire piece: all methods yielded empty result"
255        );
256        Ok(None)
257    }
258
259    async fn get_pieces<'a>(
260        &'a self,
261        piece_indices: Vec<PieceIndex>,
262    ) -> anyhow::Result<
263        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
264    > {
265        let (tx, mut rx) = mpsc::unbounded();
266
267        let fut = async move {
268            let tx = &tx;
269
270            let piece_count = piece_indices.len();
271            debug!(%piece_count, "Getting pieces from farmer cache");
272            let mut pieces_not_found_in_farmer_cache = Vec::new();
273            let mut pieces_in_farmer_cache =
274                self.inner.farmer_caches.get_pieces(piece_indices).await;
275
276            while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
277                let Some(piece) = maybe_piece else {
278                    pieces_not_found_in_farmer_cache.push(piece_index);
279                    continue;
280                };
281                tx.unbounded_send((piece_index, Ok(Some(piece))))
282                    .expect("This future isn't polled after receiver is dropped; qed");
283            }
284
285            if pieces_not_found_in_farmer_cache.is_empty() {
286                return;
287            }
288
289            debug!(
290                remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
291                %piece_count,
292                "Getting pieces from DSN cache",
293            );
294            let mut pieces_not_found_in_dsn_cache = Vec::new();
295            let mut pieces_in_dsn_cache = self
296                .inner
297                .piece_provider
298                .get_from_cache(pieces_not_found_in_farmer_cache)
299                .await;
300
301            while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
302                let Some(piece) = maybe_piece else {
303                    pieces_not_found_in_dsn_cache.push(piece_index);
304                    continue;
305                };
306                // TODO: Would be nice to have concurrency here
307                let added_to_cache = self
308                    .inner
309                    .farmer_caches
310                    .maybe_store_additional_piece(piece_index, &piece)
311                    .await;
312                trace!(%piece_index, %added_to_cache, "Got piece from DSN cache successfully");
313                tx.unbounded_send((piece_index, Ok(Some(piece))))
314                    .expect("This future isn't polled after receiver is dropped; qed");
315            }
316
317            if pieces_not_found_in_dsn_cache.is_empty() {
318                return;
319            }
320
321            debug!(
322                remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
323                %piece_count,
324                "Getting pieces from node",
325            );
326            let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
327                .into_iter()
328                .map(|piece_index| async move {
329                    match self.inner.node_client.piece(piece_index).await {
330                        Ok(Some(piece)) => {
331                            let added_to_cache = self.inner
332                                .farmer_caches
333                                .maybe_store_additional_piece(piece_index, &piece)
334                                .await;
335                            trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
336
337                            tx.unbounded_send((piece_index, Ok(Some(piece))))
338                                .expect("This future isn't polled after receiver is dropped; qed");
339                            None
340                        }
341                        Ok(None) => Some(piece_index),
342                        Err(error) => {
343                            error!(
344                                %error,
345                                %piece_index,
346                                "Failed to retrieve first segment piece from node"
347                            );
348                            Some(piece_index)
349                        }
350                    }
351                })
352                .collect::<FuturesUnordered<_>>()
353                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
354                .collect::<Vec<_>>()
355                .await;
356
357            if pieces_not_found_on_node.is_empty() {
358                return;
359            }
360
361            debug!(
362                remaining_piece_count = %pieces_not_found_on_node.len(),
363                %piece_count,
364                "Some pieces were not easily reachable",
365            );
366            pieces_not_found_on_node
367                .into_iter()
368                .map(|piece_index| async move {
369                    let maybe_piece = self.get_piece_slow_internal(piece_index).await;
370
371                    tx.unbounded_send((piece_index, Ok(maybe_piece)))
372                        .expect("This future isn't polled after receiver is dropped; qed");
373                })
374                .collect::<FuturesUnordered<_>>()
375                // Simply drain everything
376                .for_each(|()| async {})
377                .await;
378        };
379        let mut fut = Box::pin(fut.fuse());
380
381        // Drive above future and stream back any pieces that were downloaded so far
382        Ok(Box::new(stream::poll_fn(move |cx| {
383            if !fut.is_terminated() {
384                // Result doesn't matter, we'll need to poll stream below anyway
385                let _ = fut.poll_unpin(cx);
386            }
387
388            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
389                return Poll::Ready(maybe_result);
390            }
391
392            // Exit will be done by the stream above
393            Poll::Pending
394        })))
395    }
396}
397
398/// Weak farmer piece getter, can be upgraded to [`FarmerPieceGetter`]
399pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
400    inner: Weak<Inner<FarmIndex, PV, NC>>,
401}
402
403impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
404    #[inline]
405    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406        f.debug_struct("WeakFarmerPieceGetter")
407            .finish_non_exhaustive()
408    }
409}
410
411impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
412    #[inline]
413    fn clone(&self) -> Self {
414        Self {
415            inner: self.inner.clone(),
416        }
417    }
418}
419
/// Owns an upgraded [`FarmerPieceGetter`] together with the stream that borrows from it.
///
/// [`PieceGetter::get_pieces`] returns a stream that borrows the piece getter. In
/// `WeakFarmerPieceGetter::get_pieces`, however, the getter is a local obtained from `upgrade()`
/// and would be dropped when that method returns. This wrapper creates a self-referential data
/// structure so both can be moved out together, while still implementing the `Stream` trait as
/// necessary.
#[ouroboros::self_referencing]
struct StreamWithPieceGetter<FarmIndex, PV, NC>
where
    FarmIndex: 'static,
    PV: 'static,
    NC: 'static,
{
    // Strong getter; keeps the shared state alive for as long as `stream` exists
    piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
    // Stream produced from `piece_getter` (see `WeakFarmerPieceGetter::get_pieces`), borrowing it
    #[borrows(piece_getter)]
    #[covariant]
    stream:
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
}
437
438impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
439where
440    FarmIndex: 'static,
441    PV: 'static,
442    NC: 'static,
443{
444    type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
445
446    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
447        self.get_mut()
448            .with_stream_mut(|stream| stream.poll_next_unpin(cx))
449    }
450}
451
452#[async_trait]
453impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
454where
455    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
456    usize: From<FarmIndex>,
457    PV: PieceValidator + Send + 'static,
458    NC: NodeClient,
459{
460    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
461        let Some(piece_getter) = self.upgrade() else {
462            debug!("Farmer piece getter upgrade didn't succeed");
463            return Ok(None);
464        };
465
466        piece_getter.get_piece(piece_index).await
467    }
468
469    async fn get_pieces<'a>(
470        &'a self,
471        piece_indices: Vec<PieceIndex>,
472    ) -> anyhow::Result<
473        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
474    > {
475        let Some(piece_getter) = self.upgrade() else {
476            debug!("Farmer piece getter upgrade didn't succeed");
477            return Ok(Box::new(stream::iter(
478                piece_indices
479                    .into_iter()
480                    .map(|piece_index| (piece_index, Ok(None))),
481            )));
482        };
483
484        // TODO: This is necessary due to more complex lifetimes not yet supported by ouroboros, see
485        //  https://github.com/someguynamedjosh/ouroboros/issues/112
486        let stream_with_piece_getter =
487            StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
488                piece_getter.get_pieces(piece_indices)
489            })
490            .await?;
491
492        Ok(Box::new(stream_with_piece_getter))
493    }
494}
495
496impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
497    /// Try to upgrade to [`FarmerPieceGetter`] if there is at least one other instance of it alive
498    pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
499        Some(FarmerPieceGetter {
500            inner: self.inner.upgrade()?,
501        })
502    }
503}