1use crate::farm::plotted_pieces::PlottedPieces;
4use crate::farmer_cache::FarmerCaches;
5use crate::node_client::NodeClient;
6use ab_core_primitives::pieces::{Piece, PieceIndex};
7use ab_data_retrieval::piece_getter::PieceGetter;
8use ab_networking::utils::multihash::ToMultihash;
9use ab_networking::utils::piece_provider::{PieceProvider, PieceValidator};
10use async_lock::RwLock as AsyncRwLock;
11use async_trait::async_trait;
12use backon::{ExponentialBuilder, Retryable};
13use futures::channel::mpsc;
14use futures::future::FusedFuture;
15use futures::stream::FuturesUnordered;
16use futures::{FutureExt, Stream, StreamExt, stream};
17use std::fmt;
18use std::hash::Hash;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicU32, Ordering};
21use std::sync::{Arc, Weak};
22use std::task::{Context, Poll};
23use tracing::{debug, error, trace};
24
25pub mod piece_validator;
26
/// Value passed to `PieceProvider::get_piece_from_archival_storage()` on the slow retrieval path.
// NOTE(review): presumably the upper bound on random walk rounds during DSN L1 lookup — confirm
const MAX_RANDOM_WALK_ROUNDS: usize = 15;
28
/// Retry policy applied by `FarmerPieceGetter::get_piece()` to the fast retrieval path (farmer
/// cache, DSN L2 cache and node) before falling back to the slow path.
#[derive(Debug)]
pub struct DsnCacheRetryPolicy {
    /// Number of retries allowed after the initial fast attempt, `0` means the fast path is tried
    /// exactly once
    pub max_retries: u16,
    /// Backoff configuration handed to `backon`'s `retry()` between fast attempts
    // NOTE(review): the builder's own attempt limit presumably also bounds the number of retries —
    // confirm against the configured builder
    pub backoff: ExponentialBuilder,
}
37
/// State shared between all clones of `FarmerPieceGetter` (and observed weakly by
/// `WeakFarmerPieceGetter`)
struct Inner<FarmIndex, PV, NC> {
    /// Used for DSN lookups: L2 cache and L1 archival storage
    piece_provider: PieceProvider<PV>,
    /// Farmer caches: checked first and offered every piece that was obtained from elsewhere
    farmer_caches: FarmerCaches,
    /// Node client that is asked for a piece after both caches missed
    node_client: NC,
    /// Plotted pieces, read on the slow path using non-blocking `try_read()`
    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
    /// Retry policy for the fast path in `get_piece()`
    dsn_cache_retry_policy: DsnCacheRetryPolicy,
}
45
/// Farmer-specific piece getter.
///
/// A single piece is looked up in the farmer cache, DSN L2 cache and node first (fast path), then
/// in the local plot and DSN L1 archival storage (slow path).
///
/// Cloning only clones the inner `Arc`, so all clones share the same state.
pub struct FarmerPieceGetter<FarmIndex, PV, NC> {
    /// Shared state, reference-counted so that weak handles can be created with `downgrade()`
    inner: Arc<Inner<FarmIndex, PV, NC>>,
}
52
53impl<FarmIndex, PV, NC> fmt::Debug for FarmerPieceGetter<FarmIndex, PV, NC> {
54 #[inline]
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 f.debug_struct("FarmerPieceGetter").finish_non_exhaustive()
57 }
58}
59
60impl<FarmIndex, PV, NC> Clone for FarmerPieceGetter<FarmIndex, PV, NC> {
61 #[inline]
62 fn clone(&self) -> Self {
63 Self {
64 inner: Arc::clone(&self.inner),
65 }
66 }
67}
68
69impl<FarmIndex, PV, NC> FarmerPieceGetter<FarmIndex, PV, NC>
70where
71 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
72 usize: From<FarmIndex>,
73 PV: PieceValidator + Send + 'static,
74 NC: NodeClient,
75{
76 pub fn new(
78 piece_provider: PieceProvider<PV>,
79 farmer_caches: FarmerCaches,
80 node_client: NC,
81 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
82 dsn_cache_retry_policy: DsnCacheRetryPolicy,
83 ) -> Self {
84 Self {
85 inner: Arc::new(Inner {
86 piece_provider,
87 farmer_caches,
88 node_client,
89 plotted_pieces,
90 dsn_cache_retry_policy,
91 }),
92 }
93 }
94
95 pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
97 self.get_piece_fast_internal(piece_index).await
98 }
99
100 async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
101 let inner = &self.inner;
102
103 trace!(%piece_index, "Getting piece from farmer cache");
104 if let Some(piece) = inner
105 .farmer_caches
106 .get_piece(piece_index.to_multihash())
107 .await
108 {
109 trace!(%piece_index, "Got piece from farmer cache successfully");
110 return Some(piece);
111 }
112
113 trace!(%piece_index, "Getting piece from DSN L2 cache");
115 if let Some(piece) = inner.piece_provider.get_piece_from_cache(piece_index).await {
116 let added_to_cache = inner
117 .farmer_caches
118 .maybe_store_additional_piece(piece_index, &piece)
119 .await;
120 trace!(%piece_index, %added_to_cache, "Got piece from DSN L2 cache");
121 return Some(piece);
122 }
123
124 trace!(%piece_index, "Getting piece from node");
126 match inner.node_client.piece(piece_index).await {
127 Ok(Some(piece)) => {
128 let added_to_cache = inner
129 .farmer_caches
130 .maybe_store_additional_piece(piece_index, &piece)
131 .await;
132 trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
133 return Some(piece);
134 }
135 Ok(None) => {
136 }
138 Err(error) => {
139 error!(
140 %error,
141 %piece_index,
142 "Failed to retrieve first segment piece from node"
143 );
144 }
145 }
146
147 None
148 }
149
150 pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
152 self.get_piece_slow_internal(piece_index).await
153 }
154
155 async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
157 let inner = &self.inner;
158
159 trace!(%piece_index, "Getting piece from local plot");
160 let maybe_read_piece_fut = inner
161 .plotted_pieces
162 .try_read()
163 .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index));
164
165 if let Some(read_piece_fut) = maybe_read_piece_fut
166 && let Some(piece) = read_piece_fut.await
167 {
168 let added_to_cache = inner
169 .farmer_caches
170 .maybe_store_additional_piece(piece_index, &piece)
171 .await;
172 trace!(%piece_index, %added_to_cache, "Got piece from local plot successfully");
173 return Some(piece);
174 }
175
176 trace!(%piece_index, "Getting piece from DSN L1.");
178
179 let archival_storage_search_result = inner
180 .piece_provider
181 .get_piece_from_archival_storage(piece_index, MAX_RANDOM_WALK_ROUNDS)
182 .await;
183
184 if let Some(piece) = archival_storage_search_result {
185 let added_to_cache = inner
186 .farmer_caches
187 .maybe_store_additional_piece(piece_index, &piece)
188 .await;
189 trace!(%piece_index, %added_to_cache, "DSN L1 lookup succeeded");
190 return Some(piece);
191 }
192
193 None
194 }
195
196 pub fn downgrade(&self) -> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
199 WeakFarmerPieceGetter {
200 inner: Arc::downgrade(&self.inner),
201 }
202 }
203}
204
205#[async_trait]
206impl<FarmIndex, PV, NC> PieceGetter for FarmerPieceGetter<FarmIndex, PV, NC>
207where
208 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
209 usize: From<FarmIndex>,
210 PV: PieceValidator + Send + 'static,
211 NC: NodeClient,
212{
213 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
214 {
215 let retries = AtomicU32::new(0);
216 let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
217 let maybe_piece_fut = (|| async {
218 let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
219
220 if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
221 trace!(%piece_index, current_attempt, "Got piece fast");
222 return Ok(Some(piece));
223 }
224 if current_attempt >= max_retries {
225 if max_retries > 0 {
226 debug!(
227 %piece_index,
228 current_attempt,
229 max_retries,
230 "Couldn't get a piece fast. No retries left"
231 );
232 }
233 return Ok(None);
234 }
235
236 trace!(%piece_index, current_attempt, "Couldn't get a piece fast, retrying...");
237
238 Err("Couldn't get piece fast")
239 })
240 .retry(self.inner.dsn_cache_retry_policy.backoff);
241
242 if let Ok(Some(piece)) = maybe_piece_fut.await {
243 trace!(%piece_index, "Got piece from cache successfully");
244 return Ok(Some(piece));
245 }
246 };
247
248 if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
249 return Ok(Some(piece));
250 }
251
252 debug!(
253 %piece_index,
254 "Cannot acquire piece: all methods yielded empty result"
255 );
256 Ok(None)
257 }
258
259 async fn get_pieces<'a>(
260 &'a self,
261 piece_indices: Vec<PieceIndex>,
262 ) -> anyhow::Result<
263 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
264 > {
265 let (tx, mut rx) = mpsc::unbounded();
266
267 let fut = async move {
268 let tx = &tx;
269
270 let piece_count = piece_indices.len();
271 debug!(%piece_count, "Getting pieces from farmer cache");
272 let mut pieces_not_found_in_farmer_cache = Vec::new();
273 let mut pieces_in_farmer_cache =
274 self.inner.farmer_caches.get_pieces(piece_indices).await;
275
276 while let Some((piece_index, maybe_piece)) = pieces_in_farmer_cache.next().await {
277 let Some(piece) = maybe_piece else {
278 pieces_not_found_in_farmer_cache.push(piece_index);
279 continue;
280 };
281 tx.unbounded_send((piece_index, Ok(Some(piece))))
282 .expect("This future isn't polled after receiver is dropped; qed");
283 }
284
285 if pieces_not_found_in_farmer_cache.is_empty() {
286 return;
287 }
288
289 debug!(
290 remaining_piece_count = %pieces_not_found_in_farmer_cache.len(),
291 %piece_count,
292 "Getting pieces from DSN cache",
293 );
294 let mut pieces_not_found_in_dsn_cache = Vec::new();
295 let mut pieces_in_dsn_cache = self
296 .inner
297 .piece_provider
298 .get_from_cache(pieces_not_found_in_farmer_cache)
299 .await;
300
301 while let Some((piece_index, maybe_piece)) = pieces_in_dsn_cache.next().await {
302 let Some(piece) = maybe_piece else {
303 pieces_not_found_in_dsn_cache.push(piece_index);
304 continue;
305 };
306 let added_to_cache = self
308 .inner
309 .farmer_caches
310 .maybe_store_additional_piece(piece_index, &piece)
311 .await;
312 trace!(%piece_index, %added_to_cache, "Got piece from DSN cache successfully");
313 tx.unbounded_send((piece_index, Ok(Some(piece))))
314 .expect("This future isn't polled after receiver is dropped; qed");
315 }
316
317 if pieces_not_found_in_dsn_cache.is_empty() {
318 return;
319 }
320
321 debug!(
322 remaining_piece_count = %pieces_not_found_in_dsn_cache.len(),
323 %piece_count,
324 "Getting pieces from node",
325 );
326 let pieces_not_found_on_node = pieces_not_found_in_dsn_cache
327 .into_iter()
328 .map(|piece_index| async move {
329 match self.inner.node_client.piece(piece_index).await {
330 Ok(Some(piece)) => {
331 let added_to_cache = self.inner
332 .farmer_caches
333 .maybe_store_additional_piece(piece_index, &piece)
334 .await;
335 trace!(%piece_index, %added_to_cache, "Got piece from node successfully");
336
337 tx.unbounded_send((piece_index, Ok(Some(piece))))
338 .expect("This future isn't polled after receiver is dropped; qed");
339 None
340 }
341 Ok(None) => Some(piece_index),
342 Err(error) => {
343 error!(
344 %error,
345 %piece_index,
346 "Failed to retrieve first segment piece from node"
347 );
348 Some(piece_index)
349 }
350 }
351 })
352 .collect::<FuturesUnordered<_>>()
353 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
354 .collect::<Vec<_>>()
355 .await;
356
357 if pieces_not_found_on_node.is_empty() {
358 return;
359 }
360
361 debug!(
362 remaining_piece_count = %pieces_not_found_on_node.len(),
363 %piece_count,
364 "Some pieces were not easily reachable",
365 );
366 pieces_not_found_on_node
367 .into_iter()
368 .map(|piece_index| async move {
369 let maybe_piece = self.get_piece_slow_internal(piece_index).await;
370
371 tx.unbounded_send((piece_index, Ok(maybe_piece)))
372 .expect("This future isn't polled after receiver is dropped; qed");
373 })
374 .collect::<FuturesUnordered<_>>()
375 .for_each(|()| async {})
377 .await;
378 };
379 let mut fut = Box::pin(fut.fuse());
380
381 Ok(Box::new(stream::poll_fn(move |cx| {
383 if !fut.is_terminated() {
384 let _ = fut.poll_unpin(cx);
386 }
387
388 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
389 return Poll::Ready(maybe_result);
390 }
391
392 Poll::Pending
394 })))
395 }
396}
397
/// Weak counterpart of `FarmerPieceGetter` that doesn't keep the shared state alive.
///
/// Once the shared state is gone, `upgrade()` returns `None` and piece lookups yield no pieces.
pub struct WeakFarmerPieceGetter<FarmIndex, PV, NC> {
    /// Weak reference to the state owned by `FarmerPieceGetter` instances
    inner: Weak<Inner<FarmIndex, PV, NC>>,
}
402
403impl<FarmIndex, PV, NC> fmt::Debug for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
404 #[inline]
405 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
406 f.debug_struct("WeakFarmerPieceGetter")
407 .finish_non_exhaustive()
408 }
409}
410
411impl<FarmIndex, PV, NC> Clone for WeakFarmerPieceGetter<FarmIndex, PV, NC> {
412 #[inline]
413 fn clone(&self) -> Self {
414 Self {
415 inner: self.inner.clone(),
416 }
417 }
418}
419
/// Self-referential wrapper that owns a `FarmerPieceGetter` together with a piece stream that
/// borrows from it, so both can be moved and returned as a single value.
///
/// Used by `WeakFarmerPieceGetter::get_pieces()`, where the upgraded piece getter would otherwise
/// be a local variable that the returned stream can't outlive.
#[ouroboros::self_referencing]
struct StreamWithPieceGetter<FarmIndex, PV, NC>
where
    FarmIndex: 'static,
    PV: 'static,
    NC: 'static,
{
    // Owner of the state that `stream` borrows from
    piece_getter: FarmerPieceGetter<FarmIndex, PV, NC>,
    // Stream of `(piece_index, result)` pairs, `'this` ties it to `piece_getter` above
    #[borrows(piece_getter)]
    #[covariant]
    stream:
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'this>,
}
437
438impl<FarmIndex, PV, NC> Stream for StreamWithPieceGetter<FarmIndex, PV, NC>
439where
440 FarmIndex: 'static,
441 PV: 'static,
442 NC: 'static,
443{
444 type Item = (PieceIndex, anyhow::Result<Option<Piece>>);
445
446 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
447 self.get_mut()
448 .with_stream_mut(|stream| stream.poll_next_unpin(cx))
449 }
450}
451
452#[async_trait]
453impl<FarmIndex, PV, NC> PieceGetter for WeakFarmerPieceGetter<FarmIndex, PV, NC>
454where
455 FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
456 usize: From<FarmIndex>,
457 PV: PieceValidator + Send + 'static,
458 NC: NodeClient,
459{
460 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
461 let Some(piece_getter) = self.upgrade() else {
462 debug!("Farmer piece getter upgrade didn't succeed");
463 return Ok(None);
464 };
465
466 piece_getter.get_piece(piece_index).await
467 }
468
469 async fn get_pieces<'a>(
470 &'a self,
471 piece_indices: Vec<PieceIndex>,
472 ) -> anyhow::Result<
473 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
474 > {
475 let Some(piece_getter) = self.upgrade() else {
476 debug!("Farmer piece getter upgrade didn't succeed");
477 return Ok(Box::new(stream::iter(
478 piece_indices
479 .into_iter()
480 .map(|piece_index| (piece_index, Ok(None))),
481 )));
482 };
483
484 let stream_with_piece_getter =
487 StreamWithPieceGetter::try_new_async_send(piece_getter, move |piece_getter| {
488 piece_getter.get_pieces(piece_indices)
489 })
490 .await?;
491
492 Ok(Box::new(stream_with_piece_getter))
493 }
494}
495
496impl<FarmIndex, PV, NC> WeakFarmerPieceGetter<FarmIndex, PV, NC> {
497 pub fn upgrade(&self) -> Option<FarmerPieceGetter<FarmIndex, PV, NC>> {
499 Some(FarmerPieceGetter {
500 inner: self.inner.upgrade()?,
501 })
502 }
503}