ab_farmer/
farmer_cache.rs

1//! A container that caches pieces
2//!
3//! Farmer cache is a container that orchestrates a number of piece and plot caches that together
4//! persist pieces in a way that makes them easier to retrieve than decoding pieces from plots.
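//!
//! A minimal wiring sketch (illustrative only, assuming `node_client`, `peer_id` and
//! `piece_getter` are constructed elsewhere and satisfy the `NodeClient`/`PieceGetter` bounds):
//!
//! ```ignore
//! let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client, peer_id, None);
//! // The worker drives the cache and must be polled, e.g. in a background task
//! tokio::spawn(farmer_cache_worker.run(piece_getter));
//! ```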
5
6mod metrics;
7mod piece_cache_state;
8// TODO: Not supported under Miri: https://github.com/rust-lang/miri/issues/4464
9#[cfg(not(miri))]
10#[cfg(test)]
11mod tests;
12
13use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
14use crate::farmer_cache::metrics::FarmerCacheMetrics;
15use crate::farmer_cache::piece_cache_state::PieceCachesState;
16use crate::node_client::NodeClient;
17use crate::utils::run_future_in_dedicated_thread;
18use ab_core_primitives::pieces::{Piece, PieceIndex};
19use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_networking::KeyWithDistance;
22use ab_networking::libp2p::PeerId;
23use ab_networking::libp2p::kad::RecordKey;
24use ab_networking::utils::multihash::ToMultihash;
25use async_lock::RwLock as AsyncRwLock;
26use bytesize::ByteSize;
27use event_listener_primitives::{Bag, HandlerId};
28use futures::channel::mpsc;
29use futures::future::{Either, FusedFuture};
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
32use parking_lot::{Mutex, RwLock};
33use prometheus_client::registry::Registry;
34use rand::prelude::*;
35use rayon::prelude::*;
36use std::collections::hash_map::Entry;
37use std::collections::{HashMap, HashSet};
38use std::future::join;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::task::Poll;
42use std::time::Duration;
43use std::{fmt, mem};
44use tokio::sync::Semaphore;
45use tokio::task::yield_now;
46use tracing::{Instrument, debug, error, info, info_span, trace, warn};
47
48const WORKER_CHANNEL_CAPACITY: usize = 100;
49const SYNC_BATCH_SIZE: usize = 256;
50const SYNC_CONCURRENT_BATCHES: usize = 4;
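// Note: during initial sync, a semaphore sized `SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES`
// permits (1024 with the values above) roughly limits how many pieces may be in flight at once,
// see `FarmerCacheWorker::initialize` below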
51/// Make caches available while they are being built, without waiting for initialization to
52/// finish; this number defines the interval, in pieces, at which the cache state is updated
53const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
54const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
55
56type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
57type Handler<A> = Bag<HandlerFn<A>, A>;
58type CacheIndex = u8;
59
60#[derive(Default, Debug)]
61struct Handlers {
62    progress: Handler<f32>,
63}
64
65#[derive(Debug, Clone, Copy)]
66struct FarmerCacheOffset {
67    cache_index: CacheIndex,
68    piece_offset: PieceCacheOffset,
69}
70
71impl FarmerCacheOffset {
72    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
73        Self {
74            cache_index,
75            piece_offset,
76        }
77    }
78}
79
80#[derive(Debug, Clone)]
81struct CacheBackend {
82    backend: Arc<dyn PieceCache>,
83    used_capacity: u32,
84    total_capacity: u32,
85}
86
87impl std::ops::Deref for CacheBackend {
88    type Target = Arc<dyn PieceCache>;
89
90    fn deref(&self) -> &Self::Target {
91        &self.backend
92    }
93}
94
95impl CacheBackend {
96    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
97        Self {
98            backend,
99            used_capacity: 0,
100            total_capacity,
101        }
102    }
103
104    fn next_free(&mut self) -> Option<PieceCacheOffset> {
105        let offset = self.used_capacity;
106        if offset < self.total_capacity {
107            self.used_capacity += 1;
108            Some(PieceCacheOffset(offset))
109        } else {
110            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
111            None
112        }
113    }
114
115    fn free_size(&self) -> u32 {
116        self.total_capacity - self.used_capacity
117    }
118}
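
// For illustration, a `CacheBackend` hands out offsets sequentially until its capacity is
// exhausted (a sketch, assuming `piece_cache` is some `PieceCache` implementation with room for
// two pieces):
//
//     let mut backend = CacheBackend::new(Arc::new(piece_cache), 2);
//     assert!(backend.next_free().is_some()); // PieceCacheOffset(0)
//     assert!(backend.next_free().is_some()); // PieceCacheOffset(1)
//     assert!(backend.next_free().is_none()); // full, logs "No free space in cache backend"
//     assert_eq!(backend.free_size(), 0);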
119
120#[derive(Debug)]
121struct CacheState {
122    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
123    cache_free_offsets: Vec<FarmerCacheOffset>,
124    backend: CacheBackend,
125}
126
127#[derive(Debug)]
128enum WorkerCommand {
129    ReplaceBackingCaches {
130        new_piece_caches: Vec<Arc<dyn PieceCache>>,
131    },
132    ForgetKey {
133        key: RecordKey,
134    },
135}
136
137/// Farmer cache worker used to drive the farmer cache backend
138#[derive(Debug)]
139#[must_use = "Farmer cache will not work unless its worker is running"]
140pub struct FarmerCacheWorker<NC>
141where
142    NC: fmt::Debug,
143{
144    peer_id: PeerId,
145    node_client: NC,
146    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
147    plot_caches: Arc<PlotCaches>,
148    handlers: Arc<Handlers>,
149    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
150    metrics: Option<Arc<FarmerCacheMetrics>>,
151}
152
153impl<NC> FarmerCacheWorker<NC>
154where
155    NC: NodeClient,
156{
157    /// Run the cache worker with provided piece getter.
158    ///
159    /// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
160    pub async fn run<PG>(mut self, piece_getter: PG)
161    where
162        PG: PieceGetter,
163    {
164        // Limit is dynamically set later
165        let mut last_segment_index_internal = SegmentIndex::ZERO;
166
167        let mut worker_receiver = self
168            .worker_receiver
169            .take()
170            .expect("Always set during worker instantiation");
171
172        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
173            worker_receiver.next().await
174        {
175            self.initialize(
176                &piece_getter,
177                &mut last_segment_index_internal,
178                new_piece_caches,
179            )
180            .await;
181        } else {
182            // Piece cache was dropped before the backing caches were sent
183            return;
184        }
185
186        let mut segment_headers_notifications =
187            match self.node_client.subscribe_archived_segment_headers().await {
188                Ok(segment_headers_notifications) => segment_headers_notifications,
189                Err(error) => {
190                    error!(%error, "Failed to subscribe to archived segments notifications");
191                    return;
192                }
193            };
194
195        // Keep up with segment indices that were potentially created since reinitialization;
196        // depending on the size of the diff this may pause block production for a while (due to
197        // the subscription we have created above)
198        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
199            .await;
200
201        loop {
202            select! {
203                maybe_command = worker_receiver.next() => {
204                    let Some(command) = maybe_command else {
205                        // Nothing else left to do
206                        return;
207                    };
208
209                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
210                }
211                maybe_segment_header = segment_headers_notifications.next().fuse() => {
212                    if let Some(segment_header) = maybe_segment_header {
213                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
214                    } else {
215                        // Keep-up sync only ends with the subscription, which lasts for the
216                        // duration of the instance
217                        return;
218                    }
219                }
220            }
221        }
222    }
223
224    async fn handle_command<PG>(
225        &self,
226        command: WorkerCommand,
227        piece_getter: &PG,
228        last_segment_index_internal: &mut SegmentIndex,
229    ) where
230        PG: PieceGetter,
231    {
232        match command {
233            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
234                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
235                    .await;
236            }
237            // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
238            WorkerCommand::ForgetKey { key } => {
239                let mut caches = self.piece_caches.write().await;
240                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
241                let Some(offset) = caches.remove_stored_piece(&key) else {
242                    // Key doesn't exist
243                    return;
244                };
245
246                let cache_index = offset.cache_index;
247                let piece_offset = offset.piece_offset;
248                let Some(backend) = caches.get_backend(cache_index).cloned() else {
249                    // Cache backend doesn't exist
250                    return;
251                };
252
253                caches.push_dangling_free_offset(offset);
254                match backend.read_piece_index(piece_offset).await {
255                    Ok(Some(piece_index)) => {
256                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
257                    }
258                    Ok(None) => {
259                        warn!(
260                            %cache_index,
261                            %piece_offset,
262                            "Piece index out of range, this is likely an implementation bug, \
263                            not freeing heap element"
264                        );
265                    }
266                    Err(error) => {
267                        error!(
268                            %error,
269                            %cache_index,
270                            ?key,
271                            %piece_offset,
272                            "Error while reading piece from cache"
273                        );
274                    }
275                }
276            }
277        }
278    }
279
280    async fn initialize<PG>(
281        &self,
282        piece_getter: &PG,
283        last_segment_index_internal: &mut SegmentIndex,
284        new_piece_caches: Vec<Arc<dyn PieceCache>>,
285    ) where
286        PG: PieceGetter,
287    {
288        info!("Initializing piece cache");
289
290        // Pull old cache state since it will be replaced with a new one and reuse its allocations
291        let (mut stored_pieces, mut dangling_free_offsets) =
292            mem::take(&mut *self.piece_caches.write().await).reuse();
293
294        debug!("Collecting pieces that were in the cache before");
295
296        if let Some(metrics) = &self.metrics {
297            metrics.piece_cache_capacity_total.set(0);
298            metrics.piece_cache_capacity_used.set(0);
299        }
300
301        let peer_id = self.peer_id;
302
303        // Build cache state of all backends
304        let piece_caches_number = new_piece_caches.len();
305        let maybe_caches_futures = new_piece_caches
306            .into_iter()
307            .enumerate()
308            .filter_map(|(cache_index, new_cache)| {
309                let total_capacity = new_cache.max_num_elements();
310                let mut backend = CacheBackend::new(new_cache, total_capacity);
311                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
312                    warn!(
313                        ?piece_caches_number,
314                        "Too many piece caches provided, cache {cache_index} will be ignored",
315                    );
316                    return None;
317                };
318
319                if let Some(metrics) = &self.metrics {
320                    metrics
321                        .piece_cache_capacity_total
322                        .inc_by(total_capacity as i64);
323                }
324
325                let init_fut = async move {
326                    let used_capacity = &mut backend.used_capacity;
327
328                    // The hack of first collecting into an `Option` and calling
329                    // `Option::take()` later is needed to satisfy the compiler, which
330                    // otherwise gets confused about ownership
331                    let mut maybe_contents = match backend.backend.contents().await {
332                        Ok(contents) => Some(contents),
333                        Err(error) => {
334                            warn!(%error, "Failed to get cache contents");
335
336                            None
337                        }
338                    };
339
340                    #[allow(clippy::mutable_key_type)]
341                    let mut cache_stored_pieces = HashMap::new();
342                    let mut cache_free_offsets = Vec::new();
343
344                    let Some(mut contents) = maybe_contents.take() else {
345                        drop(maybe_contents);
346
347                        return CacheState {
348                            cache_stored_pieces,
349                            cache_free_offsets,
350                            backend,
351                        };
352                    };
353
354                    while let Some(maybe_element_details) = contents.next().await {
355                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
356                            Ok(element_details) => element_details,
357                            Err(error) => {
358                                warn!(%error, "Failed to get cache contents element details");
359                                break;
360                            }
361                        };
362                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
363                        match maybe_piece_index {
364                            Some(piece_index) => {
365                                *used_capacity = piece_offset.0 + 1;
366                                let record_key = RecordKey::from(piece_index.to_multihash());
367                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
368                                cache_stored_pieces.insert(key, offset);
369                            }
370                            None => {
371                                // TODO: Optimize to not store all free offsets, only dangling
372                                //  offsets are actually necessary
373                                cache_free_offsets.push(offset);
374                            }
375                        }
376
377                        // Allow for task to be aborted
378                        yield_now().await;
379                    }
380
381                    drop(maybe_contents);
382                    drop(contents);
383
384                    CacheState {
385                        cache_stored_pieces,
386                        cache_free_offsets,
387                        backend,
388                    }
389                };
390
391                Some(run_future_in_dedicated_thread(
392                    move || init_fut.instrument(info_span!("", %cache_index)),
393                    format!("piece-cache.{cache_index}"),
394                ))
395            })
396            .collect::<Result<Vec<_>, _>>();
397
398        let caches_futures = match maybe_caches_futures {
399            Ok(caches_futures) => caches_futures,
400            Err(error) => {
401                error!(%error, "Failed to spawn piece cache reading thread");
402
403                return;
404            }
405        };
406
407        let mut backends = Vec::with_capacity(caches_futures.len());
408        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
409
410        while let Some(maybe_cache) = caches_futures.next().await {
411            match maybe_cache {
412                Ok(cache) => {
413                    let backend = cache.backend;
414                    for (key, cache_offset) in cache.cache_stored_pieces {
415                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
416                            dangling_free_offsets.push_front(old_cache_offset);
417                        }
418                    }
419                    dangling_free_offsets.extend(
420                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
421                            free_offset.piece_offset.0 < backend.used_capacity
422                        }),
423                    );
424                    backends.push(backend);
425                }
426                Err(_cancelled) => {
427                    error!("Piece cache reading thread panicked");
428
429                    return;
430                }
431            };
432        }
433
434        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
435
436        info!("Synchronizing piece cache");
437
438        let last_segment_index = loop {
439            match self.node_client.farmer_app_info().await {
440                Ok(farmer_app_info) => {
441                    let last_segment_index =
442                        farmer_app_info.protocol_info.history_size.segment_index();
443                    // Wait for node to be either fully synced or to be aware of non-zero segment
444                    // index, which would indicate it has started DSN sync and knows about
445                    // up-to-date archived history.
446                    //
447                    // While this doesn't account for situations where node was offline for a long
448                    // time and is aware of old segment headers, this is good enough for piece cache
449                    // sync to proceed and should result in better user experience on average.
450                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
451                        break last_segment_index;
452                    }
453                }
454                Err(error) => {
455                    error!(
456                        %error,
457                        "Failed to get farmer app info from node, keeping old cache state without \
458                        updates"
459                    );
460
461                    // Not the latest, but at least something
462                    *self.piece_caches.write().await = caches;
463                    return;
464                }
465            }
466
467            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
468        };
469
470        debug!(%last_segment_index, "Identified last segment index");
471
472        // Collect all the piece indices that need to be stored, we will sort them later
473        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
474        // TODO: This may eventually be too much to store in memory, though right now it is a
475        //  non-issue in practice
476        let mut piece_indices_to_store = segment_indices
477            .into_par_iter()
478            .flat_map(|segment_index| {
479                segment_index
480                    .segment_piece_indexes()
481                    .into_par_iter()
482                    .map(|piece_index| {
483                        (
484                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
485                            piece_index,
486                        )
487                    })
488            })
489            .collect::<Vec<_>>();
490
491        // Sort pieces by distance from peer to piece in ascending order, so that the closest
492        // pieces (which have a higher chance of being downloaded) come first
493        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
494
495        // `HashMap` is faster than `BTreeMap`
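        // Keep only as many of the closest keys as the caches can hold in total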
496        let mut piece_indices_to_store = piece_indices_to_store
497            .into_iter()
498            .take(caches.total_capacity())
499            .collect::<HashMap<_, _>>();
500
501        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
502        // Filter out piece indices that are stored but should not be, and remove already stored
503        // piece indices from `piece_indices_to_store`, leaving just those that are still missing
504        // from the cache
505        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
506
507        if let Some(metrics) = &self.metrics {
508            for offset in caches.stored_pieces_offsets() {
509                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
510            }
511
512            for cache_used in piece_caches_capacity_used {
513                metrics
514                    .piece_cache_capacity_used
515                    .inc_by(i64::from(cache_used));
516            }
517        }
518
519        // Store whatever correct pieces are immediately available after restart
520        self.piece_caches.write().await.clone_from(&caches);
521        let stored_count = caches.stored_pieces_offsets().len();
522
523        debug!(
524            %stored_count,
525            count = %piece_indices_to_store.len(),
526            "Identified piece indices that should be cached",
527        );
528
529        let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
530        let piece_indices_to_store = piece_indices_to_store
531            .into_values()
532            .collect::<Vec<_>>()
533            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
534            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
535            //  similar upstream issues
536            .chunks(SYNC_BATCH_SIZE)
537            .map(|chunk| chunk.to_vec())
538            .collect::<Vec<_>>();
539
540        let downloaded_pieces_count = AtomicUsize::new(stored_count);
541        let caches = Mutex::new(caches);
542        self.handlers.progress.call_simple(&0.0);
543        let batch_count = piece_indices_to_store.len();
544        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
545
546        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
547        let ignored_cache_indices = &RwLock::new(HashSet::new());
548
549        let downloading_pieces_stream =
550            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
551                let downloaded_pieces_count = &downloaded_pieces_count;
552                let caches = &caches;
553                let num_pieces = piece_indices.len();
554
555                trace!(
556                    %num_pieces,
557                    %batch,
558                    %batch_count,
559                    first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
560                    last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
561                    downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
562                    %pieces_to_download_total,
563                    available_permits = %downloading_semaphore.available_permits(),
564                    "Started piece cache sync batch",
565                );
566
567                async move {
568                    let mut permit = downloading_semaphore
569                        .acquire_many(SYNC_BATCH_SIZE as u32)
570                        .await
571                        .expect("Semaphore is never closed; qed");
572                    debug!(%batch, %num_pieces, "Downloading pieces");
573
574                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
575                        Ok(pieces_stream) => pieces_stream,
576                        Err(error) => {
577                            error!(
578                                %error,
579                                "Failed to get pieces from piece getter"
580                            );
581                            return;
582                        }
583                    };
584                    let mut pieces_stream = pieces_stream.enumerate();
585
586                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
587                        debug!(%batch, %index, %piece_index, "Downloaded piece");
588                        // Release one permit for future batches by dropping it along with the piece.
589                        let _permit = permit.split(1);
590
591                        let piece = match result {
592                            Ok(Some(piece)) => {
593                                trace!(%batch, %piece_index, "Downloaded piece successfully");
594                                piece
595                            }
596                            Ok(None) => {
597                                debug!(%batch, %piece_index, "Couldn't find piece");
598                                continue;
599                            }
600                            Err(error) => {
601                                debug!(
602                                    %batch,
603                                    %error,
604                                    %piece_index,
605                                    "Failed to get piece for piece cache"
606                                );
607                                continue;
608                            }
609                        };
610
611                        let (offset, maybe_backend) = {
612                            let mut caches = caches.lock();
613
614                            // Find a cache with a free spot where the new piece can be stored
615                            let Some(offset) = caches.pop_free_offset() else {
616                                error!(
617                                    %batch,
618                                    %piece_index,
619                                    "Failed to store piece in cache, there was no space"
620                                );
621                                break;
622                            };
623
624                            (offset, caches.get_backend(offset.cache_index).cloned())
625                        };
626
627                        let cache_index = offset.cache_index;
628                        let piece_offset = offset.piece_offset;
629
630                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
631                        if skip_write {
632                            trace!(
633                                %batch,
634                                %cache_index,
635                                %piece_index,
636                                %piece_offset,
637                                "Skipping known problematic cache index"
638                            );
639                        } else {
640                            if let Some(backend) = maybe_backend
641                                && let Err(error) =
642                                    backend.write_piece(piece_offset, piece_index, &piece).await
643                            {
644                                error!(
645                                    %error,
646                                    %batch,
647                                    %cache_index,
648                                    %piece_index,
649                                    %piece_offset,
650                                    "Failed to write piece into cache, ignoring this cache going \
651                                    forward"
652                                );
653                                ignored_cache_indices.write().insert(cache_index);
654                                continue;
655                            }
656
657                            let key =
658                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
659                            caches.lock().push_stored_piece(key, offset);
660                        }
661
662                        let prev_downloaded_pieces_count =
663                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
664                        // Do not print anything or send a progress notification for the last
665                        // piece until the piece cache is fully written below
666                        if prev_downloaded_pieces_count != pieces_to_download_total {
667                            let progress = prev_downloaded_pieces_count as f32
668                                / pieces_to_download_total as f32
669                                * 100.0;
670                            if prev_downloaded_pieces_count
671                                .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
672                            {
673                                let mut piece_caches = self.piece_caches.write().await;
674                                piece_caches.clone_from(&caches.lock());
675
676                                info!(
677                                    "Piece cache sync {progress:.2}% complete ({} / {})",
678                                    ByteSize::b(
679                                        (prev_downloaded_pieces_count * Piece::SIZE) as u64,
680                                    )
681                                    .display()
682                                    .iec(),
683                                    ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64,)
684                                        .display()
685                                        .iec(),
686                                );
687                            }
688
689                            self.handlers.progress.call_simple(&progress);
690                        }
691                    }
692
693                    trace!(
694                        %num_pieces,
695                        %batch,
696                        %batch_count,
697                        downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
698                        %pieces_to_download_total,
699                        available_permits = %downloading_semaphore.available_permits(),
700                        "Finished piece cache sync batch",
701                    );
702                }
703            }));
704
705        // Download several batches concurrently to make sure the slow tail of one is compensated
706        // by another
707        downloading_pieces_stream
708            // This allows scheduling a new batch while previous batches are partially complete,
709            // but avoids the excessive memory usage of creating all futures upfront
710            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
711            // Simply drain everything
712            .for_each(|()| async {})
713            .await;
714
715        *self.piece_caches.write().await = caches.into_inner();
716        self.handlers.progress.call_simple(&100.0);
717        *last_segment_index_internal = last_segment_index;
718
719        info!("Finished piece cache synchronization");
720    }
721
722    async fn process_segment_header<PG>(
723        &self,
724        piece_getter: &PG,
725        segment_header: SegmentHeader,
726        last_segment_index_internal: &mut SegmentIndex,
727    ) where
728        PG: PieceGetter,
729    {
730        let segment_index = segment_header.segment_index();
731        debug!(%segment_index, "Starting to process newly archived segment");
732
733        if *last_segment_index_internal < segment_index {
734            debug!(%segment_index, "Downloading potentially useful pieces");
735
736            // We do not insert pieces into the cache/heap yet, so we don't know if all of these
737            // pieces will be included, but there is a good chance they will be, and we want to
738            // acknowledge the new segment header as soon as possible
739            let pieces_to_maybe_include = segment_index
740                .segment_piece_indexes()
741                .into_iter()
742                .map(|piece_index| async move {
743                    let should_store_in_piece_cache = self
744                        .piece_caches
745                        .read()
746                        .await
747                        .should_include_key(self.peer_id, piece_index);
748
749                    let key = RecordKey::from(piece_index.to_multihash());
750                    let should_store_in_plot_cache =
751                        self.plot_caches.should_store(piece_index, &key).await;
752
753                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
754                        trace!(%piece_index, "Piece doesn't need to be cached #1");
755
756                        return None;
757                    }
758
759                    let maybe_piece_result =
760                        self.node_client
761                            .piece(piece_index)
762                            .await
763                            .inspect_err(|error| {
764                                debug!(
765                                    %error,
766                                    %segment_index,
767                                    %piece_index,
768                                    "Failed to retrieve piece from node right after archiving"
769                                );
770                            });
771
772                    if let Ok(Some(piece)) = maybe_piece_result {
773                        return Some((piece_index, piece));
774                    }
775
776                    match piece_getter.get_piece(piece_index).await {
777                        Ok(Some(piece)) => Some((piece_index, piece)),
778                        Ok(None) => {
779                            warn!(
780                                %segment_index,
781                                %piece_index,
782                                "Failed to retrieve piece right after archiving"
783                            );
784
785                            None
786                        }
787                        Err(error) => {
788                            warn!(
789                                %error,
790                                %segment_index,
791                                %piece_index,
792                                "Failed to retrieve piece right after archiving"
793                            );
794
795                            None
796                        }
797                    }
798                })
799                .collect::<FuturesUnordered<_>>()
800                .filter_map(|maybe_piece| async move { maybe_piece })
801                .collect::<Vec<_>>()
802                .await;
803
804            debug!(%segment_index, "Downloaded potentially useful pieces");
805
806            self.acknowledge_archived_segment_processing(segment_index)
807                .await;
808
809            // Go through potentially matching pieces again now that segment was acknowledged and
810            // try to persist them if necessary
811            for (piece_index, piece) in pieces_to_maybe_include {
812                if !self
813                    .plot_caches
814                    .store_additional_piece(piece_index, &piece)
815                    .await
816                {
817                    trace!(%piece_index, "Piece could not be cached in plot cache");
818                }
819
820                if !self
821                    .piece_caches
822                    .read()
823                    .await
824                    .should_include_key(self.peer_id, piece_index)
825                {
826                    trace!(%piece_index, "Piece doesn't need to be cached #2");
827
828                    continue;
829                }
830
831                trace!(%piece_index, "Piece needs to be cached #1");
832
833                self.persist_piece_in_cache(piece_index, piece).await;
834            }
835
836            *last_segment_index_internal = segment_index;
837        } else {
838            self.acknowledge_archived_segment_processing(segment_index)
839                .await;
840        }
841
842        debug!(%segment_index, "Finished processing newly archived segment");
843    }
844
845    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
846        match self
847            .node_client
848            .acknowledge_archived_segment_header(segment_index)
849            .await
850        {
851            Ok(()) => {
852                debug!(%segment_index, "Acknowledged archived segment");
853            }
854            Err(error) => {
855                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
856            }
857        };
858    }
859
860    async fn keep_up_after_initial_sync<PG>(
861        &self,
862        piece_getter: &PG,
863        last_segment_index_internal: &mut SegmentIndex,
864    ) where
865        PG: PieceGetter,
866    {
867        let last_segment_index = match self.node_client.farmer_app_info().await {
868            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
869            Err(error) => {
870                error!(
871                    %error,
872                    "Failed to get farmer app info from node, keeping old cache state without \
873                    updates"
874                );
875                return;
876            }
877        };
878
879        if last_segment_index <= *last_segment_index_internal {
880            return;
881        }
882
883        info!(
884            "Syncing piece cache to the latest history size, this may pause block production if \
885            it takes too long"
886        );
887
888        // Keep up with segment indices that were potentially created since reinitialization
889        let piece_indices = (*last_segment_index_internal..=last_segment_index)
890            .flat_map(|segment_index| segment_index.segment_piece_indexes());
891
892        // TODO: Download pieces concurrently
893        for piece_index in piece_indices {
894            if !self
895                .piece_caches
896                .read()
897                .await
898                .should_include_key(self.peer_id, piece_index)
899            {
900                trace!(%piece_index, "Piece doesn't need to be cached #3");
901
902                continue;
903            }
904
905            trace!(%piece_index, "Piece needs to be cached #2");
906
907            let result = piece_getter.get_piece(piece_index).await;
908
909            let piece = match result {
910                Ok(Some(piece)) => piece,
911                Ok(None) => {
912                    debug!(%piece_index, "Couldn't find piece");
913                    continue;
914                }
915                Err(error) => {
916                    debug!(
917                        %error,
918                        %piece_index,
919                        "Failed to get piece for piece cache"
920                    );
921                    continue;
922                }
923            };
924
925            self.persist_piece_in_cache(piece_index, piece).await;
926        }
927
928        info!("Finished syncing piece cache to the latest history size");
929
930        *last_segment_index_internal = last_segment_index;
931    }
932
933    /// This assumes it was already checked that the piece needs to be stored; no verification is
934    /// done internally, and invariants will break if this assumption doesn't hold true
935    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
936        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
937        let mut caches = self.piece_caches.write().await;
938        match caches.should_replace(&key) {
939            // Entry is already occupied, we need to find and replace old piece with new one
940            Some((old_key, offset)) => {
941                let cache_index = offset.cache_index;
942                let piece_offset = offset.piece_offset;
943                let Some(backend) = caches.get_backend(cache_index) else {
944                    // Cache backend doesn't exist
945                    warn!(
946                        %cache_index,
947                        %piece_index,
948                        "Should have a cache backend, but it didn't exist, this is an \
949                        implementation bug"
950                    );
951                    return;
952                };
953                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
954                    error!(
955                        %error,
956                        %cache_index,
957                        %piece_index,
958                        %piece_offset,
959                        "Failed to write piece into cache"
960                    );
961                } else {
962                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
963                    trace!(
964                        %cache_index,
965                        %old_piece_index,
966                        %piece_index,
967                        %piece_offset,
968                        "Successfully replaced old cached piece"
969                    );
970                    caches.push_stored_piece(key, offset);
971                }
972            }
973            // There is free space in cache, need to find a free spot and place piece there
974            None => {
975                let Some(offset) = caches.pop_free_offset() else {
976                    warn!(
977                        %piece_index,
978                        "Should have inserted piece into cache, but it didn't happen, this is an \
979                        implementation bug"
980                    );
981                    return;
982                };
983                let cache_index = offset.cache_index;
984                let piece_offset = offset.piece_offset;
985                let Some(backend) = caches.get_backend(cache_index) else {
986                    // Cache backend doesn't exist
987                    warn!(
988                        %cache_index,
989                        %piece_index,
990                        "Should have a cache backend, but it didn't exist, this is an \
991                        implementation bug"
992                    );
993                    return;
994                };
995
996                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
997                    error!(
998                        %error,
999                        %cache_index,
1000                        %piece_index,
1001                        %piece_offset,
1002                        "Failed to write piece into cache"
1003                    );
1004                } else {
1005                    trace!(
1006                        %cache_index,
1007                        %piece_index,
1008                        %piece_offset,
1009                        "Successfully stored piece in cache"
1010                    );
1011                    if let Some(metrics) = &self.metrics {
1012                        metrics.piece_cache_capacity_used.inc();
1013                    }
1014                    caches.push_stored_piece(key, offset);
1015                }
1016            }
1017        };
1018    }
1019}
1020
1021#[derive(Debug)]
1022struct PlotCaches {
1023    /// Additional piece caches
1024    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
1025    /// Next plot cache to use for storing pieces
1026    next_plot_cache: AtomicUsize,
1027}
1028
1029impl PlotCaches {
1030    /// Returns true if there might be space to add a piece to a cache.
1031    /// Returns false if it is already in a cache, or it can't be added to any of the caches.
1032    ///
1033    /// Available space can be overwritten by a sector at any time, so the piece write can still
1034    /// fail even if this returns `true`.
1035    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1036        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1037            match cache.is_piece_maybe_stored(key).await {
1038                Ok(MaybePieceStoredResult::No) => {
1039                    // Isn't stored or can't be stored, try another cache if there is one
1040                }
1041                Ok(MaybePieceStoredResult::Vacant) => {
1042                    return true;
1043                }
1044                Ok(MaybePieceStoredResult::Yes) => {
1045                    // Already stored, nothing else left to do
1046                    return false;
1047                }
1048                Err(error) => {
1049                    warn!(
1050                        %cache_index,
1051                        %piece_index,
1052                        %error,
1053                        "Failed to check piece stored in cache"
1054                    );
1055                }
1056            }
1057        }
1058
1059        false
1060    }
1061
1062    /// Store a piece in the additional piece caches, if there is space for it.
1063    /// Returns `true` if the piece was added to a cache, and `false` if it couldn't be stored,
1064    /// typically because the cache is full.
1065    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1066        let plot_caches = self.caches.read().await;
1067        let plot_caches_len = plot_caches.len();
1068
1069        // Store pieces in plots using round-robin distribution
1070        for _ in 0..plot_caches_len {
1071            let plot_cache_index =
1072                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1073
1074            match plot_caches[plot_cache_index]
1075                .try_store_piece(piece_index, piece)
1076                .await
1077            {
1078                Ok(true) => {
1079                    return true;
1080                }
1081                Ok(false) => {
1082                    continue;
1083                }
1084                Err(error) => {
1085                    error!(
1086                        %error,
1087                        %piece_index,
1088                        %plot_cache_index,
1089                        "Failed to store additional piece in cache"
1090                    );
1091                    continue;
1092                }
1093            }
1094        }
1095
1096        false
1097    }
1098}
1099
1100/// Farmer cache that aggregates different kinds of caches of multiple disks.
1101///
1102/// Pieces in [`PieceCache`] are stored based on capacity and the proximity of the piece index to
1103/// the farmer's network identity. If the capacity is not enough to store all pieces in the cache,
1104/// pieces that are further from the network identity will be evicted; this is helpful for quick
1105/// retrieval of pieces from the DSN as well as for plotting purposes.
1106///
1107/// [`PlotCache`] is used as a supplementary cache and is primarily helpful for smaller farmers
1108/// whose piece cache is not enough to store all the pieces on the network, while there is a lot of
1109/// space in the plot that is not used by sectors yet and can be leveraged as extra caching space.
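///
/// The proximity mentioned above is the ordering of `KeyWithDistance` values, the same ordering
/// used to decide which pieces to sync first; as a sketch (illustrative only), the key for a
/// piece is derived the same way as in the rest of this module:
///
/// ```ignore
/// let key = KeyWithDistance::new(peer_id, piece_index.to_multihash());
/// ```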
1110#[derive(Debug, Clone)]
1111pub struct FarmerCache {
1112    peer_id: PeerId,
1113    /// Individual dedicated piece caches
1114    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
1115    /// Additional piece caches
1116    plot_caches: Arc<PlotCaches>,
1117    handlers: Arc<Handlers>,
1118    // We do not want to increase capacity unnecessarily on clone
1119    worker_sender: mpsc::Sender<WorkerCommand>,
1120    metrics: Option<Arc<FarmerCacheMetrics>>,
1121}
1122
1123impl FarmerCache {
1124    /// Create new piece cache instance and corresponding worker.
1125    ///
1126    /// NOTE: The returned worker's `run()` future does blocking operations and should be run in
1127    /// a dedicated thread.
1128    pub fn new<NC>(
1129        node_client: NC,
1130        peer_id: PeerId,
1131        registry: Option<&mut Registry>,
1132    ) -> (Self, FarmerCacheWorker<NC>)
1133    where
1134        NC: NodeClient,
1135    {
1136        let caches = Arc::default();
1137        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1138        let handlers = Arc::new(Handlers::default());
1139
1140        let plot_caches = Arc::new(PlotCaches {
1141            caches: AsyncRwLock::default(),
1142            next_plot_cache: AtomicUsize::new(0),
1143        });
1144        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1145
1146        let instance = Self {
1147            peer_id,
1148            piece_caches: Arc::clone(&caches),
1149            plot_caches: Arc::clone(&plot_caches),
1150            handlers: Arc::clone(&handlers),
1151            worker_sender,
1152            metrics: metrics.clone(),
1153        };
1154        let worker = FarmerCacheWorker {
1155            peer_id,
1156            node_client,
1157            piece_caches: caches,
1158            plot_caches,
1159            handlers,
1160            worker_receiver: Some(worker_receiver),
1161            metrics,
1162        };
1163
1164        (instance, worker)
1165    }
1166
1167    /// Get piece from cache
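    ///
    /// A usage sketch (illustrative only, assuming a known `piece_index: PieceIndex`):
    ///
    /// ```ignore
    /// let maybe_piece = farmer_cache
    ///     .get_piece(RecordKey::from(piece_index.to_multihash()))
    ///     .await;
    /// ```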
1168    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1169    where
1170        RecordKey: From<Key>,
1171    {
1172        let key = RecordKey::from(key);
1173        let maybe_piece_found = {
1174            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1175            let caches = self.piece_caches.read().await;
1176
1177            caches.get_stored_piece(&key).and_then(|offset| {
1178                let cache_index = offset.cache_index;
1179                let piece_offset = offset.piece_offset;
1180                Some((
1181                    piece_offset,
1182                    cache_index,
1183                    caches.get_backend(cache_index)?.clone(),
1184                ))
1185            })
1186        };
1187
1188        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1189            match backend.read_piece(piece_offset).await {
1190                Ok(maybe_piece) => {
1191                    return match maybe_piece {
1192                        Some((_piece_index, piece)) => {
1193                            if let Some(metrics) = &self.metrics {
1194                                metrics.cache_get_hit.inc();
1195                            }
1196                            Some(piece)
1197                        }
1198                        None => {
1199                            error!(
1200                                %cache_index,
1201                                %piece_offset,
1202                                ?key,
1203                                "Piece was expected to be in cache, but wasn't found there"
1204                            );
1205                            if let Some(metrics) = &self.metrics {
1206                                metrics.cache_get_error.inc();
1207                            }
1208                            None
1209                        }
1210                    };
1211                }
1212                Err(error) => {
1213                    error!(
1214                        %error,
1215                        %cache_index,
1216                        ?key,
1217                        %piece_offset,
1218                        "Error while reading piece from cache"
1219                    );
1220
1221                    if let Err(error) = self
1222                        .worker_sender
1223                        .clone()
1224                        .send(WorkerCommand::ForgetKey { key })
1225                        .await
1226                    {
1227                        trace!(%error, "Failed to send ForgetKey command to worker");
1228                    }
1229
1230                    if let Some(metrics) = &self.metrics {
1231                        metrics.cache_get_error.inc();
1232                    }
1233                    return None;
1234                }
1235            }
1236        }
1237
1238        for cache in self.plot_caches.caches.read().await.iter() {
1239            if let Ok(Some(piece)) = cache.read_piece(&key).await {
1240                if let Some(metrics) = &self.metrics {
1241                    metrics.cache_get_hit.inc();
1242                }
1243                return Some(piece);
1244            }
1245        }
1246
1247        if let Some(metrics) = &self.metrics {
1248            metrics.cache_get_miss.inc();
1249        }
1250        None
1251    }
1252
1253    /// Get pieces from cache.
1254    ///
1255    /// The number of elements in the returned stream is the same as the number of unique `piece_indices`.
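    ///
    /// A usage sketch (illustrative only, assuming `piece_indices: Vec<PieceIndex>` and
    /// `futures::StreamExt` in scope):
    ///
    /// ```ignore
    /// let mut pieces_stream = farmer_cache.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces_stream.next().await {
    ///     // `maybe_piece` is `None` when the piece wasn't found in any cache
    /// }
    /// ```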
1256    pub async fn get_pieces<'a, PieceIndices>(
1257        &'a self,
1258        piece_indices: PieceIndices,
1259    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1260    where
1261        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1262    {
1263        let mut pieces_to_get_from_plot_cache = Vec::new();
1264
1265        let pieces_to_read_from_piece_cache = {
1266            let caches = self.piece_caches.read().await;
1267            // Pieces to read from piece cache grouped by backend for efficiency reasons
1268            let mut pieces_to_read_from_piece_cache =
1269                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1270
1271            for piece_index in piece_indices {
1272                let key = RecordKey::from(piece_index.to_multihash());
1273
1274                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1275                    self.peer_id,
1276                    key.clone(),
1277                )) {
1278                    Some(offset) => offset,
1279                    None => {
1280                        pieces_to_get_from_plot_cache.push((piece_index, key));
1281                        continue;
1282                    }
1283                };
1284
1285                let cache_index = offset.cache_index;
1286                let piece_offset = offset.piece_offset;
1287
1288                match pieces_to_read_from_piece_cache.entry(cache_index) {
1289                    Entry::Occupied(mut entry) => {
1290                        let (_backend, pieces) = entry.get_mut();
1291                        pieces.insert(piece_offset, (piece_index, key));
1292                    }
1293                    Entry::Vacant(entry) => {
1294                        let backend = match caches.get_backend(cache_index) {
1295                            Some(backend) => backend.clone(),
1296                            None => {
1297                                pieces_to_get_from_plot_cache.push((piece_index, key));
1298                                continue;
1299                            }
1300                        };
1301                        entry
1302                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1303                    }
1304                }
1305            }
1306
1307            pieces_to_read_from_piece_cache
1308        };
1309
1310        let (tx, mut rx) = mpsc::unbounded();
1311
1312        let fut = async move {
1313            let tx = &tx;
1314
1315            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1316                .into_iter()
1317                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1318                    let mut pieces_stream = match backend
1319                        .read_pieces(Box::new(
1320                            pieces_to_get
1321                                .keys()
1322                                .copied()
1323                                .collect::<Vec<_>>()
1324                                .into_iter(),
1325                        ))
1326                        .await
1327                    {
1328                        Ok(pieces_stream) => pieces_stream,
1329                        Err(error) => {
1330                            error!(
1331                                %error,
1332                                %cache_index,
1333                                "Error while reading pieces from cache"
1334                            );
1335
1336                            if let Some(metrics) = &self.metrics {
1337                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1338                            }
1339                            for (piece_index, _key) in pieces_to_get.into_values() {
1340                                tx.unbounded_send((piece_index, None)).expect(
1341                                    "This future isn't polled after receiver is dropped; qed",
1342                                );
1343                            }
1344                            return;
1345                        }
1346                    };
1347
1348                    while let Some(maybe_piece) = pieces_stream.next().await {
1349                        let result = match maybe_piece {
1350                            Ok((piece_offset, Some((piece_index, piece)))) => {
1351                                pieces_to_get.remove(&piece_offset);
1352
1353                                if let Some(metrics) = &self.metrics {
1354                                    metrics.cache_get_hit.inc();
1355                                }
1356                                (piece_index, Some(piece))
1357                            }
1358                            Ok((piece_offset, None)) => {
1359                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1360                                else {
1361                                    debug!(
1362                                        %cache_index,
1363                                        %piece_offset,
1364                                        "Received piece offset that was not expected"
1365                                    );
1366                                    continue;
1367                                };
1368
1369                                error!(
1370                                    %cache_index,
1371                                    %piece_index,
1372                                    %piece_offset,
1373                                    ?key,
1374                                    "Piece was expected to be in cache, but wasn't found there"
1375                                );
1376                                if let Some(metrics) = &self.metrics {
1377                                    metrics.cache_get_error.inc();
1378                                }
1379                                (piece_index, None)
1380                            }
1381                            Err(error) => {
1382                                error!(
1383                                    %error,
1384                                    %cache_index,
1385                                    "Error while reading piece from cache"
1386                                );
1387
1388                                if let Some(metrics) = &self.metrics {
1389                                    metrics.cache_get_error.inc();
1390                                }
1391                                continue;
1392                            }
1393                        };
1394
1395                        tx.unbounded_send(result)
1396                            .expect("This future isn't polled after receiver is dropped; qed");
1397                    }
1398
1399                    if pieces_to_get.is_empty() {
1400                        return;
1401                    }
1402
1403                    if let Some(metrics) = &self.metrics {
1404                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1405                    }
1406                    for (piece_offset, (piece_index, key)) in pieces_to_get {
1407                        error!(
1408                            %cache_index,
1409                            %piece_index,
1410                            %piece_offset,
1411                            ?key,
1412                            "Piece cache didn't return an entry for offset"
1413                        );
1414
1415                        // Uphold this method's invariant that some result is returned for
1416                        // every unique piece index
1417                        tx.unbounded_send((piece_index, None))
1418                            .expect("This future isn't polled after receiver is dropped; qed");
1419                    }
1420                })
1421                .collect::<FuturesUnordered<_>>();
1422            // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
1423            // Simply drain everything
1424            // .for_each(|()| async {})
1425
1426            // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
1427            let reading_from_piece_cache_fut = async move {
1428                while let Some(()) = reading_from_piece_cache.next().await {
1429                    // Simply drain everything
1430                }
1431            };
1432
1433            let reading_from_plot_cache_fut = async {
1434                if pieces_to_get_from_plot_cache.is_empty() {
1435                    return;
1436                }
1437
1438                for cache in self.plot_caches.caches.read().await.iter() {
1439                    // Iterate over offsets in reverse order so that elements can be traversed in async
1440                    // code while entries are removed efficiently without extra allocations
1441                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1442                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1443
1444                        if let Ok(Some(piece)) = cache.read_piece(key).await {
1445                            if let Some(metrics) = &self.metrics {
1446                                metrics.cache_get_hit.inc();
1447                            }
1448                            tx.unbounded_send((*piece_index, Some(piece)))
1449                                .expect("This future isn't polled after receiver is dropped; qed");
1450
1451                            // Because iteration is in reverse order and `swap_remove` swaps in an element
1452                            // from the end, this doesn't affect processing of the remaining elements
1453                            pieces_to_get_from_plot_cache.swap_remove(offset);
1454                        }
1455                    }
1456
1457                    if pieces_to_get_from_plot_cache.is_empty() {
1458                        return;
1459                    }
1460                }
1461
1462                if let Some(metrics) = &self.metrics {
1463                    metrics
1464                        .cache_get_miss
1465                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1466                }
1467
1468                for (piece_index, _key) in pieces_to_get_from_plot_cache {
1469                    tx.unbounded_send((piece_index, None))
1470                        .expect("This future isn't polled after receiver is dropped; qed");
1471                }
1472            };
1473
1474            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1475        };
1476        let mut fut = Box::pin(fut.fuse());
1477
1478        // Drive the future above and stream back any pieces that have been read so far
1479        stream::poll_fn(move |cx| {
1480            if !fut.is_terminated() {
1481                // The result doesn't matter, the stream below needs to be polled anyway
1482                let _ = fut.poll_unpin(cx);
1483            }
1484
1485            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1486                return Poll::Ready(maybe_result);
1487            }
1488
1489            // Exit happens via the poll above: `rx` yields `None` once `fut` completes and drops `tx`
1490            Poll::Pending
1491        })
1492    }
1493
1494    /// Returns a filtered list of pieces that were found in the farmer cache; order is not guaranteed
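    ///
    /// A minimal caller-side sketch (hypothetical `farmer_cache` and `wanted` variables, not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let cached: Vec<PieceIndex> = farmer_cache.has_pieces(wanted.clone()).await;
    /// // Anything not returned is not in the farmer cache and has to be fetched by other means
    /// let missing: Vec<PieceIndex> = wanted
    ///     .into_iter()
    ///     .filter(|piece_index| !cached.contains(piece_index))
    ///     .collect();
    /// ```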
1495    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1496        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1497            piece_indices
1498                .iter()
1499                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1500        );
1501
1502        // Quick check in piece caches
1503        {
1504            let piece_caches = self.piece_caches.read().await;
1505            pieces_to_find.retain(|_piece_index, key| {
1506                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1507                !piece_caches.contains_stored_piece(&distance_key)
1508            });
1509        }
1510
1511        // Early exit if everything is cached
1512        if pieces_to_find.is_empty() {
1513            return piece_indices;
1514        }
1515
1516        // Check plot caches concurrently
1517        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1518            let plot_caches = &plot_caches;
1519            let not_found = pieces_to_find
1520                .into_iter()
1521                .map(|(piece_index, key)| async move {
1522                    let key = &key;
1523
1524                    let found = plot_caches
1525                        .iter()
1526                        .map(|plot_cache| async {
1527                            matches!(
1528                                plot_cache.is_piece_maybe_stored(key).await,
1529                                Ok(MaybePieceStoredResult::Yes)
1530                            )
1531                        })
1532                        .collect::<FuturesUnordered<_>>()
1533                        .any(|found| async move { found })
1534                        .await;
1535
1536                    if found { None } else { Some(piece_index) }
1537                })
1538                .collect::<FuturesUnordered<_>>()
1539                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1540                .collect::<HashSet<_>>()
1541                .await;
1542            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1543        }
1544        piece_indices
1545    }
1546
1547    /// Find piece in cache and return its retrieval details
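    ///
    /// A minimal caller-side sketch (hypothetical `farmer_cache` variable, not compiled as a
    /// doctest):
    ///
    /// ```ignore
    /// if let Some((cache_id, piece_offset)) = farmer_cache.find_piece(piece_index).await {
    ///     // `cache_id` identifies the piece cache and `piece_offset` the slot to read from
    /// }
    /// ```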
1548    pub async fn find_piece(
1549        &self,
1550        piece_index: PieceIndex,
1551    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1552        let caches = self.piece_caches.read().await;
1553
1554        self.find_piece_internal(&caches, piece_index)
1555    }
1556
1557    /// Find pieces in cache and return their retrieval details
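    ///
    /// A minimal caller-side sketch (hypothetical `farmer_cache` variable, not compiled as a
    /// doctest); pieces that are not cached are simply absent from the returned list:
    ///
    /// ```ignore
    /// for (piece_index, cache_id, piece_offset) in farmer_cache.find_pieces(piece_indices).await {
    ///     // Read `piece_index` from the cache identified by `cache_id` at `piece_offset`
    /// }
    /// ```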
1558    pub async fn find_pieces<PieceIndices>(
1559        &self,
1560        piece_indices: PieceIndices,
1561    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1562    where
1563        PieceIndices: IntoIterator<Item = PieceIndex>,
1564    {
1565        let caches = self.piece_caches.read().await;
1566
1567        piece_indices
1568            .into_iter()
1569            .filter_map(|piece_index| {
1570                self.find_piece_internal(&caches, piece_index)
1571                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1572            })
1573            .collect()
1574    }
1575
1576    fn find_piece_internal(
1577        &self,
1578        caches: &PieceCachesState,
1579        piece_index: PieceIndex,
1580    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1581        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1582
1583        let Some(offset) = caches.get_stored_piece(&key) else {
1584            if let Some(metrics) = &self.metrics {
1585                metrics.cache_find_miss.inc();
1586            }
1587
1588            return None;
1589        };
1590        let piece_offset = offset.piece_offset;
1591
1592        if let Some(backend) = caches.get_backend(offset.cache_index) {
1593            if let Some(metrics) = &self.metrics {
1594                metrics.cache_find_hit.inc();
1595            }
1596            return Some((*backend.id(), piece_offset));
1597        }
1598
1599        if let Some(metrics) = &self.metrics {
1600            metrics.cache_find_miss.inc();
1601        }
1602        None
1603    }
1604
1605    /// Try to store a piece in the plot caches used for additional downloaded pieces, if there
1606    /// is space for it. Returns `true` if the piece was added to this cache, and `false` if it
1607    /// was already stored or there was no space.
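    ///
    /// A minimal caller-side sketch (hypothetical `farmer_cache`, `piece_index` and `piece`
    /// variables, not compiled as a doctest):
    ///
    /// ```ignore
    /// // Offer a freshly retrieved piece for opportunistic caching
    /// if farmer_cache.maybe_store_additional_piece(piece_index, &piece).await {
    ///     // The piece is now retrievable from the plot caches as well
    /// }
    /// ```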
1608    pub async fn maybe_store_additional_piece(
1609        &self,
1610        piece_index: PieceIndex,
1611        piece: &Piece,
1612    ) -> bool {
1613        let key = RecordKey::from(piece_index.to_multihash());
1614
1615        let should_store = self.plot_caches.should_store(piece_index, &key).await;
1616
1617        if !should_store {
1618            return false;
1619        }
1620
1621        self.plot_caches
1622            .store_additional_piece(piece_index, piece)
1623            .await
1624    }
1625
1626    /// Initialize replacement of backing caches
1627    pub async fn replace_backing_caches(
1628        &self,
1629        new_piece_caches: Vec<Arc<dyn PieceCache>>,
1630        new_plot_caches: Vec<Arc<dyn PlotCache>>,
1631    ) {
1632        if let Err(error) = self
1633            .worker_sender
1634            .clone()
1635            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1636            .await
1637        {
1638            warn!(%error, "Failed to replace backing caches, worker exited");
1639        }
1640
1641        *self.plot_caches.caches.write().await = new_plot_caches;
1642    }
1643
1644    /// Subscribe to cache sync notifications
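    ///
    /// A minimal caller-side sketch (hypothetical `farmer_cache` variable, not compiled as a
    /// doctest); the returned [`HandlerId`] should be kept (or detached) for as long as
    /// notifications are needed:
    ///
    /// ```ignore
    /// let _progress_handler_id = farmer_cache.on_sync_progress(Arc::new(|progress: &f32| {
    ///     println!("Cache sync progress: {progress:.2}");
    /// }));
    /// ```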
1645    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1646        self.handlers.progress.add(callback)
1647    }
1648}
1649
1650/// Collection of [`FarmerCache`] instances for load balancing
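///
/// A minimal construction sketch (hypothetical `farmer_cache*` variables, not compiled as a
/// doctest):
///
/// ```ignore
/// // A single cache...
/// let farmer_caches = FarmerCaches::from(farmer_cache);
/// // ...or several caches sharing requests between them
/// let caches: Arc<[FarmerCache]> = Arc::new([farmer_cache_a, farmer_cache_b]);
/// let farmer_caches = FarmerCaches::from(caches);
/// ```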
1651#[derive(Debug, Clone)]
1652pub struct FarmerCaches {
1653    caches: Arc<[FarmerCache]>,
1654}
1655
1656impl From<Arc<[FarmerCache]>> for FarmerCaches {
1657    fn from(caches: Arc<[FarmerCache]>) -> Self {
1658        Self { caches }
1659    }
1660}
1661
1662impl From<FarmerCache> for FarmerCaches {
1663    fn from(cache: FarmerCache) -> Self {
1664        Self {
1665            caches: Arc::new([cache]),
1666        }
1667    }
1668}
1669
1670impl FarmerCaches {
1671    /// Get piece from cache
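    ///
    /// A minimal caller-side sketch (hypothetical `farmer_caches` and `piece_index` variables, not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// // Any key type convertible into a `RecordKey` can be used
    /// let key = RecordKey::from(piece_index.to_multihash());
    /// let maybe_piece: Option<Piece> = farmer_caches.get_piece(key).await;
    /// ```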
1672    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1673    where
1674        RecordKey: From<Key>,
1675    {
1676        let farmer_cache = self.caches.choose(&mut rand::rng())?;
1677        farmer_cache.get_piece(key).await
1678    }
1679
1680    /// Get pieces from cache.
1681    ///
1682    /// The number of elements in the returned stream is the same as the number of unique `piece_indices`.
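    ///
    /// A minimal caller-side sketch (hypothetical `farmer_caches` and `piece_indices` variables,
    /// assumes `futures::StreamExt` is in scope; not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut pieces = farmer_caches.get_pieces(piece_indices).await;
    /// while let Some((piece_index, maybe_piece)) = pieces.next().await {
    ///     match maybe_piece {
    ///         Some(piece) => { /* found in one of the caches */ }
    ///         None => { /* not cached, retrieve it by other means */ }
    ///     }
    /// }
    /// ```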
1683    pub async fn get_pieces<'a, PieceIndices>(
1684        &'a self,
1685        piece_indices: PieceIndices,
1686    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1687    where
1688        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1689    {
1690        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1691            return Either::Left(stream::iter(
1692                piece_indices
1693                    .into_iter()
1694                    .map(|piece_index| (piece_index, None)),
1695            ));
1696        };
1697
1698        Either::Right(farmer_cache.get_pieces(piece_indices).await)
1699    }
1700
1701    /// Returns a filtered list of pieces that were found in the farmer cache; order is not guaranteed
1702    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1703        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1704            return Vec::new();
1705        };
1706
1707        farmer_cache.has_pieces(piece_indices).await
1708    }
1709
1710    /// Try to store a piece in the plot caches used for additional downloaded pieces, if there
1711    /// is space for it. Returns `true` if the piece was added to one or more caches, and `false`
1712    /// if it was already stored or there was no space.
1713    pub async fn maybe_store_additional_piece(
1714        &self,
1715        piece_index: PieceIndex,
1716        piece: &Piece,
1717    ) -> bool {
1718        // Run all the futures to completion, and take a non-short-circuiting any() on the results.
1719        self.caches
1720            .iter()
1721            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1722            .collect::<FuturesUnordered<_>>()
1723            .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1724            .await
1725    }
1726}
1727
1728/// Extracts the `PieceIndex` from a `RecordKey`.
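///
/// This relies on the trailing `PieceIndex::SIZE` bytes of the key encoding the piece index, which
/// is the case for keys built with `RecordKey::from(piece_index.to_multihash())` in this module.
/// Round-trip sketch (hypothetical `piece_index` variable, not compiled as a doctest):
///
/// ```ignore
/// let key = RecordKey::from(piece_index.to_multihash());
/// assert_eq!(decode_piece_index_from_record_key(&key), piece_index);
/// ```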
1729fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1730    let len = key.as_ref().len();
1731    let piece_index_start = len - PieceIndex::SIZE;
1732
1733    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1734    piece_index_bytes.copy_from_slice(&key.as_ref()[piece_index_start..]);
1735
1736    PieceIndex::from_bytes(piece_index_bytes)
1737}