1mod metrics;
7mod piece_cache_state;
8#[cfg(not(miri))]
10#[cfg(test)]
11mod tests;
12
13use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
14use crate::farmer_cache::metrics::FarmerCacheMetrics;
15use crate::farmer_cache::piece_cache_state::PieceCachesState;
16use crate::node_client::NodeClient;
17use crate::utils::run_future_in_dedicated_thread;
18use ab_core_primitives::pieces::{Piece, PieceIndex};
19use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_networking::KeyWithDistance;
22use ab_networking::libp2p::PeerId;
23use ab_networking::libp2p::kad::RecordKey;
24use ab_networking::utils::multihash::ToMultihash;
25use async_lock::RwLock as AsyncRwLock;
26use bytesize::ByteSize;
27use event_listener_primitives::{Bag, HandlerId};
28use futures::channel::mpsc;
29use futures::future::{Either, FusedFuture};
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
32use parking_lot::{Mutex, RwLock};
33use prometheus_client::registry::Registry;
34use rand::prelude::*;
35use rayon::prelude::*;
36use std::collections::hash_map::Entry;
37use std::collections::{HashMap, HashSet};
38use std::future::join;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::task::Poll;
42use std::time::Duration;
43use std::{fmt, mem};
44use tokio::sync::Semaphore;
45use tokio::task::yield_now;
46use tracing::{Instrument, debug, error, info, info_span, trace, warn};
47
48const WORKER_CHANNEL_CAPACITY: usize = 100;
49const SYNC_BATCH_SIZE: usize = 256;
50const SYNC_CONCURRENT_BATCHES: usize = 4;
51const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
54const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
55
56type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
57type Handler<A> = Bag<HandlerFn<A>, A>;
58type CacheIndex = u8;
59
60#[derive(Default, Debug)]
61struct Handlers {
62 progress: Handler<f32>,
63}
64
65#[derive(Debug, Clone, Copy)]
66struct FarmerCacheOffset {
67 cache_index: CacheIndex,
68 piece_offset: PieceCacheOffset,
69}
70
71impl FarmerCacheOffset {
72 fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
73 Self {
74 cache_index,
75 piece_offset,
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81struct CacheBackend {
82 backend: Arc<dyn PieceCache>,
83 used_capacity: u32,
84 total_capacity: u32,
85}
86
87impl std::ops::Deref for CacheBackend {
88 type Target = Arc<dyn PieceCache>;
89
90 fn deref(&self) -> &Self::Target {
91 &self.backend
92 }
93}
94
95impl CacheBackend {
96 fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
97 Self {
98 backend,
99 used_capacity: 0,
100 total_capacity,
101 }
102 }
103
104 fn next_free(&mut self) -> Option<PieceCacheOffset> {
105 let offset = self.used_capacity;
106 if offset < self.total_capacity {
107 self.used_capacity += 1;
108 Some(PieceCacheOffset(offset))
109 } else {
110 debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
111 None
112 }
113 }
114
115 fn free_size(&self) -> u32 {
116 self.total_capacity - self.used_capacity
117 }
118}
119
120#[derive(Debug)]
121struct CacheState {
122 cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
123 cache_free_offsets: Vec<FarmerCacheOffset>,
124 backend: CacheBackend,
125}
126
127#[derive(Debug)]
128enum WorkerCommand {
129 ReplaceBackingCaches {
130 new_piece_caches: Vec<Arc<dyn PieceCache>>,
131 },
132 ForgetKey {
133 key: RecordKey,
134 },
135}
136
137#[derive(Debug)]
139#[must_use = "Farmer cache will not work unless its worker is running"]
140pub struct FarmerCacheWorker<NC>
141where
142 NC: fmt::Debug,
143{
144 peer_id: PeerId,
145 node_client: NC,
146 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
147 plot_caches: Arc<PlotCaches>,
148 handlers: Arc<Handlers>,
149 worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
150 metrics: Option<Arc<FarmerCacheMetrics>>,
151}
152
153impl<NC> FarmerCacheWorker<NC>
154where
155 NC: NodeClient,
156{
157 pub async fn run<PG>(mut self, piece_getter: PG)
161 where
162 PG: PieceGetter,
163 {
164 let mut last_segment_index_internal = SegmentIndex::ZERO;
166
167 let mut worker_receiver = self
168 .worker_receiver
169 .take()
170 .expect("Always set during worker instantiation");
171
172 if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
173 worker_receiver.next().await
174 {
175 self.initialize(
176 &piece_getter,
177 &mut last_segment_index_internal,
178 new_piece_caches,
179 )
180 .await;
181 } else {
182 return;
184 }
185
186 let mut segment_headers_notifications =
187 match self.node_client.subscribe_archived_segment_headers().await {
188 Ok(segment_headers_notifications) => segment_headers_notifications,
189 Err(error) => {
190 error!(%error, "Failed to subscribe to archived segments notifications");
191 return;
192 }
193 };
194
195 self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
199 .await;
200
201 loop {
202 select! {
203 maybe_command = worker_receiver.next() => {
204 let Some(command) = maybe_command else {
205 return;
207 };
208
209 self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
210 }
211 maybe_segment_header = segment_headers_notifications.next().fuse() => {
212 if let Some(segment_header) = maybe_segment_header {
213 self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
214 } else {
215 return;
218 }
219 }
220 }
221 }
222 }
223
224 async fn handle_command<PG>(
225 &self,
226 command: WorkerCommand,
227 piece_getter: &PG,
228 last_segment_index_internal: &mut SegmentIndex,
229 ) where
230 PG: PieceGetter,
231 {
232 match command {
233 WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
234 self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
235 .await;
236 }
237 WorkerCommand::ForgetKey { key } => {
239 let mut caches = self.piece_caches.write().await;
240 let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
241 let Some(offset) = caches.remove_stored_piece(&key) else {
242 return;
244 };
245
246 let cache_index = offset.cache_index;
247 let piece_offset = offset.piece_offset;
248 let Some(backend) = caches.get_backend(cache_index).cloned() else {
249 return;
251 };
252
253 caches.push_dangling_free_offset(offset);
254 match backend.read_piece_index(piece_offset).await {
255 Ok(Some(piece_index)) => {
256 trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
257 }
258 Ok(None) => {
259 warn!(
260 %cache_index,
261 %piece_offset,
262 "Piece index out of range, this is likely an implementation bug, \
263 not freeing heap element"
264 );
265 }
266 Err(error) => {
267 error!(
268 %error,
269 %cache_index,
270 ?key,
271 %piece_offset,
272 "Error while reading piece from cache"
273 );
274 }
275 }
276 }
277 }
278 }
279
280 async fn initialize<PG>(
281 &self,
282 piece_getter: &PG,
283 last_segment_index_internal: &mut SegmentIndex,
284 new_piece_caches: Vec<Arc<dyn PieceCache>>,
285 ) where
286 PG: PieceGetter,
287 {
288 info!("Initializing piece cache");
289
290 let (mut stored_pieces, mut dangling_free_offsets) =
292 mem::take(&mut *self.piece_caches.write().await).reuse();
293
294 debug!("Collecting pieces that were in the cache before");
295
296 if let Some(metrics) = &self.metrics {
297 metrics.piece_cache_capacity_total.set(0);
298 metrics.piece_cache_capacity_used.set(0);
299 }
300
301 let peer_id = self.peer_id;
302
303 let piece_caches_number = new_piece_caches.len();
305 let maybe_caches_futures = new_piece_caches
306 .into_iter()
307 .enumerate()
308 .filter_map(|(cache_index, new_cache)| {
309 let total_capacity = new_cache.max_num_elements();
310 let mut backend = CacheBackend::new(new_cache, total_capacity);
311 let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
312 warn!(
313 ?piece_caches_number,
314 "Too many piece caches provided, {cache_index} cache will be ignored",
315 );
316 return None;
317 };
318
319 if let Some(metrics) = &self.metrics {
320 metrics
321 .piece_cache_capacity_total
322 .inc_by(total_capacity as i64);
323 }
324
325 let init_fut = async move {
326 let used_capacity = &mut backend.used_capacity;
327
328 let mut maybe_contents = match backend.backend.contents().await {
332 Ok(contents) => Some(contents),
333 Err(error) => {
334 warn!(%error, "Failed to get cache contents");
335
336 None
337 }
338 };
339
340 let mut cache_stored_pieces = HashMap::new();
341 let mut cache_free_offsets = Vec::new();
342
343 let Some(mut contents) = maybe_contents.take() else {
344 drop(maybe_contents);
345
346 return CacheState {
347 cache_stored_pieces,
348 cache_free_offsets,
349 backend,
350 };
351 };
352
353 while let Some(maybe_element_details) = contents.next().await {
354 let (piece_offset, maybe_piece_index) = match maybe_element_details {
355 Ok(element_details) => element_details,
356 Err(error) => {
357 warn!(%error, "Failed to get cache contents element details");
358 break;
359 }
360 };
361 let offset = FarmerCacheOffset::new(cache_index, piece_offset);
362 match maybe_piece_index {
363 Some(piece_index) => {
364 *used_capacity = piece_offset.0 + 1;
365 let record_key = RecordKey::from(piece_index.to_multihash());
366 let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
367 cache_stored_pieces.insert(key, offset);
368 }
369 None => {
370 cache_free_offsets.push(offset);
373 }
374 }
375
376 yield_now().await;
378 }
379
380 drop(maybe_contents);
381 drop(contents);
382
383 CacheState {
384 cache_stored_pieces,
385 cache_free_offsets,
386 backend,
387 }
388 };
389
390 Some(run_future_in_dedicated_thread(
391 move || init_fut.instrument(info_span!("", %cache_index)),
392 format!("piece-cache.{cache_index}"),
393 ))
394 })
395 .collect::<Result<Vec<_>, _>>();
396
397 let caches_futures = match maybe_caches_futures {
398 Ok(caches_futures) => caches_futures,
399 Err(error) => {
400 error!(%error, "Failed to spawn piece cache reading thread");
401
402 return;
403 }
404 };
405
406 let mut backends = Vec::with_capacity(caches_futures.len());
407 let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
408
409 while let Some(maybe_cache) = caches_futures.next().await {
410 match maybe_cache {
411 Ok(cache) => {
412 let backend = cache.backend;
413 for (key, cache_offset) in cache.cache_stored_pieces {
414 if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
415 dangling_free_offsets.push_front(old_cache_offset);
416 }
417 }
418 dangling_free_offsets.extend(
419 cache.cache_free_offsets.into_iter().filter(|free_offset| {
420 free_offset.piece_offset.0 < backend.used_capacity
421 }),
422 );
423 backends.push(backend);
424 }
425 Err(_cancelled) => {
426 error!("Piece cache reading thread panicked");
427
428 return;
429 }
430 };
431 }
432
433 let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
434
435 info!("Synchronizing piece cache");
436
437 let last_segment_index = loop {
438 match self.node_client.farmer_app_info().await {
439 Ok(farmer_app_info) => {
440 let last_segment_index =
441 farmer_app_info.protocol_info.history_size.segment_index();
442 if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
450 break last_segment_index;
451 }
452 }
453 Err(error) => {
454 error!(
455 %error,
456 "Failed to get farmer app info from node, keeping old cache state without \
457 updates"
458 );
459
460 *self.piece_caches.write().await = caches;
462 return;
463 }
464 }
465
466 tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
467 };
468
469 debug!(%last_segment_index, "Identified last segment index");
470
471 let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
473 let mut piece_indices_to_store = segment_indices
476 .into_par_iter()
477 .flat_map(|segment_index| {
478 segment_index
479 .segment_piece_indexes()
480 .into_par_iter()
481 .map(|piece_index| {
482 (
483 KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
484 piece_index,
485 )
486 })
487 })
488 .collect::<Vec<_>>();
489
490 piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
493
494 let mut piece_indices_to_store = piece_indices_to_store
496 .into_iter()
497 .take(caches.total_capacity())
498 .collect::<HashMap<_, _>>();
499
500 let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
501 caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
505
506 if let Some(metrics) = &self.metrics {
507 for offset in caches.stored_pieces_offsets() {
508 piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
509 }
510
511 for cache_used in piece_caches_capacity_used {
512 metrics
513 .piece_cache_capacity_used
514 .inc_by(i64::from(cache_used));
515 }
516 }
517
518 self.piece_caches.write().await.clone_from(&caches);
520 let stored_count = caches.stored_pieces_offsets().len();
521
522 debug!(
523 %stored_count,
524 count = %piece_indices_to_store.len(),
525 "Identified piece indices that should be cached",
526 );
527
528 let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
529 let piece_indices_to_store = piece_indices_to_store
530 .into_values()
531 .collect::<Vec<_>>()
532 .chunks(SYNC_BATCH_SIZE)
536 .map(|chunk| chunk.to_vec())
537 .collect::<Vec<_>>();
538
539 let downloaded_pieces_count = AtomicUsize::new(stored_count);
540 let caches = Mutex::new(caches);
541 self.handlers.progress.call_simple(&0.0);
542 let batch_count = piece_indices_to_store.len();
543 let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
544
545 let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
546 let ignored_cache_indices = &RwLock::new(HashSet::new());
547
548 let downloading_pieces_stream =
549 stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
550 let downloaded_pieces_count = &downloaded_pieces_count;
551 let caches = &caches;
552 let num_pieces = piece_indices.len();
553
554 trace!(
555 %num_pieces,
556 %batch,
557 %batch_count,
558 first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
559 last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
560 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
561 %pieces_to_download_total,
562 available_permits = %downloading_semaphore.available_permits(),
563 "Started piece cache sync batch",
564 );
565
566 async move {
567 let mut permit = downloading_semaphore
568 .acquire_many(SYNC_BATCH_SIZE as u32)
569 .await
570 .expect("Semaphore is never closed; qed");
571 debug!(%batch, %num_pieces, "Downloading pieces");
572
573 let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
574 Ok(pieces_stream) => pieces_stream,
575 Err(error) => {
576 error!(
577 %error,
578 "Failed to get pieces from piece getter"
579 );
580 return;
581 }
582 };
583 let mut pieces_stream = pieces_stream.enumerate();
584
585 while let Some((index, (piece_index, result))) = pieces_stream.next().await {
586 debug!(%batch, %index, %piece_index, "Downloaded piece");
587 let _permit = permit.split(1);
589
590 let piece = match result {
591 Ok(Some(piece)) => {
592 trace!(%batch, %piece_index, "Downloaded piece successfully");
593 piece
594 }
595 Ok(None) => {
596 debug!(%batch, %piece_index, "Couldn't find piece");
597 continue;
598 }
599 Err(error) => {
600 debug!(
601 %batch,
602 %error,
603 %piece_index,
604 "Failed to get piece for piece cache"
605 );
606 continue;
607 }
608 };
609
610 let (offset, maybe_backend) = {
611 let mut caches = caches.lock();
612
613 let Some(offset) = caches.pop_free_offset() else {
615 error!(
616 %batch,
617 %piece_index,
618 "Failed to store piece in cache, there was no space"
619 );
620 break;
621 };
622
623 (offset, caches.get_backend(offset.cache_index).cloned())
624 };
625
626 let cache_index = offset.cache_index;
627 let piece_offset = offset.piece_offset;
628
629 let skip_write = ignored_cache_indices.read().contains(&cache_index);
630 if skip_write {
631 trace!(
632 %batch,
633 %cache_index,
634 %piece_index,
635 %piece_offset,
636 "Skipping known problematic cache index"
637 );
638 } else {
639 if let Some(backend) = maybe_backend
640 && let Err(error) =
641 backend.write_piece(piece_offset, piece_index, &piece).await
642 {
643 error!(
644 %error,
645 %batch,
646 %cache_index,
647 %piece_index,
648 %piece_offset,
649 "Failed to write piece into cache, ignoring this cache going \
650 forward"
651 );
652 ignored_cache_indices.write().insert(cache_index);
653 continue;
654 }
655
656 let key =
657 KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
658 caches.lock().push_stored_piece(key, offset);
659 }
660
661 let prev_downloaded_pieces_count =
662 downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
663 if prev_downloaded_pieces_count != pieces_to_download_total {
666 let progress = prev_downloaded_pieces_count as f32
667 / pieces_to_download_total as f32
668 * 100.0;
669 if prev_downloaded_pieces_count
670 .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
671 {
672 let mut piece_caches = self.piece_caches.write().await;
673 piece_caches.clone_from(&caches.lock());
674
675 info!(
676 "Piece cache sync {progress:.2}% complete ({} / {})",
677 ByteSize::b(
678 (prev_downloaded_pieces_count * Piece::SIZE) as u64,
679 )
680 .display()
681 .iec(),
682 ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64,)
683 .display()
684 .iec(),
685 );
686 }
687
688 self.handlers.progress.call_simple(&progress);
689 }
690 }
691
692 trace!(
693 %num_pieces,
694 %batch,
695 %batch_count,
696 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
697 %pieces_to_download_total,
698 available_permits = %downloading_semaphore.available_permits(),
699 "Finished piece cache sync batch",
700 );
701 }
702 }));
703
704 downloading_pieces_stream
707 .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
710 .for_each(|()| async {})
712 .await;
713
714 *self.piece_caches.write().await = caches.into_inner();
715 self.handlers.progress.call_simple(&100.0);
716 *last_segment_index_internal = last_segment_index;
717
718 info!("Finished piece cache synchronization");
719 }
720
721 async fn process_segment_header<PG>(
722 &self,
723 piece_getter: &PG,
724 segment_header: SegmentHeader,
725 last_segment_index_internal: &mut SegmentIndex,
726 ) where
727 PG: PieceGetter,
728 {
729 let segment_index = segment_header.segment_index();
730 debug!(%segment_index, "Starting to process newly archived segment");
731
732 if *last_segment_index_internal < segment_index {
733 debug!(%segment_index, "Downloading potentially useful pieces");
734
735 let pieces_to_maybe_include = segment_index
739 .segment_piece_indexes()
740 .into_iter()
741 .map(|piece_index| async move {
742 let should_store_in_piece_cache = self
743 .piece_caches
744 .read()
745 .await
746 .should_include_key(self.peer_id, piece_index);
747
748 let key = RecordKey::from(piece_index.to_multihash());
749 let should_store_in_plot_cache =
750 self.plot_caches.should_store(piece_index, &key).await;
751
752 if !(should_store_in_piece_cache || should_store_in_plot_cache) {
753 trace!(%piece_index, "Piece doesn't need to be cached #1");
754
755 return None;
756 }
757
758 let maybe_piece_result =
759 self.node_client
760 .piece(piece_index)
761 .await
762 .inspect_err(|error| {
763 debug!(
764 %error,
765 %segment_index,
766 %piece_index,
767 "Failed to retrieve piece from node right after archiving"
768 );
769 });
770
771 if let Ok(Some(piece)) = maybe_piece_result {
772 return Some((piece_index, piece));
773 }
774
775 match piece_getter.get_piece(piece_index).await {
776 Ok(Some(piece)) => Some((piece_index, piece)),
777 Ok(None) => {
778 warn!(
779 %segment_index,
780 %piece_index,
781 "Failed to retrieve piece right after archiving"
782 );
783
784 None
785 }
786 Err(error) => {
787 warn!(
788 %error,
789 %segment_index,
790 %piece_index,
791 "Failed to retrieve piece right after archiving"
792 );
793
794 None
795 }
796 }
797 })
798 .collect::<FuturesUnordered<_>>()
799 .filter_map(|maybe_piece| async move { maybe_piece })
800 .collect::<Vec<_>>()
801 .await;
802
803 debug!(%segment_index, "Downloaded potentially useful pieces");
804
805 self.acknowledge_archived_segment_processing(segment_index)
806 .await;
807
808 for (piece_index, piece) in pieces_to_maybe_include {
811 if !self
812 .plot_caches
813 .store_additional_piece(piece_index, &piece)
814 .await
815 {
816 trace!(%piece_index, "Piece could not be cached in plot cache");
817 }
818
819 if !self
820 .piece_caches
821 .read()
822 .await
823 .should_include_key(self.peer_id, piece_index)
824 {
825 trace!(%piece_index, "Piece doesn't need to be cached #2");
826
827 continue;
828 }
829
830 trace!(%piece_index, "Piece needs to be cached #1");
831
832 self.persist_piece_in_cache(piece_index, piece).await;
833 }
834
835 *last_segment_index_internal = segment_index;
836 } else {
837 self.acknowledge_archived_segment_processing(segment_index)
838 .await;
839 }
840
841 debug!(%segment_index, "Finished processing newly archived segment");
842 }
843
844 async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
845 match self
846 .node_client
847 .acknowledge_archived_segment_header(segment_index)
848 .await
849 {
850 Ok(()) => {
851 debug!(%segment_index, "Acknowledged archived segment");
852 }
853 Err(error) => {
854 error!(%segment_index, ?error, "Failed to acknowledge archived segment");
855 }
856 };
857 }
858
859 async fn keep_up_after_initial_sync<PG>(
860 &self,
861 piece_getter: &PG,
862 last_segment_index_internal: &mut SegmentIndex,
863 ) where
864 PG: PieceGetter,
865 {
866 let last_segment_index = match self.node_client.farmer_app_info().await {
867 Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
868 Err(error) => {
869 error!(
870 %error,
871 "Failed to get farmer app info from node, keeping old cache state without \
872 updates"
873 );
874 return;
875 }
876 };
877
878 if last_segment_index <= *last_segment_index_internal {
879 return;
880 }
881
882 info!(
883 "Syncing piece cache to the latest history size, this may pause block production if \
884 takes too long"
885 );
886
887 let piece_indices = (*last_segment_index_internal..=last_segment_index)
889 .flat_map(|segment_index| segment_index.segment_piece_indexes());
890
891 for piece_index in piece_indices {
893 if !self
894 .piece_caches
895 .read()
896 .await
897 .should_include_key(self.peer_id, piece_index)
898 {
899 trace!(%piece_index, "Piece doesn't need to be cached #3");
900
901 continue;
902 }
903
904 trace!(%piece_index, "Piece needs to be cached #2");
905
906 let result = piece_getter.get_piece(piece_index).await;
907
908 let piece = match result {
909 Ok(Some(piece)) => piece,
910 Ok(None) => {
911 debug!(%piece_index, "Couldn't find piece");
912 continue;
913 }
914 Err(error) => {
915 debug!(
916 %error,
917 %piece_index,
918 "Failed to get piece for piece cache"
919 );
920 continue;
921 }
922 };
923
924 self.persist_piece_in_cache(piece_index, piece).await;
925 }
926
927 info!("Finished syncing piece cache to the latest history size");
928
929 *last_segment_index_internal = last_segment_index;
930 }
931
932 async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
935 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
936 let mut caches = self.piece_caches.write().await;
937 match caches.should_replace(&key) {
938 Some((old_key, offset)) => {
940 let cache_index = offset.cache_index;
941 let piece_offset = offset.piece_offset;
942 let Some(backend) = caches.get_backend(cache_index) else {
943 warn!(
945 %cache_index,
946 %piece_index,
947 "Should have a cached backend, but it didn't exist, this is an \
948 implementation bug"
949 );
950 return;
951 };
952 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
953 error!(
954 %error,
955 %cache_index,
956 %piece_index,
957 %piece_offset,
958 "Failed to write piece into cache"
959 );
960 } else {
961 let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
962 trace!(
963 %cache_index,
964 %old_piece_index,
965 %piece_index,
966 %piece_offset,
967 "Successfully replaced old cached piece"
968 );
969 caches.push_stored_piece(key, offset);
970 }
971 }
972 None => {
974 let Some(offset) = caches.pop_free_offset() else {
975 warn!(
976 %piece_index,
977 "Should have inserted piece into cache, but it didn't happen, this is an \
978 implementation bug"
979 );
980 return;
981 };
982 let cache_index = offset.cache_index;
983 let piece_offset = offset.piece_offset;
984 let Some(backend) = caches.get_backend(cache_index) else {
985 warn!(
987 %cache_index,
988 %piece_index,
989 "Should have a cached backend, but it didn't exist, this is an \
990 implementation bug"
991 );
992 return;
993 };
994
995 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
996 error!(
997 %error,
998 %cache_index,
999 %piece_index,
1000 %piece_offset,
1001 "Failed to write piece into cache"
1002 );
1003 } else {
1004 trace!(
1005 %cache_index,
1006 %piece_index,
1007 %piece_offset,
1008 "Successfully stored piece in cache"
1009 );
1010 if let Some(metrics) = &self.metrics {
1011 metrics.piece_cache_capacity_used.inc();
1012 }
1013 caches.push_stored_piece(key, offset);
1014 }
1015 }
1016 };
1017 }
1018}
1019
1020#[derive(Debug)]
1021struct PlotCaches {
1022 caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
1024 next_plot_cache: AtomicUsize,
1026}
1027
1028impl PlotCaches {
1029 async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1035 for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1036 match cache.is_piece_maybe_stored(key).await {
1037 Ok(MaybePieceStoredResult::No) => {
1038 }
1040 Ok(MaybePieceStoredResult::Vacant) => {
1041 return true;
1042 }
1043 Ok(MaybePieceStoredResult::Yes) => {
1044 return false;
1046 }
1047 Err(error) => {
1048 warn!(
1049 %cache_index,
1050 %piece_index,
1051 %error,
1052 "Failed to check piece stored in cache"
1053 );
1054 }
1055 }
1056 }
1057
1058 false
1059 }
1060
1061 async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1065 let plot_caches = self.caches.read().await;
1066 let plot_caches_len = plot_caches.len();
1067
1068 for _ in 0..plot_caches_len {
1070 let plot_cache_index =
1071 self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1072
1073 match plot_caches[plot_cache_index]
1074 .try_store_piece(piece_index, piece)
1075 .await
1076 {
1077 Ok(true) => {
1078 return true;
1079 }
1080 Ok(false) => {
1081 continue;
1082 }
1083 Err(error) => {
1084 error!(
1085 %error,
1086 %piece_index,
1087 %plot_cache_index,
1088 "Failed to store additional piece in cache"
1089 );
1090 continue;
1091 }
1092 }
1093 }
1094
1095 false
1096 }
1097}
1098
1099#[derive(Debug, Clone)]
1110pub struct FarmerCache {
1111 peer_id: PeerId,
1112 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
1114 plot_caches: Arc<PlotCaches>,
1116 handlers: Arc<Handlers>,
1117 worker_sender: mpsc::Sender<WorkerCommand>,
1119 metrics: Option<Arc<FarmerCacheMetrics>>,
1120}
1121
1122impl FarmerCache {
1123 pub fn new<NC>(
1128 node_client: NC,
1129 peer_id: PeerId,
1130 registry: Option<&mut Registry>,
1131 ) -> (Self, FarmerCacheWorker<NC>)
1132 where
1133 NC: NodeClient,
1134 {
1135 let caches = Arc::default();
1136 let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1137 let handlers = Arc::new(Handlers::default());
1138
1139 let plot_caches = Arc::new(PlotCaches {
1140 caches: AsyncRwLock::default(),
1141 next_plot_cache: AtomicUsize::new(0),
1142 });
1143 let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1144
1145 let instance = Self {
1146 peer_id,
1147 piece_caches: Arc::clone(&caches),
1148 plot_caches: Arc::clone(&plot_caches),
1149 handlers: Arc::clone(&handlers),
1150 worker_sender,
1151 metrics: metrics.clone(),
1152 };
1153 let worker = FarmerCacheWorker {
1154 peer_id,
1155 node_client,
1156 piece_caches: caches,
1157 plot_caches,
1158 handlers,
1159 worker_receiver: Some(worker_receiver),
1160 metrics,
1161 };
1162
1163 (instance, worker)
1164 }
1165
1166 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1168 where
1169 RecordKey: From<Key>,
1170 {
1171 let key = RecordKey::from(key);
1172 let maybe_piece_found = {
1173 let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1174 let caches = self.piece_caches.read().await;
1175
1176 caches.get_stored_piece(&key).and_then(|offset| {
1177 let cache_index = offset.cache_index;
1178 let piece_offset = offset.piece_offset;
1179 Some((
1180 piece_offset,
1181 cache_index,
1182 caches.get_backend(cache_index)?.clone(),
1183 ))
1184 })
1185 };
1186
1187 if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1188 match backend.read_piece(piece_offset).await {
1189 Ok(maybe_piece) => {
1190 return match maybe_piece {
1191 Some((_piece_index, piece)) => {
1192 if let Some(metrics) = &self.metrics {
1193 metrics.cache_get_hit.inc();
1194 }
1195 Some(piece)
1196 }
1197 None => {
1198 error!(
1199 %cache_index,
1200 %piece_offset,
1201 ?key,
1202 "Piece was expected to be in cache, but wasn't found there"
1203 );
1204 if let Some(metrics) = &self.metrics {
1205 metrics.cache_get_error.inc();
1206 }
1207 None
1208 }
1209 };
1210 }
1211 Err(error) => {
1212 error!(
1213 %error,
1214 %cache_index,
1215 ?key,
1216 %piece_offset,
1217 "Error while reading piece from cache"
1218 );
1219
1220 if let Err(error) = self
1221 .worker_sender
1222 .clone()
1223 .send(WorkerCommand::ForgetKey { key })
1224 .await
1225 {
1226 trace!(%error, "Failed to send ForgetKey command to worker");
1227 }
1228
1229 if let Some(metrics) = &self.metrics {
1230 metrics.cache_get_error.inc();
1231 }
1232 return None;
1233 }
1234 }
1235 }
1236
1237 for cache in self.plot_caches.caches.read().await.iter() {
1238 if let Ok(Some(piece)) = cache.read_piece(&key).await {
1239 if let Some(metrics) = &self.metrics {
1240 metrics.cache_get_hit.inc();
1241 }
1242 return Some(piece);
1243 }
1244 }
1245
1246 if let Some(metrics) = &self.metrics {
1247 metrics.cache_get_miss.inc();
1248 }
1249 None
1250 }
1251
1252 pub async fn get_pieces<'a, PieceIndices>(
1256 &'a self,
1257 piece_indices: PieceIndices,
1258 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1259 where
1260 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1261 {
1262 let mut pieces_to_get_from_plot_cache = Vec::new();
1263
1264 let pieces_to_read_from_piece_cache = {
1265 let caches = self.piece_caches.read().await;
1266 let mut pieces_to_read_from_piece_cache =
1268 HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1269
1270 for piece_index in piece_indices {
1271 let key = RecordKey::from(piece_index.to_multihash());
1272
1273 let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1274 self.peer_id,
1275 key.clone(),
1276 )) {
1277 Some(offset) => offset,
1278 None => {
1279 pieces_to_get_from_plot_cache.push((piece_index, key));
1280 continue;
1281 }
1282 };
1283
1284 let cache_index = offset.cache_index;
1285 let piece_offset = offset.piece_offset;
1286
1287 match pieces_to_read_from_piece_cache.entry(cache_index) {
1288 Entry::Occupied(mut entry) => {
1289 let (_backend, pieces) = entry.get_mut();
1290 pieces.insert(piece_offset, (piece_index, key));
1291 }
1292 Entry::Vacant(entry) => {
1293 let backend = match caches.get_backend(cache_index) {
1294 Some(backend) => backend.clone(),
1295 None => {
1296 pieces_to_get_from_plot_cache.push((piece_index, key));
1297 continue;
1298 }
1299 };
1300 entry
1301 .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1302 }
1303 }
1304 }
1305
1306 pieces_to_read_from_piece_cache
1307 };
1308
1309 let (tx, mut rx) = mpsc::unbounded();
1310
1311 let fut = async move {
1312 let tx = &tx;
1313
1314 let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1315 .into_iter()
1316 .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1317 let mut pieces_stream = match backend
1318 .read_pieces(Box::new(
1319 pieces_to_get
1320 .keys()
1321 .copied()
1322 .collect::<Vec<_>>()
1323 .into_iter(),
1324 ))
1325 .await
1326 {
1327 Ok(pieces_stream) => pieces_stream,
1328 Err(error) => {
1329 error!(
1330 %error,
1331 %cache_index,
1332 "Error while reading pieces from cache"
1333 );
1334
1335 if let Some(metrics) = &self.metrics {
1336 metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1337 }
1338 for (piece_index, _key) in pieces_to_get.into_values() {
1339 tx.unbounded_send((piece_index, None)).expect(
1340 "This future isn't polled after receiver is dropped; qed",
1341 );
1342 }
1343 return;
1344 }
1345 };
1346
1347 while let Some(maybe_piece) = pieces_stream.next().await {
1348 let result = match maybe_piece {
1349 Ok((piece_offset, Some((piece_index, piece)))) => {
1350 pieces_to_get.remove(&piece_offset);
1351
1352 if let Some(metrics) = &self.metrics {
1353 metrics.cache_get_hit.inc();
1354 }
1355 (piece_index, Some(piece))
1356 }
1357 Ok((piece_offset, None)) => {
1358 let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1359 else {
1360 debug!(
1361 %cache_index,
1362 %piece_offset,
1363 "Received piece offset that was not expected"
1364 );
1365 continue;
1366 };
1367
1368 error!(
1369 %cache_index,
1370 %piece_index,
1371 %piece_offset,
1372 ?key,
1373 "Piece was expected to be in cache, but wasn't found there"
1374 );
1375 if let Some(metrics) = &self.metrics {
1376 metrics.cache_get_error.inc();
1377 }
1378 (piece_index, None)
1379 }
1380 Err(error) => {
1381 error!(
1382 %error,
1383 %cache_index,
1384 "Error while reading piece from cache"
1385 );
1386
1387 if let Some(metrics) = &self.metrics {
1388 metrics.cache_get_error.inc();
1389 }
1390 continue;
1391 }
1392 };
1393
1394 tx.unbounded_send(result)
1395 .expect("This future isn't polled after receiver is dropped; qed");
1396 }
1397
1398 if pieces_to_get.is_empty() {
1399 return;
1400 }
1401
1402 if let Some(metrics) = &self.metrics {
1403 metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1404 }
1405 for (piece_offset, (piece_index, key)) in pieces_to_get {
1406 error!(
1407 %cache_index,
1408 %piece_index,
1409 %piece_offset,
1410 ?key,
1411 "Piece cache didn't return an entry for offset"
1412 );
1413
1414 tx.unbounded_send((piece_index, None))
1417 .expect("This future isn't polled after receiver is dropped; qed");
1418 }
1419 })
1420 .collect::<FuturesUnordered<_>>();
1421 let reading_from_piece_cache_fut = async move {
1427 while let Some(()) = reading_from_piece_cache.next().await {
1428 }
1430 };
1431
1432 let reading_from_plot_cache_fut = async {
1433 if pieces_to_get_from_plot_cache.is_empty() {
1434 return;
1435 }
1436
1437 for cache in self.plot_caches.caches.read().await.iter() {
1438 for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1441 let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1442
1443 if let Ok(Some(piece)) = cache.read_piece(key).await {
1444 if let Some(metrics) = &self.metrics {
1445 metrics.cache_get_hit.inc();
1446 }
1447 tx.unbounded_send((*piece_index, Some(piece)))
1448 .expect("This future isn't polled after receiver is dropped; qed");
1449
1450 pieces_to_get_from_plot_cache.swap_remove(offset);
1453 }
1454 }
1455
1456 if pieces_to_get_from_plot_cache.is_empty() {
1457 return;
1458 }
1459 }
1460
1461 if let Some(metrics) = &self.metrics {
1462 metrics
1463 .cache_get_miss
1464 .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1465 }
1466
1467 for (piece_index, _key) in pieces_to_get_from_plot_cache {
1468 tx.unbounded_send((piece_index, None))
1469 .expect("This future isn't polled after receiver is dropped; qed");
1470 }
1471 };
1472
1473 join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1474 };
1475 let mut fut = Box::pin(fut.fuse());
1476
1477 stream::poll_fn(move |cx| {
1479 if !fut.is_terminated() {
1480 let _ = fut.poll_unpin(cx);
1482 }
1483
1484 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1485 return Poll::Ready(maybe_result);
1486 }
1487
1488 Poll::Pending
1490 })
1491 }
1492
1493 pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1495 let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1496 piece_indices
1497 .iter()
1498 .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1499 );
1500
1501 {
1503 let piece_caches = self.piece_caches.read().await;
1504 pieces_to_find.retain(|_piece_index, key| {
1505 let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1506 !piece_caches.contains_stored_piece(&distance_key)
1507 });
1508 }
1509
1510 if pieces_to_find.is_empty() {
1512 return piece_indices;
1513 }
1514
1515 if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1517 let plot_caches = &plot_caches;
1518 let not_found = pieces_to_find
1519 .into_iter()
1520 .map(|(piece_index, key)| async move {
1521 let key = &key;
1522
1523 let found = plot_caches
1524 .iter()
1525 .map(|plot_cache| async {
1526 matches!(
1527 plot_cache.is_piece_maybe_stored(key).await,
1528 Ok(MaybePieceStoredResult::Yes)
1529 )
1530 })
1531 .collect::<FuturesUnordered<_>>()
1532 .any(|found| async move { found })
1533 .await;
1534
1535 if found { None } else { Some(piece_index) }
1536 })
1537 .collect::<FuturesUnordered<_>>()
1538 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1539 .collect::<HashSet<_>>()
1540 .await;
1541 piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1542 }
1543 piece_indices
1544 }
1545
1546 pub async fn find_piece(
1548 &self,
1549 piece_index: PieceIndex,
1550 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1551 let caches = self.piece_caches.read().await;
1552
1553 self.find_piece_internal(&caches, piece_index)
1554 }
1555
1556 pub async fn find_pieces<PieceIndices>(
1558 &self,
1559 piece_indices: PieceIndices,
1560 ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1561 where
1562 PieceIndices: IntoIterator<Item = PieceIndex>,
1563 {
1564 let caches = self.piece_caches.read().await;
1565
1566 piece_indices
1567 .into_iter()
1568 .filter_map(|piece_index| {
1569 self.find_piece_internal(&caches, piece_index)
1570 .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1571 })
1572 .collect()
1573 }
1574
1575 fn find_piece_internal(
1576 &self,
1577 caches: &PieceCachesState,
1578 piece_index: PieceIndex,
1579 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1580 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1581
1582 let Some(offset) = caches.get_stored_piece(&key) else {
1583 if let Some(metrics) = &self.metrics {
1584 metrics.cache_find_miss.inc();
1585 }
1586
1587 return None;
1588 };
1589 let piece_offset = offset.piece_offset;
1590
1591 if let Some(backend) = caches.get_backend(offset.cache_index) {
1592 if let Some(metrics) = &self.metrics {
1593 metrics.cache_find_hit.inc();
1594 }
1595 return Some((*backend.id(), piece_offset));
1596 }
1597
1598 if let Some(metrics) = &self.metrics {
1599 metrics.cache_find_miss.inc();
1600 }
1601 None
1602 }
1603
1604 pub async fn maybe_store_additional_piece(
1608 &self,
1609 piece_index: PieceIndex,
1610 piece: &Piece,
1611 ) -> bool {
1612 let key = RecordKey::from(piece_index.to_multihash());
1613
1614 let should_store = self.plot_caches.should_store(piece_index, &key).await;
1615
1616 if !should_store {
1617 return false;
1618 }
1619
1620 self.plot_caches
1621 .store_additional_piece(piece_index, piece)
1622 .await
1623 }
1624
1625 pub async fn replace_backing_caches(
1627 &self,
1628 new_piece_caches: Vec<Arc<dyn PieceCache>>,
1629 new_plot_caches: Vec<Arc<dyn PlotCache>>,
1630 ) {
1631 if let Err(error) = self
1632 .worker_sender
1633 .clone()
1634 .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1635 .await
1636 {
1637 warn!(%error, "Failed to replace backing caches, worker exited");
1638 }
1639
1640 *self.plot_caches.caches.write().await = new_plot_caches;
1641 }
1642
1643 pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1645 self.handlers.progress.add(callback)
1646 }
1647}
1648
1649#[derive(Debug, Clone)]
1651pub struct FarmerCaches {
1652 caches: Arc<[FarmerCache]>,
1653}
1654
1655impl From<Arc<[FarmerCache]>> for FarmerCaches {
1656 fn from(caches: Arc<[FarmerCache]>) -> Self {
1657 Self { caches }
1658 }
1659}
1660
1661impl From<FarmerCache> for FarmerCaches {
1662 fn from(cache: FarmerCache) -> Self {
1663 Self {
1664 caches: Arc::new([cache]),
1665 }
1666 }
1667}
1668
1669impl FarmerCaches {
1670 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1672 where
1673 RecordKey: From<Key>,
1674 {
1675 let farmer_cache = self.caches.choose(&mut rand::rng())?;
1676 farmer_cache.get_piece(key).await
1677 }
1678
1679 pub async fn get_pieces<'a, PieceIndices>(
1683 &'a self,
1684 piece_indices: PieceIndices,
1685 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1686 where
1687 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1688 {
1689 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1690 return Either::Left(stream::iter(
1691 piece_indices
1692 .into_iter()
1693 .map(|piece_index| (piece_index, None)),
1694 ));
1695 };
1696
1697 Either::Right(farmer_cache.get_pieces(piece_indices).await)
1698 }
1699
1700 pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1702 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1703 return Vec::new();
1704 };
1705
1706 farmer_cache.has_pieces(piece_indices).await
1707 }
1708
1709 pub async fn maybe_store_additional_piece(
1713 &self,
1714 piece_index: PieceIndex,
1715 piece: &Piece,
1716 ) -> bool {
1717 self.caches
1719 .iter()
1720 .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1721 .collect::<FuturesUnordered<_>>()
1722 .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1723 .await
1724 }
1725}
1726
1727fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1729 let len = key.as_ref().len();
1730 let s = len - PieceIndex::SIZE;
1731
1732 let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1733 piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1734
1735 PieceIndex::from_bytes(piece_index_bytes)
1736}