Skip to main content

ab_farmer/
farmer_cache.rs

1//! A container that caches pieces
2//!
//! Farmer cache is a container that orchestrates a bunch of piece and plot caches that together
//! persist pieces in a way that makes them much easier to retrieve than decoding them from plots.
5
6mod metrics;
7mod piece_cache_state;
8// TODO: Not supported under Miri: https://github.com/rust-lang/miri/issues/4464
9#[cfg(not(miri))]
10#[cfg(test)]
11mod tests;
12
13use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
14use crate::farmer_cache::metrics::FarmerCacheMetrics;
15use crate::farmer_cache::piece_cache_state::PieceCachesState;
16use crate::node_client::NodeClient;
17use crate::utils::run_future_in_dedicated_thread;
18use ab_core_primitives::pieces::{Piece, PieceIndex};
19use ab_core_primitives::segments::SegmentIndex;
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_networking::KeyWithDistance;
22use ab_networking::libp2p::PeerId;
23use ab_networking::libp2p::kad::RecordKey;
24use ab_networking::utils::multihash::ToMultihash;
25use async_lock::RwLock as AsyncRwLock;
26use bytesize::ByteSize;
27use event_listener_primitives::{Bag, HandlerId};
28use futures::channel::mpsc;
29use futures::future::{Either, FusedFuture};
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
32use parking_lot::{Mutex, RwLock};
33use prometheus_client::registry::Registry;
34use rand::prelude::*;
35use rayon::prelude::*;
36use std::collections::hash_map::Entry;
37use std::collections::{HashMap, HashSet};
38use std::future::join;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::task::Poll;
42use std::time::Duration;
43use std::{fmt, mem};
44use tokio::sync::Semaphore;
45use tokio::task::yield_now;
46use tracing::{Instrument, debug, error, info, info_span, trace, warn};
47
/// Presumably the capacity of the [`WorkerCommand`] channel feeding [`FarmerCacheWorker`]; the
/// channel itself is created outside this section (NOTE(review): confirm)
const WORKER_CHANNEL_CAPACITY: usize = 100;
/// Number of piece indices requested from the piece getter in a single batch during initial
/// piece cache sync
const SYNC_BATCH_SIZE: usize = 256;
/// Number of sync batches whose pieces may be downloading at the same time during initial sync;
/// together with `SYNC_BATCH_SIZE` it sizes the download semaphore
const SYNC_CONCURRENT_BATCHES: usize = 4;
/// Caches are made available while they are still being built instead of only after
/// initialization finishes: every time this many pieces have been downloaded, the shared cache
/// state is updated
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// Delay between polls of the node's farmer app info while waiting for the node to be ready for
/// the initial piece cache sync
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Type-erased event handler callback
type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
/// Registered handlers for events that carry a value of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
/// Index of a piece cache backend. Since it is a `u8`, at most 256 backends can be addressed;
/// any extra backends are ignored during initialization
type CacheIndex = u8;
59
/// Event handlers exposed by the farmer cache
#[derive(Default, Debug)]
struct Handlers {
    /// Piece cache sync progress handlers, called with a percentage (`0.0` at the start of the
    /// download phase, `100.0` once synchronization finishes)
    progress: Handler<f32>,
}
64
/// Location of a piece inside the farmer cache: which backend holds it and at which offset
#[derive(Debug, Clone, Copy)]
struct FarmerCacheOffset {
    /// Index of the backend in the list of piece caches the worker was initialized with
    cache_index: CacheIndex,
    /// Offset of the piece within that backend
    piece_offset: PieceCacheOffset,
}
70
71impl FarmerCacheOffset {
72    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
73        Self {
74            cache_index,
75            piece_offset,
76        }
77    }
78}
79
/// A piece cache backend together with bookkeeping of how many of its slots have been handed out
#[derive(Debug, Clone)]
struct CacheBackend {
    /// The underlying piece cache
    backend: Arc<dyn PieceCache>,
    /// Number of leading slots already in use; the next never-used offset is equal to this value
    used_capacity: u32,
    /// Total number of slots in the backend
    total_capacity: u32,
}
86
87impl std::ops::Deref for CacheBackend {
88    type Target = Arc<dyn PieceCache>;
89
90    fn deref(&self) -> &Self::Target {
91        &self.backend
92    }
93}
94
95impl CacheBackend {
96    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
97        Self {
98            backend,
99            used_capacity: 0,
100            total_capacity,
101        }
102    }
103
104    fn next_free(&mut self) -> Option<PieceCacheOffset> {
105        let offset = self.used_capacity;
106        if offset < self.total_capacity {
107            self.used_capacity += 1;
108            Some(PieceCacheOffset(offset))
109        } else {
110            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
111            None
112        }
113    }
114
115    fn free_size(&self) -> u32 {
116        self.total_capacity - self.used_capacity
117    }
118}
119
/// Contents recovered from a single cache backend while (re)initializing the farmer cache
#[derive(Debug)]
struct CacheState {
    /// Pieces found in the backend, keyed by their distance key, with their location
    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
    /// Offsets the backend reported as not holding any piece
    cache_free_offsets: Vec<FarmerCacheOffset>,
    /// The backend itself, with `used_capacity` set just past the last occupied offset seen
    backend: CacheBackend,
}
126
/// Commands processed by [`FarmerCacheWorker`]
#[derive(Debug)]
enum WorkerCommand {
    /// Replace the set of backing piece caches and re-run cache initialization and sync
    ReplaceBackingCaches {
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    },
    /// Forget the piece stored under this record key, returning its slot for reuse
    ForgetKey {
        key: RecordKey,
    },
}
136
/// Farmer cache worker used to drive the farmer cache backend
#[derive(Debug)]
#[must_use = "Farmer cache will not work unless its worker is running"]
pub struct FarmerCacheWorker<NC>
where
    NC: fmt::Debug,
{
    /// Local peer ID, used to compute each piece's distance key (`KeyWithDistance`)
    peer_id: PeerId,
    /// Node client used to query farmer app info, subscribe to super segment headers and fetch
    /// freshly archived pieces
    node_client: NC,
    /// Shared state of all piece caches; replaced wholesale on (re)initialization
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    /// Plot caches, consulted when deciding whether newly archived pieces should be stored
    plot_caches: Arc<PlotCaches>,
    /// Event handlers (sync progress notifications)
    handlers: Arc<Handlers>,
    /// Command receiver; an `Option` so that `run` can take ownership of it
    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
    /// Optional Prometheus metrics
    metrics: Option<Arc<FarmerCacheMetrics>>,
}
152
153impl<NC> FarmerCacheWorker<NC>
154where
155    NC: NodeClient,
156{
157    /// Run the cache worker with provided piece getter.
158    ///
159    /// NOTE: Piece getter must not depend on farmer cache in order to avoid reference cycles!
160    pub async fn run<PG>(mut self, piece_getter: PG)
161    where
162        PG: PieceGetter,
163    {
164        // Limit is dynamically set later
165        let mut last_segment_index_internal = SegmentIndex::ZERO;
166
167        let mut worker_receiver = self
168            .worker_receiver
169            .take()
170            .expect("Always set during worker instantiation");
171
172        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
173            worker_receiver.next().await
174        {
175            self.initialize(
176                &piece_getter,
177                &mut last_segment_index_internal,
178                new_piece_caches,
179            )
180            .await;
181        } else {
182            // Piece cache is dropped before backing caches were sent
183            return;
184        }
185
186        let mut super_segment_headers_notifications = match self
187            .node_client
188            .subscribe_new_super_segment_headers()
189            .await
190        {
191            Ok(super_segment_headers_notifications) => super_segment_headers_notifications,
192            Err(error) => {
193                error!(%error, "Failed to subscribe to new super segment headers notifications");
194                return;
195            }
196        };
197
198        // Keep up with segment indices that were potentially created since reinitialization
199        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
200            .await;
201
202        loop {
203            select! {
204                maybe_command = worker_receiver.next() => {
205                    let Some(command) = maybe_command else {
206                        // Nothing else left to do
207                        return;
208                    };
209
210                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
211                }
212                maybe_super_segment_header = super_segment_headers_notifications.next().fuse() => {
213                    if let Some(super_segment_header) = maybe_super_segment_header {
214                        self
215                            .process_new_segment_index(
216                                &piece_getter,
217                                super_segment_header.max_segment_index.as_inner(),
218                                &mut last_segment_index_internal
219                            )
220                            .await;
221                    } else {
222                        // Keep-up sync only ends with subscription, which lasts for duration of an
223                        // instance
224                        return;
225                    }
226                }
227            }
228        }
229    }
230
231    async fn handle_command<PG>(
232        &self,
233        command: WorkerCommand,
234        piece_getter: &PG,
235        last_segment_index_internal: &mut SegmentIndex,
236    ) where
237        PG: PieceGetter,
238    {
239        match command {
240            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
241                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
242                    .await;
243            }
244            // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
245            WorkerCommand::ForgetKey { key } => {
246                let mut caches = self.piece_caches.write().await;
247                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
248                let Some(offset) = caches.remove_stored_piece(&key) else {
249                    // Key not exist
250                    return;
251                };
252
253                let cache_index = offset.cache_index;
254                let piece_offset = offset.piece_offset;
255                let Some(backend) = caches.get_backend(cache_index).cloned() else {
256                    // Cache backend not exist
257                    return;
258                };
259
260                caches.push_dangling_free_offset(offset);
261                match backend.read_piece_index(piece_offset).await {
262                    Ok(Some(piece_index)) => {
263                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
264                    }
265                    Ok(None) => {
266                        warn!(
267                            %cache_index,
268                            %piece_offset,
269                            "Piece index out of range, this is likely an implementation bug, \
270                            not freeing heap element"
271                        );
272                    }
273                    Err(error) => {
274                        error!(
275                            %error,
276                            %cache_index,
277                            ?key,
278                            %piece_offset,
279                            "Error while reading piece from cache"
280                        );
281                    }
282                }
283            }
284        }
285    }
286
287    async fn initialize<PG>(
288        &self,
289        piece_getter: &PG,
290        last_segment_index_internal: &mut SegmentIndex,
291        new_piece_caches: Vec<Arc<dyn PieceCache>>,
292    ) where
293        PG: PieceGetter,
294    {
295        info!("Initializing piece cache");
296
297        // Pull old cache state since it will be replaced with a new one and reuse its allocations
298        let (mut stored_pieces, mut dangling_free_offsets) =
299            mem::take(&mut *self.piece_caches.write().await).reuse();
300
301        debug!("Collecting pieces that were in the cache before");
302
303        if let Some(metrics) = &self.metrics {
304            metrics.piece_cache_capacity_total.set(0);
305            metrics.piece_cache_capacity_used.set(0);
306        }
307
308        let peer_id = self.peer_id;
309
310        // Build cache state of all backends
311        let piece_caches_number = new_piece_caches.len();
312        let maybe_caches_futures = new_piece_caches
313            .into_iter()
314            .enumerate()
315            .filter_map(|(cache_index, new_cache)| {
316                let total_capacity = new_cache.max_num_elements();
317                let mut backend = CacheBackend::new(new_cache, total_capacity);
318                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
319                    warn!(
320                        ?piece_caches_number,
321                        "Too many piece caches provided, {cache_index} cache will be ignored",
322                    );
323                    return None;
324                };
325
326                if let Some(metrics) = &self.metrics {
327                    metrics
328                        .piece_cache_capacity_total
329                        .inc_by(total_capacity as i64);
330                }
331
332                let init_fut = async move {
333                    let used_capacity = &mut backend.used_capacity;
334
335                    // Hack with first collecting into `Option` with `Option::take()` call
336                    // later is to satisfy compiler that gets confused about ownership
337                    // otherwise
338                    let mut maybe_contents = match backend.backend.contents().await {
339                        Ok(contents) => Some(contents),
340                        Err(error) => {
341                            warn!(%error, "Failed to get cache contents");
342
343                            None
344                        }
345                    };
346
347                    let mut cache_stored_pieces = HashMap::new();
348                    let mut cache_free_offsets = Vec::new();
349
350                    let Some(mut contents) = maybe_contents.take() else {
351                        drop(maybe_contents);
352
353                        return CacheState {
354                            cache_stored_pieces,
355                            cache_free_offsets,
356                            backend,
357                        };
358                    };
359
360                    while let Some(maybe_element_details) = contents.next().await {
361                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
362                            Ok(element_details) => element_details,
363                            Err(error) => {
364                                warn!(%error, "Failed to get cache contents element details");
365                                break;
366                            }
367                        };
368                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
369                        match maybe_piece_index {
370                            Some(piece_index) => {
371                                *used_capacity = piece_offset.0 + 1;
372                                let record_key = RecordKey::from(piece_index.to_multihash());
373                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
374                                cache_stored_pieces.insert(key, offset);
375                            }
376                            None => {
377                                // TODO: Optimize to not store all free offsets, only dangling
378                                //  offsets are actually necessary
379                                cache_free_offsets.push(offset);
380                            }
381                        }
382
383                        // Allow for task to be aborted
384                        yield_now().await;
385                    }
386
387                    drop(maybe_contents);
388                    drop(contents);
389
390                    CacheState {
391                        cache_stored_pieces,
392                        cache_free_offsets,
393                        backend,
394                    }
395                };
396
397                Some(run_future_in_dedicated_thread(
398                    move || init_fut.instrument(info_span!("", %cache_index)),
399                    format!("piece-cache.{cache_index}"),
400                ))
401            })
402            .collect::<Result<Vec<_>, _>>();
403
404        let caches_futures = match maybe_caches_futures {
405            Ok(caches_futures) => caches_futures,
406            Err(error) => {
407                error!(%error, "Failed to spawn piece cache reading thread");
408
409                return;
410            }
411        };
412
413        let mut backends = Vec::with_capacity(caches_futures.len());
414        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
415
416        while let Some(maybe_cache) = caches_futures.next().await {
417            match maybe_cache {
418                Ok(cache) => {
419                    let backend = cache.backend;
420                    for (key, cache_offset) in cache.cache_stored_pieces {
421                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
422                            dangling_free_offsets.push_front(old_cache_offset);
423                        }
424                    }
425                    dangling_free_offsets.extend(
426                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
427                            free_offset.piece_offset.0 < backend.used_capacity
428                        }),
429                    );
430                    backends.push(backend);
431                }
432                Err(_cancelled) => {
433                    error!("Piece cache reading thread panicked");
434
435                    return;
436                }
437            };
438        }
439
440        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
441
442        info!("Synchronizing piece cache");
443
444        let last_segment_index = loop {
445            match self.node_client.farmer_app_info().await {
446                Ok(farmer_app_info) => {
447                    let last_segment_index =
448                        farmer_app_info.protocol_info.history_size.segment_index();
449                    // Wait for node to be either fully synced or to be aware of non-zero segment
450                    // index, which would indicate it has started DSN sync and knows about
451                    // up-to-date archived history.
452                    //
453                    // While this doesn't account for situations where node was offline for a long
454                    // time and is aware of old segment headers, this is good enough for piece cache
455                    // sync to proceed and should result in better user experience on average.
456                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
457                        break last_segment_index;
458                    }
459                }
460                Err(error) => {
461                    error!(
462                        %error,
463                        "Failed to get farmer app info from node, keeping old cache state without \
464                        updates"
465                    );
466
467                    // Not the latest, but at least something
468                    *self.piece_caches.write().await = caches;
469                    return;
470                }
471            }
472
473            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
474        };
475
476        debug!(%last_segment_index, "Identified last segment index");
477
478        // Collect all the piece indices that need to be stored, we will sort them later
479        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
480        // TODO: This may eventually be too much to store in memory, though right now it is a
481        //  non-issue in practice
482        let mut piece_indices_to_store = segment_indices
483            .into_par_iter()
484            .flat_map(|segment_index| {
485                segment_index
486                    .segment_piece_indexes()
487                    .into_par_iter()
488                    .map(|piece_index| {
489                        (
490                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
491                            piece_index,
492                        )
493                    })
494            })
495            .collect::<Vec<_>>();
496
497        // Sort pieces by distance from peer to piece such that they are in ascending order
498        // and have higher chance of download
499        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
500
501        // `HashMap` is faster than `BTreeMap`
502        let mut piece_indices_to_store = piece_indices_to_store
503            .into_iter()
504            .take(caches.total_capacity())
505            .collect::<HashMap<_, _>>();
506
507        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
508        // Filter-out piece indices that are stored, but should not be as well as clean
509        // `inserted_piece_indices` from already stored piece indices, leaving just those that are
510        // still missing in cache
511        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
512
513        if let Some(metrics) = &self.metrics {
514            for offset in caches.stored_pieces_offsets() {
515                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
516            }
517
518            for cache_used in piece_caches_capacity_used {
519                metrics
520                    .piece_cache_capacity_used
521                    .inc_by(i64::from(cache_used));
522            }
523        }
524
525        // Store whatever correct pieces are immediately available after restart
526        self.piece_caches.write().await.clone_from(&caches);
527        let stored_count = caches.stored_pieces_offsets().len();
528
529        debug!(
530            %stored_count,
531            count = %piece_indices_to_store.len(),
532            "Identified piece indices that should be cached",
533        );
534
535        let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
536        let piece_indices_to_store = piece_indices_to_store
537            .into_values()
538            .collect::<Vec<_>>()
539            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
540            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
541            //  similar upstream issues
542            .chunks(SYNC_BATCH_SIZE)
543            .map(|chunk| chunk.to_vec())
544            .collect::<Vec<_>>();
545
546        let downloaded_pieces_count = AtomicUsize::new(stored_count);
547        let caches = Mutex::new(caches);
548        self.handlers.progress.call_simple(&0.0);
549        let batch_count = piece_indices_to_store.len();
550        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
551
552        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
553        let ignored_cache_indices = &RwLock::new(HashSet::new());
554
555        let downloading_pieces_stream =
556            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
557                let downloaded_pieces_count = &downloaded_pieces_count;
558                let caches = &caches;
559                let num_pieces = piece_indices.len();
560
561                trace!(
562                    %num_pieces,
563                    %batch,
564                    %batch_count,
565                    first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
566                    last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
567                    downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
568                    %pieces_to_download_total,
569                    available_permits = %downloading_semaphore.available_permits(),
570                    "Started piece cache sync batch",
571                );
572
573                async move {
574                    let mut permit = downloading_semaphore
575                        .acquire_many(SYNC_BATCH_SIZE as u32)
576                        .await
577                        .expect("Semaphore is never closed; qed");
578                    debug!(%batch, %num_pieces, "Downloading pieces");
579
580                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
581                        Ok(pieces_stream) => pieces_stream,
582                        Err(error) => {
583                            error!(
584                                %error,
585                                "Failed to get pieces from piece getter"
586                            );
587                            return;
588                        }
589                    };
590                    let mut pieces_stream = pieces_stream.enumerate();
591
592                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
593                        debug!(%batch, %index, %piece_index, "Downloaded piece");
594                        // Release slot for future batches, by dropping it along with the piece.
595                        let _permit = permit.split(1);
596
597                        let piece = match result {
598                            Ok(Some(piece)) => {
599                                trace!(%batch, %piece_index, "Downloaded piece successfully");
600                                piece
601                            }
602                            Ok(None) => {
603                                debug!(%batch, %piece_index, "Couldn't find piece");
604                                continue;
605                            }
606                            Err(error) => {
607                                debug!(
608                                    %batch,
609                                    %error,
610                                    %piece_index,
611                                    "Failed to get piece for piece cache"
612                                );
613                                continue;
614                            }
615                        };
616
617                        let (offset, maybe_backend) = {
618                            let mut caches = caches.lock();
619
620                            // Find plot in which there is a place for new piece to be stored
621                            let Some(offset) = caches.pop_free_offset() else {
622                                error!(
623                                    %batch,
624                                    %piece_index,
625                                    "Failed to store piece in cache, there was no space"
626                                );
627                                break;
628                            };
629
630                            (offset, caches.get_backend(offset.cache_index).cloned())
631                        };
632
633                        let cache_index = offset.cache_index;
634                        let piece_offset = offset.piece_offset;
635
636                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
637                        if skip_write {
638                            trace!(
639                                %batch,
640                                %cache_index,
641                                %piece_index,
642                                %piece_offset,
643                                "Skipping known problematic cache index"
644                            );
645                        } else {
646                            if let Some(backend) = maybe_backend
647                                && let Err(error) =
648                                    backend.write_piece(piece_offset, piece_index, &piece).await
649                            {
650                                error!(
651                                    %error,
652                                    %batch,
653                                    %cache_index,
654                                    %piece_index,
655                                    %piece_offset,
656                                    "Failed to write piece into cache, ignoring this cache going \
657                                    forward"
658                                );
659                                ignored_cache_indices.write().insert(cache_index);
660                                continue;
661                            }
662
663                            let key =
664                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
665                            caches.lock().push_stored_piece(key, offset);
666                        }
667
668                        let prev_downloaded_pieces_count =
669                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
670                        // Do not print anything or send progress notification after last piece
671                        // until piece cache is written fully below
672                        if prev_downloaded_pieces_count != pieces_to_download_total {
673                            let progress = prev_downloaded_pieces_count as f32
674                                / pieces_to_download_total as f32
675                                * 100.0;
676                            if prev_downloaded_pieces_count
677                                .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
678                            {
679                                let mut piece_caches = self.piece_caches.write().await;
680                                piece_caches.clone_from(&caches.lock());
681
682                                info!(
683                                    "Piece cache sync {progress:.2}% complete ({} / {})",
684                                    ByteSize::b(
685                                        (prev_downloaded_pieces_count * Piece::SIZE) as u64,
686                                    )
687                                    .display()
688                                    .iec(),
689                                    ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64,)
690                                        .display()
691                                        .iec(),
692                                );
693                            }
694
695                            self.handlers.progress.call_simple(&progress);
696                        }
697                    }
698
699                    trace!(
700                        %num_pieces,
701                        %batch,
702                        %batch_count,
703                        downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
704                        %pieces_to_download_total,
705                        available_permits = %downloading_semaphore.available_permits(),
706                        "Finished piece cache sync batch",
707                    );
708                }
709            }));
710
711        // Download several batches concurrently to make sure slow tail of one is compensated by
712        // another
713        downloading_pieces_stream
714            // This allows to schedule new batch while previous batches partially completed, but
715            // avoids excessive memory usage like when all futures are created upfront
716            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
717            // Simply drain everything
718            .for_each(|()| async {})
719            .await;
720
721        *self.piece_caches.write().await = caches.into_inner();
722        self.handlers.progress.call_simple(&100.0);
723        *last_segment_index_internal = last_segment_index;
724
725        info!("Finished piece cache synchronization");
726    }
727
728    async fn process_new_segment_index<PG>(
729        &self,
730        piece_getter: &PG,
731        segment_index: SegmentIndex,
732        last_segment_index_internal: &mut SegmentIndex,
733    ) where
734        PG: PieceGetter,
735    {
736        debug!(
737            %last_segment_index_internal,
738            %segment_index,
739            "Starting to process new segment index"
740        );
741
742        let segment_index_range =
743            (*last_segment_index_internal + SegmentIndex::ONE)..=segment_index;
744
745        if segment_index_range.is_empty() {
746            return;
747        }
748
749        debug!(
750            ?segment_index_range,
751            "Downloading potentially useful pieces"
752        );
753
754        // TODO: Download local pieces first, then download other pieces only after some delay, so
755        //  other farmers on the network have enough time to store their pieces too
756
757        // We do not insert pieces into cache/heap yet, so we don't know if all of these pieces will
758        // be included, but there is a good chance they will be
759        let pieces_to_maybe_include = segment_index_range
760            .flat_map(|segment_index| segment_index.segment_piece_indexes())
761            .map(|piece_index| async move {
762                let should_store_in_piece_cache = self
763                    .piece_caches
764                    .read()
765                    .await
766                    .should_include_key(self.peer_id, piece_index);
767
768                let key = RecordKey::from(piece_index.to_multihash());
769                let should_store_in_plot_cache =
770                    self.plot_caches.should_store(piece_index, &key).await;
771
772                if !(should_store_in_piece_cache || should_store_in_plot_cache) {
773                    trace!(%piece_index, "Piece doesn't need to be cached #1");
774
775                    return None;
776                }
777
778                let maybe_piece_result =
779                    self.node_client
780                        .piece(piece_index)
781                        .await
782                        .inspect_err(|error| {
783                            debug!(
784                                %error,
785                                %segment_index,
786                                %piece_index,
787                                "Failed to retrieve piece from node right after archiving"
788                            );
789                        });
790
791                if let Ok(Some(piece)) = maybe_piece_result {
792                    return Some((piece_index, piece));
793                }
794
795                match piece_getter.get_piece(piece_index).await {
796                    Ok(Some(piece)) => Some((piece_index, piece)),
797                    Ok(None) => {
798                        warn!(
799                            %segment_index,
800                            %piece_index,
801                            "Failed to retrieve piece right after archiving"
802                        );
803
804                        None
805                    }
806                    Err(error) => {
807                        warn!(
808                            %error,
809                            %segment_index,
810                            %piece_index,
811                            "Failed to retrieve piece right after archiving"
812                        );
813
814                        None
815                    }
816                }
817            })
818            .collect::<FuturesUnordered<_>>()
819            .filter_map(|maybe_piece| async move { maybe_piece })
820            .collect::<Vec<_>>()
821            .await;
822
823        debug!(%segment_index, "Downloaded potentially useful pieces");
824
825        // Go through potentially matching pieces again now that segment was acknowledged and try to
826        // persist them if necessary
827        for (piece_index, piece) in pieces_to_maybe_include {
828            if !self
829                .plot_caches
830                .store_additional_piece(piece_index, &piece)
831                .await
832            {
833                trace!(%piece_index, "Piece could not be cached in plot cache");
834            }
835
836            if !self
837                .piece_caches
838                .read()
839                .await
840                .should_include_key(self.peer_id, piece_index)
841            {
842                trace!(%piece_index, "Piece doesn't need to be cached #2");
843
844                continue;
845            }
846
847            trace!(%piece_index, "Piece needs to be cached #1");
848
849            self.persist_piece_in_cache(piece_index, piece).await;
850        }
851
852        *last_segment_index_internal = segment_index;
853
854        debug!(%segment_index, "Finished processing new segment indices");
855    }
856
857    async fn keep_up_after_initial_sync<PG>(
858        &self,
859        piece_getter: &PG,
860        last_segment_index_internal: &mut SegmentIndex,
861    ) where
862        PG: PieceGetter,
863    {
864        let last_segment_index = match self.node_client.farmer_app_info().await {
865            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
866            Err(error) => {
867                error!(
868                    %error,
869                    "Failed to get farmer app info from node, keeping old cache state without \
870                    updates"
871                );
872                return;
873            }
874        };
875
876        if last_segment_index <= *last_segment_index_internal {
877            return;
878        }
879
880        info!(
881            "Syncing piece cache to the latest history size, this may pause block production if \
882            takes too long"
883        );
884
885        // Keep up with segment indices that were potentially created since reinitialization
886        let piece_indices = (*last_segment_index_internal..=last_segment_index)
887            .flat_map(|segment_index| segment_index.segment_piece_indexes());
888
889        // TODO: Download pieces concurrently
890        for piece_index in piece_indices {
891            if !self
892                .piece_caches
893                .read()
894                .await
895                .should_include_key(self.peer_id, piece_index)
896            {
897                trace!(%piece_index, "Piece doesn't need to be cached #3");
898
899                continue;
900            }
901
902            trace!(%piece_index, "Piece needs to be cached #2");
903
904            let result = piece_getter.get_piece(piece_index).await;
905
906            let piece = match result {
907                Ok(Some(piece)) => piece,
908                Ok(None) => {
909                    debug!(%piece_index, "Couldn't find piece");
910                    continue;
911                }
912                Err(error) => {
913                    debug!(
914                        %error,
915                        %piece_index,
916                        "Failed to get piece for piece cache"
917                    );
918                    continue;
919                }
920            };
921
922            self.persist_piece_in_cache(piece_index, piece).await;
923        }
924
925        info!("Finished syncing piece cache to the latest history size");
926
927        *last_segment_index_internal = last_segment_index;
928    }
929
930    /// This assumes it was already checked that piece needs to be stored, no verification for this
931    /// is done internally and invariants will break if this assumption doesn't hold true
932    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
933        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
934        let mut caches = self.piece_caches.write().await;
935        match caches.should_replace(&key) {
936            // Entry is already occupied, we need to find and replace old piece with new one
937            Some((old_key, offset)) => {
938                let cache_index = offset.cache_index;
939                let piece_offset = offset.piece_offset;
940                let Some(backend) = caches.get_backend(cache_index) else {
941                    // Cache backend not exist
942                    warn!(
943                        %cache_index,
944                        %piece_index,
945                        "Should have a cached backend, but it didn't exist, this is an \
946                        implementation bug"
947                    );
948                    return;
949                };
950                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
951                    error!(
952                        %error,
953                        %cache_index,
954                        %piece_index,
955                        %piece_offset,
956                        "Failed to write piece into cache"
957                    );
958                } else {
959                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
960                    trace!(
961                        %cache_index,
962                        %old_piece_index,
963                        %piece_index,
964                        %piece_offset,
965                        "Successfully replaced old cached piece"
966                    );
967                    caches.push_stored_piece(key, offset);
968                }
969            }
970            // There is free space in cache, need to find a free spot and place piece there
971            None => {
972                let Some(offset) = caches.pop_free_offset() else {
973                    warn!(
974                        %piece_index,
975                        "Should have inserted piece into cache, but it didn't happen, this is an \
976                        implementation bug"
977                    );
978                    return;
979                };
980                let cache_index = offset.cache_index;
981                let piece_offset = offset.piece_offset;
982                let Some(backend) = caches.get_backend(cache_index) else {
983                    // Cache backend not exist
984                    warn!(
985                        %cache_index,
986                        %piece_index,
987                        "Should have a cached backend, but it didn't exist, this is an \
988                        implementation bug"
989                    );
990                    return;
991                };
992
993                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
994                    error!(
995                        %error,
996                        %cache_index,
997                        %piece_index,
998                        %piece_offset,
999                        "Failed to write piece into cache"
1000                    );
1001                } else {
1002                    trace!(
1003                        %cache_index,
1004                        %piece_index,
1005                        %piece_offset,
1006                        "Successfully stored piece in cache"
1007                    );
1008                    if let Some(metrics) = &self.metrics {
1009                        metrics.piece_cache_capacity_used.inc();
1010                    }
1011                    caches.push_stored_piece(key, offset);
1012                }
1013            }
1014        };
1015    }
1016}
1017
/// Plot caches that store additional pieces in plot space not yet used by sectors, together with
/// a shared cursor used to spread writes between them in round-robin order
#[derive(Debug)]
struct PlotCaches {
    /// Additional piece caches
    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
    /// Next plot cache to use for storing pieces (monotonically incremented, taken modulo the
    /// number of caches)
    next_plot_cache: AtomicUsize,
}
1025
1026impl PlotCaches {
1027    /// Returns true if there might be space to add a piece to a cache.
1028    /// Returns false if it is already in a cache, or it can't be added to any of the caches.
1029    ///
1030    /// Available space can be overwritten by a sector at any time, so the piece write can still
1031    /// fail even if this returns `true`.
1032    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1033        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1034            match cache.is_piece_maybe_stored(key).await {
1035                Ok(MaybePieceStoredResult::No) => {
1036                    // Isn't stored or can't be stored, try another cache if there is one
1037                }
1038                Ok(MaybePieceStoredResult::Vacant) => {
1039                    return true;
1040                }
1041                Ok(MaybePieceStoredResult::Yes) => {
1042                    // Already stored, nothing else left to do
1043                    return false;
1044                }
1045                Err(error) => {
1046                    warn!(
1047                        %cache_index,
1048                        %piece_index,
1049                        %error,
1050                        "Failed to check piece stored in cache"
1051                    );
1052                }
1053            }
1054        }
1055
1056        false
1057    }
1058
1059    /// Store a piece in additional downloaded pieces, if there is space for it.
1060    /// Returns `true` if the piece was added to a cache, and `false` if it couldn't be stored,
1061    /// typically because the cache is full.
1062    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1063        let plot_caches = self.caches.read().await;
1064        let plot_caches_len = plot_caches.len();
1065
1066        // Store pieces in plots using round-robin distribution
1067        for _ in 0..plot_caches_len {
1068            let plot_cache_index =
1069                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1070
1071            match plot_caches[plot_cache_index]
1072                .try_store_piece(piece_index, piece)
1073                .await
1074            {
1075                Ok(true) => {
1076                    return true;
1077                }
1078                Ok(false) => {
1079                    continue;
1080                }
1081                Err(error) => {
1082                    error!(
1083                        %error,
1084                        %piece_index,
1085                        %plot_cache_index,
1086                        "Failed to store additional piece in cache"
1087                    );
1088                    continue;
1089                }
1090            }
1091        }
1092
1093        false
1094    }
1095}
1096
/// Farmer cache that aggregates different kinds of caches of multiple disks.
///
/// Pieces in [`PieceCache`] are stored based on capacity and proximity of piece index to farmer's
/// network identity. If capacity is not enough to store all pieces in cache then pieces that are
/// further from network identity will be evicted, this is helpful for quick retrieval of pieces
/// from DSN as well as plotting purposes.
///
/// [`PlotCache`] is used as a supplementary cache and is primarily helpful for smaller farmers
/// where piece cache is not enough to store all the pieces on the network, while there is a lot of
/// space in the plot that is not used by sectors yet and can be leveraged as extra caching space.
///
/// Cache state is shared through `Arc`s with the [`FarmerCacheWorker`] returned by
/// [`FarmerCache::new`], which this handle sends commands to over a channel.
#[derive(Debug, Clone)]
pub struct FarmerCache {
    /// Network identity used to compute key distance for piece cache placement
    peer_id: PeerId,
    /// Individual dedicated piece caches
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    /// Additional piece caches
    plot_caches: Arc<PlotCaches>,
    /// Event handlers shared with the worker
    handlers: Arc<Handlers>,
    // We do not want to increase capacity unnecessarily on clone
    // NOTE(review): with `futures::channel::mpsc`, every `Sender` clone adds one guaranteed slot
    // to the channel capacity and `#[derive(Clone)]` does clone this sender; confirm the comment
    // above still reflects the intent
    /// Sender for commands to the worker
    worker_sender: mpsc::Sender<WorkerCommand>,
    /// Cache metrics, present when a registry was provided on creation
    metrics: Option<Arc<FarmerCacheMetrics>>,
}
1119
1120impl FarmerCache {
1121    /// Create new piece cache instance and corresponding worker.
1122    ///
1123    /// NOTE: Returned future is async, but does blocking operations and should be running in
1124    /// dedicated thread.
1125    pub fn new<NC>(
1126        node_client: NC,
1127        peer_id: PeerId,
1128        registry: Option<&mut Registry>,
1129    ) -> (Self, FarmerCacheWorker<NC>)
1130    where
1131        NC: NodeClient,
1132    {
1133        let caches = Arc::default();
1134        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1135        let handlers = Arc::new(Handlers::default());
1136
1137        let plot_caches = Arc::new(PlotCaches {
1138            caches: AsyncRwLock::default(),
1139            next_plot_cache: AtomicUsize::new(0),
1140        });
1141        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1142
1143        let instance = Self {
1144            peer_id,
1145            piece_caches: Arc::clone(&caches),
1146            plot_caches: Arc::clone(&plot_caches),
1147            handlers: Arc::clone(&handlers),
1148            worker_sender,
1149            metrics: metrics.clone(),
1150        };
1151        let worker = FarmerCacheWorker {
1152            peer_id,
1153            node_client,
1154            piece_caches: caches,
1155            plot_caches,
1156            handlers,
1157            worker_receiver: Some(worker_receiver),
1158            metrics,
1159        };
1160
1161        (instance, worker)
1162    }
1163
1164    /// Get piece from cache
1165    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1166    where
1167        RecordKey: From<Key>,
1168    {
1169        let key = RecordKey::from(key);
1170        let maybe_piece_found = {
1171            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1172            let caches = self.piece_caches.read().await;
1173
1174            caches.get_stored_piece(&key).and_then(|offset| {
1175                let cache_index = offset.cache_index;
1176                let piece_offset = offset.piece_offset;
1177                Some((
1178                    piece_offset,
1179                    cache_index,
1180                    caches.get_backend(cache_index)?.clone(),
1181                ))
1182            })
1183        };
1184
1185        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1186            match backend.read_piece(piece_offset).await {
1187                Ok(maybe_piece) => {
1188                    return match maybe_piece {
1189                        Some((_piece_index, piece)) => {
1190                            if let Some(metrics) = &self.metrics {
1191                                metrics.cache_get_hit.inc();
1192                            }
1193                            Some(piece)
1194                        }
1195                        None => {
1196                            error!(
1197                                %cache_index,
1198                                %piece_offset,
1199                                ?key,
1200                                "Piece was expected to be in cache, but wasn't found there"
1201                            );
1202                            if let Some(metrics) = &self.metrics {
1203                                metrics.cache_get_error.inc();
1204                            }
1205                            None
1206                        }
1207                    };
1208                }
1209                Err(error) => {
1210                    error!(
1211                        %error,
1212                        %cache_index,
1213                        ?key,
1214                        %piece_offset,
1215                        "Error while reading piece from cache"
1216                    );
1217
1218                    if let Err(error) = self
1219                        .worker_sender
1220                        .clone()
1221                        .send(WorkerCommand::ForgetKey { key })
1222                        .await
1223                    {
1224                        trace!(%error, "Failed to send ForgetKey command to worker");
1225                    }
1226
1227                    if let Some(metrics) = &self.metrics {
1228                        metrics.cache_get_error.inc();
1229                    }
1230                    return None;
1231                }
1232            }
1233        }
1234
1235        for cache in self.plot_caches.caches.read().await.iter() {
1236            if let Ok(Some(piece)) = cache.read_piece(&key).await {
1237                if let Some(metrics) = &self.metrics {
1238                    metrics.cache_get_hit.inc();
1239                }
1240                return Some(piece);
1241            }
1242        }
1243
1244        if let Some(metrics) = &self.metrics {
1245            metrics.cache_get_miss.inc();
1246        }
1247        None
1248    }
1249
1250    /// Get pieces from cache.
1251    ///
1252    /// Number of elements in returned stream is the same as number of unique `piece_indices`.
1253    pub async fn get_pieces<'a, PieceIndices>(
1254        &'a self,
1255        piece_indices: PieceIndices,
1256    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1257    where
1258        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1259    {
1260        let mut pieces_to_get_from_plot_cache = Vec::new();
1261
1262        let pieces_to_read_from_piece_cache = {
1263            let caches = self.piece_caches.read().await;
1264            // Pieces to read from piece cache grouped by backend for efficiency reasons
1265            let mut pieces_to_read_from_piece_cache =
1266                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1267
1268            for piece_index in piece_indices {
1269                let key = RecordKey::from(piece_index.to_multihash());
1270
1271                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1272                    self.peer_id,
1273                    key.clone(),
1274                )) {
1275                    Some(offset) => offset,
1276                    None => {
1277                        pieces_to_get_from_plot_cache.push((piece_index, key));
1278                        continue;
1279                    }
1280                };
1281
1282                let cache_index = offset.cache_index;
1283                let piece_offset = offset.piece_offset;
1284
1285                match pieces_to_read_from_piece_cache.entry(cache_index) {
1286                    Entry::Occupied(mut entry) => {
1287                        let (_backend, pieces) = entry.get_mut();
1288                        pieces.insert(piece_offset, (piece_index, key));
1289                    }
1290                    Entry::Vacant(entry) => {
1291                        let backend = match caches.get_backend(cache_index) {
1292                            Some(backend) => backend.clone(),
1293                            None => {
1294                                pieces_to_get_from_plot_cache.push((piece_index, key));
1295                                continue;
1296                            }
1297                        };
1298                        entry
1299                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1300                    }
1301                }
1302            }
1303
1304            pieces_to_read_from_piece_cache
1305        };
1306
1307        let (tx, mut rx) = mpsc::unbounded();
1308
1309        let fut = async move {
1310            let tx = &tx;
1311
1312            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1313                .into_iter()
1314                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1315                    let mut pieces_stream = match backend
1316                        .read_pieces(Box::new(
1317                            pieces_to_get
1318                                .keys()
1319                                .copied()
1320                                .collect::<Vec<_>>()
1321                                .into_iter(),
1322                        ))
1323                        .await
1324                    {
1325                        Ok(pieces_stream) => pieces_stream,
1326                        Err(error) => {
1327                            error!(
1328                                %error,
1329                                %cache_index,
1330                                "Error while reading pieces from cache"
1331                            );
1332
1333                            if let Some(metrics) = &self.metrics {
1334                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1335                            }
1336                            for (piece_index, _key) in pieces_to_get.into_values() {
1337                                tx.unbounded_send((piece_index, None)).expect(
1338                                    "This future isn't polled after receiver is dropped; qed",
1339                                );
1340                            }
1341                            return;
1342                        }
1343                    };
1344
1345                    while let Some(maybe_piece) = pieces_stream.next().await {
1346                        let result = match maybe_piece {
1347                            Ok((piece_offset, Some((piece_index, piece)))) => {
1348                                pieces_to_get.remove(&piece_offset);
1349
1350                                if let Some(metrics) = &self.metrics {
1351                                    metrics.cache_get_hit.inc();
1352                                }
1353                                (piece_index, Some(piece))
1354                            }
1355                            Ok((piece_offset, None)) => {
1356                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1357                                else {
1358                                    debug!(
1359                                        %cache_index,
1360                                        %piece_offset,
1361                                        "Received piece offset that was not expected"
1362                                    );
1363                                    continue;
1364                                };
1365
1366                                error!(
1367                                    %cache_index,
1368                                    %piece_index,
1369                                    %piece_offset,
1370                                    ?key,
1371                                    "Piece was expected to be in cache, but wasn't found there"
1372                                );
1373                                if let Some(metrics) = &self.metrics {
1374                                    metrics.cache_get_error.inc();
1375                                }
1376                                (piece_index, None)
1377                            }
1378                            Err(error) => {
1379                                error!(
1380                                    %error,
1381                                    %cache_index,
1382                                    "Error while reading piece from cache"
1383                                );
1384
1385                                if let Some(metrics) = &self.metrics {
1386                                    metrics.cache_get_error.inc();
1387                                }
1388                                continue;
1389                            }
1390                        };
1391
1392                        tx.unbounded_send(result)
1393                            .expect("This future isn't polled after receiver is dropped; qed");
1394                    }
1395
1396                    if pieces_to_get.is_empty() {
1397                        return;
1398                    }
1399
1400                    if let Some(metrics) = &self.metrics {
1401                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1402                    }
1403                    for (piece_offset, (piece_index, key)) in pieces_to_get {
1404                        error!(
1405                            %cache_index,
1406                            %piece_index,
1407                            %piece_offset,
1408                            ?key,
1409                            "Piece cache didn't return an entry for offset"
1410                        );
1411
1412                        // Uphold invariant of the method that some result should be returned
1413                        // for every unique piece index
1414                        tx.unbounded_send((piece_index, None))
1415                            .expect("This future isn't polled after receiver is dropped; qed");
1416                    }
1417                })
1418                .collect::<FuturesUnordered<_>>();
1419            // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
1420            // Simply drain everything
1421            // .for_each(|()| async {})
1422
1423            // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
1424            let reading_from_piece_cache_fut = async move {
1425                while let Some(()) = reading_from_piece_cache.next().await {
1426                    // Simply drain everything
1427                }
1428            };
1429
1430            let reading_from_plot_cache_fut = async {
1431                if pieces_to_get_from_plot_cache.is_empty() {
1432                    return;
1433                }
1434
1435                for cache in self.plot_caches.caches.read().await.iter() {
1436                    // Iterating over offsets in reverse order to both traverse elements in async
1437                    // code and being able to efficiently remove entries without extra allocations
1438                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1439                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1440
1441                        if let Ok(Some(piece)) = cache.read_piece(key).await {
1442                            if let Some(metrics) = &self.metrics {
1443                                metrics.cache_get_hit.inc();
1444                            }
1445                            tx.unbounded_send((*piece_index, Some(piece)))
1446                                .expect("This future isn't polled after receiver is dropped; qed");
1447
1448                            // Due to iteration in reverse order and swapping using elements at the
1449                            // end, this doesn't affect processing of the elements
1450                            pieces_to_get_from_plot_cache.swap_remove(offset);
1451                        }
1452                    }
1453
1454                    if pieces_to_get_from_plot_cache.is_empty() {
1455                        return;
1456                    }
1457                }
1458
1459                if let Some(metrics) = &self.metrics {
1460                    metrics
1461                        .cache_get_miss
1462                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1463                }
1464
1465                for (piece_index, _key) in pieces_to_get_from_plot_cache {
1466                    tx.unbounded_send((piece_index, None))
1467                        .expect("This future isn't polled after receiver is dropped; qed");
1468                }
1469            };
1470
1471            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1472        };
1473        let mut fut = Box::pin(fut.fuse());
1474
1475        // Drive above future and stream back any pieces that were downloaded so far
1476        stream::poll_fn(move |cx| {
1477            if !fut.is_terminated() {
1478                // Result doesn't matter, we'll need to poll stream below anyway
1479                let _ = fut.poll_unpin(cx);
1480            }
1481
1482            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1483                return Poll::Ready(maybe_result);
1484            }
1485
1486            // Exit will be done by the stream above
1487            Poll::Pending
1488        })
1489    }
1490
1491    /// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed
1492    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1493        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1494            piece_indices
1495                .iter()
1496                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1497        );
1498
1499        // Quick check in piece caches
1500        {
1501            let piece_caches = self.piece_caches.read().await;
1502            pieces_to_find.retain(|_piece_index, key| {
1503                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1504                !piece_caches.contains_stored_piece(&distance_key)
1505            });
1506        }
1507
1508        // Early exit if everything is cached
1509        if pieces_to_find.is_empty() {
1510            return piece_indices;
1511        }
1512
1513        // Check plot caches concurrently
1514        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1515            let plot_caches = &plot_caches;
1516            let not_found = pieces_to_find
1517                .into_iter()
1518                .map(|(piece_index, key)| async move {
1519                    let key = &key;
1520
1521                    let found = plot_caches
1522                        .iter()
1523                        .map(|plot_cache| async {
1524                            matches!(
1525                                plot_cache.is_piece_maybe_stored(key).await,
1526                                Ok(MaybePieceStoredResult::Yes)
1527                            )
1528                        })
1529                        .collect::<FuturesUnordered<_>>()
1530                        .any(|found| async move { found })
1531                        .await;
1532
1533                    if found { None } else { Some(piece_index) }
1534                })
1535                .collect::<FuturesUnordered<_>>()
1536                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1537                .collect::<HashSet<_>>()
1538                .await;
1539            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1540        }
1541        piece_indices
1542    }
1543
1544    /// Find piece in cache and return its retrieval details
1545    pub async fn find_piece(
1546        &self,
1547        piece_index: PieceIndex,
1548    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1549        let caches = self.piece_caches.read().await;
1550
1551        self.find_piece_internal(&caches, piece_index)
1552    }
1553
1554    /// Find pieces in cache and return their retrieval details
1555    pub async fn find_pieces<PieceIndices>(
1556        &self,
1557        piece_indices: PieceIndices,
1558    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1559    where
1560        PieceIndices: IntoIterator<Item = PieceIndex>,
1561    {
1562        let caches = self.piece_caches.read().await;
1563
1564        piece_indices
1565            .into_iter()
1566            .filter_map(|piece_index| {
1567                self.find_piece_internal(&caches, piece_index)
1568                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1569            })
1570            .collect()
1571    }
1572
1573    fn find_piece_internal(
1574        &self,
1575        caches: &PieceCachesState,
1576        piece_index: PieceIndex,
1577    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1578        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1579
1580        let Some(offset) = caches.get_stored_piece(&key) else {
1581            if let Some(metrics) = &self.metrics {
1582                metrics.cache_find_miss.inc();
1583            }
1584
1585            return None;
1586        };
1587        let piece_offset = offset.piece_offset;
1588
1589        if let Some(backend) = caches.get_backend(offset.cache_index) {
1590            if let Some(metrics) = &self.metrics {
1591                metrics.cache_find_hit.inc();
1592            }
1593            return Some((*backend.id(), piece_offset));
1594        }
1595
1596        if let Some(metrics) = &self.metrics {
1597            metrics.cache_find_miss.inc();
1598        }
1599        None
1600    }
1601
1602    /// Try to store a piece in additional downloaded pieces, if there is space for it.
1603    /// Returns `true` if the piece was added to this cache, and `false` if it was already stored,
1604    /// or there was no space.
1605    pub async fn maybe_store_additional_piece(
1606        &self,
1607        piece_index: PieceIndex,
1608        piece: &Piece,
1609    ) -> bool {
1610        let key = RecordKey::from(piece_index.to_multihash());
1611
1612        let should_store = self.plot_caches.should_store(piece_index, &key).await;
1613
1614        if !should_store {
1615            return false;
1616        }
1617
1618        self.plot_caches
1619            .store_additional_piece(piece_index, piece)
1620            .await
1621    }
1622
1623    /// Initialize replacement of backing caches
1624    pub async fn replace_backing_caches(
1625        &self,
1626        new_piece_caches: Vec<Arc<dyn PieceCache>>,
1627        new_plot_caches: Vec<Arc<dyn PlotCache>>,
1628    ) {
1629        if let Err(error) = self
1630            .worker_sender
1631            .clone()
1632            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1633            .await
1634        {
1635            warn!(%error, "Failed to replace backing caches, worker exited");
1636        }
1637
1638        *self.plot_caches.caches.write().await = new_plot_caches;
1639    }
1640
1641    /// Subscribe to cache sync notifications
1642    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1643        self.handlers.progress.add(callback)
1644    }
1645}
1646
/// Collection of [`FarmerCache`] instances for load balancing
///
/// Lookups (`get_piece`, `get_pieces`, `has_pieces`) are served by one instance chosen at random
/// per call, while `maybe_store_additional_piece` is sent to every instance.
#[derive(Debug, Clone)]
pub struct FarmerCaches {
    // Shared slice: cloning `FarmerCaches` only bumps the `Arc` reference count
    caches: Arc<[FarmerCache]>,
}
1652
1653impl From<Arc<[FarmerCache]>> for FarmerCaches {
1654    fn from(caches: Arc<[FarmerCache]>) -> Self {
1655        Self { caches }
1656    }
1657}
1658
1659impl From<FarmerCache> for FarmerCaches {
1660    fn from(cache: FarmerCache) -> Self {
1661        Self {
1662            caches: Arc::new([cache]),
1663        }
1664    }
1665}
1666
1667impl FarmerCaches {
1668    /// Get piece from cache
1669    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1670    where
1671        RecordKey: From<Key>,
1672    {
1673        let farmer_cache = self.caches.choose(&mut rand::rng())?;
1674        farmer_cache.get_piece(key).await
1675    }
1676
1677    /// Get pieces from cache.
1678    ///
1679    /// Number of elements in returned stream is the same as number of unique `piece_indices`.
1680    pub async fn get_pieces<'a, PieceIndices>(
1681        &'a self,
1682        piece_indices: PieceIndices,
1683    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1684    where
1685        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1686    {
1687        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1688            return Either::Left(stream::iter(
1689                piece_indices
1690                    .into_iter()
1691                    .map(|piece_index| (piece_index, None)),
1692            ));
1693        };
1694
1695        Either::Right(farmer_cache.get_pieces(piece_indices).await)
1696    }
1697
1698    /// Returns a filtered list of pieces that were found in farmer cache, order is not guaranteed
1699    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1700        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1701            return Vec::new();
1702        };
1703
1704        farmer_cache.has_pieces(piece_indices).await
1705    }
1706
1707    /// Try to store a piece in additional downloaded pieces, if there is space for it.
1708    /// Returns `true` if the piece was added to one or more caches, and `false` if it was already
1709    /// stored, or there was no space.
1710    pub async fn maybe_store_additional_piece(
1711        &self,
1712        piece_index: PieceIndex,
1713        piece: &Piece,
1714    ) -> bool {
1715        // Run all the futures to completion, and take a non-short-circuiting any() on the results.
1716        self.caches
1717            .iter()
1718            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1719            .collect::<FuturesUnordered<_>>()
1720            .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1721            .await
1722    }
1723}
1724
1725/// Extracts the `PieceIndex` from a `RecordKey`.
1726fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1727    let len = key.as_ref().len();
1728    let s = len - PieceIndex::SIZE;
1729
1730    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1731    piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1732
1733    PieceIndex::from_bytes(piece_index_bytes)
1734}