mod metrics;
mod piece_cache_state;
#[cfg(not(miri))]
#[cfg(test)]
mod tests;

use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::farmer_cache::piece_cache_state::PieceCachesState;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use ab_core_primitives::pieces::{Piece, PieceIndex};
use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
use ab_data_retrieval::piece_getter::PieceGetter;
use ab_networking::KeyWithDistance;
use ab_networking::libp2p::PeerId;
use ab_networking::libp2p::kad::RecordKey;
use ab_networking::utils::multihash::ToMultihash;
use async_lock::RwLock as AsyncRwLock;
use bytesize::ByteSize;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::future::{Either, FusedFuture};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
use parking_lot::{Mutex, RwLock};
use prometheus_client::registry::Registry;
use rand::prelude::*;
use rayon::prelude::*;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::future::join;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;
use std::time::Duration;
use std::{fmt, mem};
use tokio::sync::Semaphore;
use tokio::task::yield_now;
use tracing::{Instrument, debug, error, info, info_span, trace, warn};

/// Capacity of the channel used to send commands from [`FarmerCache`] to [`FarmerCacheWorker`]
const WORKER_CHANNEL_CAPACITY: usize = 100;
/// Number of pieces downloaded in one batch during piece cache sync
const SYNC_BATCH_SIZE: usize = 256;
/// Number of sync batches that are downloaded concurrently
const SYNC_CONCURRENT_BATCHES: usize = 4;
/// How many newly downloaded pieces to accumulate between updates of the shared cache state
/// during sync
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// How often to re-check farmer app info on the node while waiting for initial sync conditions
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);

type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
type CacheIndex = u8;

#[derive(Default, Debug)]
struct Handlers {
    progress: Handler<f32>,
}

#[derive(Debug, Clone, Copy)]
struct FarmerCacheOffset {
    cache_index: CacheIndex,
    piece_offset: PieceCacheOffset,
}

impl FarmerCacheOffset {
    fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
        Self {
            cache_index,
            piece_offset,
        }
    }
}

#[derive(Debug, Clone)]
struct CacheBackend {
    backend: Arc<dyn PieceCache>,
    used_capacity: u32,
    total_capacity: u32,
}

impl std::ops::Deref for CacheBackend {
    type Target = Arc<dyn PieceCache>;

    fn deref(&self) -> &Self::Target {
        &self.backend
    }
}

impl CacheBackend {
    fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
        Self {
            backend,
            used_capacity: 0,
            total_capacity,
        }
    }

    fn next_free(&mut self) -> Option<PieceCacheOffset> {
        let offset = self.used_capacity;
        if offset < self.total_capacity {
            self.used_capacity += 1;
            Some(PieceCacheOffset(offset))
        } else {
            debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
            None
        }
    }

    fn free_size(&self) -> u32 {
        self.total_capacity - self.used_capacity
    }
}

#[derive(Debug)]
struct CacheState {
    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
    cache_free_offsets: Vec<FarmerCacheOffset>,
    backend: CacheBackend,
}

#[derive(Debug)]
enum WorkerCommand {
    ReplaceBackingCaches {
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    },
    ForgetKey {
        key: RecordKey,
    },
}

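/// Farmer cache worker that maintains the piece cache contents in the background.
///
/// The worker must be running for [`FarmerCache`] to be functional.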
#[derive(Debug)]
#[must_use = "Farmer cache will not work unless its worker is running"]
pub struct FarmerCacheWorker<NC>
where
    NC: fmt::Debug,
{
    peer_id: PeerId,
    node_client: NC,
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl<NC> FarmerCacheWorker<NC>
where
    NC: NodeClient,
{
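    /// Run the cache worker using the provided piece getter.
    ///
    /// NOTE: the worker only starts maintaining the cache after the first
    /// `ReplaceBackingCaches` command is received from [`FarmerCache`].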
    pub async fn run<PG>(mut self, piece_getter: PG)
    where
        PG: PieceGetter,
    {
        let mut last_segment_index_internal = SegmentIndex::ZERO;

        let mut worker_receiver = self
            .worker_receiver
            .take()
            .expect("Always set during worker instantiation");

        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
            worker_receiver.next().await
        {
            self.initialize(
                &piece_getter,
                &mut last_segment_index_internal,
                new_piece_caches,
            )
            .await;
        } else {
            return;
        }

        let mut segment_headers_notifications =
            match self.node_client.subscribe_archived_segment_headers().await {
                Ok(segment_headers_notifications) => segment_headers_notifications,
                Err(error) => {
                    error!(%error, "Failed to subscribe to archived segments notifications");
                    return;
                }
            };

        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
            .await;

        loop {
            select! {
                maybe_command = worker_receiver.next() => {
                    let Some(command) = maybe_command else {
                        return;
                    };

                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
                }
                maybe_segment_header = segment_headers_notifications.next().fuse() => {
                    if let Some(segment_header) = maybe_segment_header {
                        self.process_segment_header(&piece_getter, segment_header, &mut last_segment_index_internal).await;
                    } else {
                        return;
                    }
                }
            }
        }
    }

    async fn handle_command<PG>(
        &self,
        command: WorkerCommand,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        match command {
            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
                    .await;
            }
            WorkerCommand::ForgetKey { key } => {
                let mut caches = self.piece_caches.write().await;
                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
                let Some(offset) = caches.remove_stored_piece(&key) else {
                    return;
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index).cloned() else {
                    return;
                };

                caches.push_dangling_free_offset(offset);
                match backend.read_piece_index(piece_offset).await {
                    Ok(Some(piece_index)) => {
                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
                    }
                    Ok(None) => {
                        warn!(
                            %cache_index,
                            %piece_offset,
                            "Piece index out of range, this is likely an implementation bug, \
                            not freeing heap element"
                        );
                    }
                    Err(error) => {
                        error!(
                            %error,
                            %cache_index,
                            ?key,
                            %piece_offset,
                            "Error while reading piece from cache"
                        );
                    }
                }
            }
        }
    }

    async fn initialize<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    ) where
        PG: PieceGetter,
    {
        info!("Initializing piece cache");

        let (mut stored_pieces, mut dangling_free_offsets) =
            mem::take(&mut *self.piece_caches.write().await).reuse();

        debug!("Collecting pieces that were in the cache before");

        if let Some(metrics) = &self.metrics {
            metrics.piece_cache_capacity_total.set(0);
            metrics.piece_cache_capacity_used.set(0);
        }

        let peer_id = self.peer_id;

        let piece_caches_number = new_piece_caches.len();
        let maybe_caches_futures = new_piece_caches
            .into_iter()
            .enumerate()
            .filter_map(|(cache_index, new_cache)| {
                let total_capacity = new_cache.max_num_elements();
                let mut backend = CacheBackend::new(new_cache, total_capacity);
                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
                    warn!(
                        ?piece_caches_number,
                        "Too many piece caches provided, {cache_index} cache will be ignored",
                    );
                    return None;
                };

                if let Some(metrics) = &self.metrics {
                    metrics
                        .piece_cache_capacity_total
                        .inc_by(total_capacity as i64);
                }

                let init_fut = async move {
                    let used_capacity = &mut backend.used_capacity;

                    let mut maybe_contents = match backend.backend.contents().await {
                        Ok(contents) => Some(contents),
                        Err(error) => {
                            warn!(%error, "Failed to get cache contents");

                            None
                        }
                    };

                    #[allow(clippy::mutable_key_type)]
                    let mut cache_stored_pieces = HashMap::new();
                    let mut cache_free_offsets = Vec::new();

                    let Some(mut contents) = maybe_contents.take() else {
                        drop(maybe_contents);

                        return CacheState {
                            cache_stored_pieces,
                            cache_free_offsets,
                            backend,
                        };
                    };

                    while let Some(maybe_element_details) = contents.next().await {
                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
                            Ok(element_details) => element_details,
                            Err(error) => {
                                warn!(%error, "Failed to get cache contents element details");
                                break;
                            }
                        };
                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
                        match maybe_piece_index {
                            Some(piece_index) => {
                                *used_capacity = piece_offset.0 + 1;
                                let record_key = RecordKey::from(piece_index.to_multihash());
                                let key =
                                    KeyWithDistance::new_with_record_key(peer_id, record_key);
                                cache_stored_pieces.insert(key, offset);
                            }
                            None => {
                                cache_free_offsets.push(offset);
                            }
                        }

                        yield_now().await;
                    }

                    drop(maybe_contents);
                    drop(contents);

                    CacheState {
                        cache_stored_pieces,
                        cache_free_offsets,
                        backend,
                    }
                };

                Some(run_future_in_dedicated_thread(
                    move || init_fut.instrument(info_span!("", %cache_index)),
                    format!("piece-cache.{cache_index}"),
                ))
            })
            .collect::<Result<Vec<_>, _>>();

        let caches_futures = match maybe_caches_futures {
            Ok(caches_futures) => caches_futures,
            Err(error) => {
                error!(%error, "Failed to spawn piece cache reading thread");

                return;
            }
        };

        let mut backends = Vec::with_capacity(caches_futures.len());
        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();

        while let Some(maybe_cache) = caches_futures.next().await {
            match maybe_cache {
                Ok(cache) => {
                    let backend = cache.backend;
                    for (key, cache_offset) in cache.cache_stored_pieces {
                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
                            dangling_free_offsets.push_front(old_cache_offset);
                        }
                    }
                    dangling_free_offsets.extend(
                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
                            free_offset.piece_offset.0 < backend.used_capacity
                        }),
                    );
                    backends.push(backend);
                }
                Err(_cancelled) => {
                    error!("Piece cache reading thread panicked");

                    return;
                }
            };
        }

        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);

        info!("Synchronizing piece cache");

        let last_segment_index = loop {
            match self.node_client.farmer_app_info().await {
                Ok(farmer_app_info) => {
                    let last_segment_index =
                        farmer_app_info.protocol_info.history_size.segment_index();
                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
                        break last_segment_index;
                    }
                }
                Err(error) => {
                    error!(
                        %error,
                        "Failed to get farmer app info from node, keeping old cache state without \
                        updates"
                    );

                    *self.piece_caches.write().await = caches;
                    return;
                }
            }

            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
        };

        debug!(%last_segment_index, "Identified last segment index");

        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
        let mut piece_indices_to_store = segment_indices
            .into_par_iter()
            .flat_map(|segment_index| {
                segment_index
                    .segment_piece_indexes()
                    .into_par_iter()
                    .map(|piece_index| {
                        (
                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
                            piece_index,
                        )
                    })
            })
            .collect::<Vec<_>>();

        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));

        let mut piece_indices_to_store = piece_indices_to_store
            .into_iter()
            .take(caches.total_capacity())
            .collect::<HashMap<_, _>>();

        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);

        if let Some(metrics) = &self.metrics {
            for offset in caches.stored_pieces_offsets() {
                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
            }

            for cache_used in piece_caches_capacity_used {
                metrics
                    .piece_cache_capacity_used
                    .inc_by(i64::from(cache_used));
            }
        }

        self.piece_caches.write().await.clone_from(&caches);
        let stored_count = caches.stored_pieces_offsets().len();

        debug!(
            %stored_count,
            count = %piece_indices_to_store.len(),
            "Identified piece indices that should be cached",
        );

        let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
        let piece_indices_to_store = piece_indices_to_store
            .into_values()
            .collect::<Vec<_>>()
            .chunks(SYNC_BATCH_SIZE)
            .map(|chunk| chunk.to_vec())
            .collect::<Vec<_>>();

        let downloaded_pieces_count = AtomicUsize::new(stored_count);
        let caches = Mutex::new(caches);
        self.handlers.progress.call_simple(&0.0);
        let batch_count = piece_indices_to_store.len();
        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();

        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
        let ignored_cache_indices = &RwLock::new(HashSet::new());

        let downloading_pieces_stream =
            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
                let downloaded_pieces_count = &downloaded_pieces_count;
                let caches = &caches;
                let num_pieces = piece_indices.len();

                trace!(
                    %num_pieces,
                    %batch,
                    %batch_count,
                    first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
                    last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
                    downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
                    %pieces_to_download_total,
                    available_permits = %downloading_semaphore.available_permits(),
                    "Started piece cache sync batch",
                );

                async move {
                    let mut permit = downloading_semaphore
                        .acquire_many(SYNC_BATCH_SIZE as u32)
                        .await
                        .expect("Semaphore is never closed; qed");
                    debug!(%batch, %num_pieces, "Downloading pieces");

                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                "Failed to get pieces from piece getter"
                            );
                            return;
                        }
                    };
                    let mut pieces_stream = pieces_stream.enumerate();

                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
                        debug!(%batch, %index, %piece_index, "Downloaded piece");
                        let _permit = permit.split(1);

                        let piece = match result {
                            Ok(Some(piece)) => {
                                trace!(%batch, %piece_index, "Downloaded piece successfully");
                                piece
                            }
                            Ok(None) => {
                                debug!(%batch, %piece_index, "Couldn't find piece");
                                continue;
                            }
                            Err(error) => {
                                debug!(
                                    %batch,
                                    %error,
                                    %piece_index,
                                    "Failed to get piece for piece cache"
                                );
                                continue;
                            }
                        };

                        let (offset, maybe_backend) = {
                            let mut caches = caches.lock();

                            let Some(offset) = caches.pop_free_offset() else {
                                error!(
                                    %batch,
                                    %piece_index,
                                    "Failed to store piece in cache, there was no space"
                                );
                                break;
                            };

                            (offset, caches.get_backend(offset.cache_index).cloned())
                        };

                        let cache_index = offset.cache_index;
                        let piece_offset = offset.piece_offset;

                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
                        if skip_write {
                            trace!(
                                %batch,
                                %cache_index,
                                %piece_index,
                                %piece_offset,
                                "Skipping known problematic cache index"
                            );
                        } else {
                            if let Some(backend) = maybe_backend
                                && let Err(error) =
                                    backend.write_piece(piece_offset, piece_index, &piece).await
                            {
                                error!(
                                    %error,
                                    %batch,
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    "Failed to write piece into cache, ignoring this cache going \
                                    forward"
                                );
                                ignored_cache_indices.write().insert(cache_index);
                                continue;
                            }

                            let key =
                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
                            caches.lock().push_stored_piece(key, offset);
                        }

                        let prev_downloaded_pieces_count =
                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
                        if prev_downloaded_pieces_count != pieces_to_download_total {
                            let progress = prev_downloaded_pieces_count as f32
                                / pieces_to_download_total as f32
                                * 100.0;
                            if prev_downloaded_pieces_count
                                .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
                            {
                                let mut piece_caches = self.piece_caches.write().await;
                                piece_caches.clone_from(&caches.lock());

                                info!(
                                    "Piece cache sync {progress:.2}% complete ({} / {})",
                                    ByteSize::b((prev_downloaded_pieces_count * Piece::SIZE) as u64)
                                        .display()
                                        .iec(),
                                    ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64)
                                        .display()
                                        .iec(),
                                );
                            }

                            self.handlers.progress.call_simple(&progress);
                        }
                    }

                    trace!(
                        %num_pieces,
                        %batch,
                        %batch_count,
                        downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
                        %pieces_to_download_total,
                        available_permits = %downloading_semaphore.available_permits(),
                        "Finished piece cache sync batch",
                    );
                }
            }));

        downloading_pieces_stream
            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
            .for_each(|()| async {})
            .await;

        *self.piece_caches.write().await = caches.into_inner();
        self.handlers.progress.call_simple(&100.0);
        *last_segment_index_internal = last_segment_index;

        info!("Finished piece cache synchronization");
    }

    async fn process_segment_header<PG>(
        &self,
        piece_getter: &PG,
        segment_header: SegmentHeader,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        let segment_index = segment_header.segment_index();
        debug!(%segment_index, "Starting to process newly archived segment");

        if *last_segment_index_internal < segment_index {
            debug!(%segment_index, "Downloading potentially useful pieces");

            let pieces_to_maybe_include = segment_index
                .segment_piece_indexes()
                .into_iter()
                .map(|piece_index| async move {
                    let should_store_in_piece_cache = self
                        .piece_caches
                        .read()
                        .await
                        .should_include_key(self.peer_id, piece_index);

                    let key = RecordKey::from(piece_index.to_multihash());
                    let should_store_in_plot_cache =
                        self.plot_caches.should_store(piece_index, &key).await;

                    if !(should_store_in_piece_cache || should_store_in_plot_cache) {
                        trace!(%piece_index, "Piece doesn't need to be cached #1");

                        return None;
                    }

                    let maybe_piece_result =
                        self.node_client
                            .piece(piece_index)
                            .await
                            .inspect_err(|error| {
                                debug!(
                                    %error,
                                    %segment_index,
                                    %piece_index,
                                    "Failed to retrieve piece from node right after archiving"
                                );
                            });

                    if let Ok(Some(piece)) = maybe_piece_result {
                        return Some((piece_index, piece));
                    }

                    match piece_getter.get_piece(piece_index).await {
                        Ok(Some(piece)) => Some((piece_index, piece)),
                        Ok(None) => {
                            warn!(
                                %segment_index,
                                %piece_index,
                                "Failed to retrieve piece right after archiving"
                            );

                            None
                        }
                        Err(error) => {
                            warn!(
                                %error,
                                %segment_index,
                                %piece_index,
                                "Failed to retrieve piece right after archiving"
                            );

                            None
                        }
                    }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_piece| async move { maybe_piece })
                .collect::<Vec<_>>()
                .await;

            debug!(%segment_index, "Downloaded potentially useful pieces");

            self.acknowledge_archived_segment_processing(segment_index)
                .await;

            for (piece_index, piece) in pieces_to_maybe_include {
                if !self
                    .plot_caches
                    .store_additional_piece(piece_index, &piece)
                    .await
                {
                    trace!(%piece_index, "Piece could not be cached in plot cache");
                }

                if !self
                    .piece_caches
                    .read()
                    .await
                    .should_include_key(self.peer_id, piece_index)
                {
                    trace!(%piece_index, "Piece doesn't need to be cached #2");

                    continue;
                }

                trace!(%piece_index, "Piece needs to be cached #1");

                self.persist_piece_in_cache(piece_index, piece).await;
            }

            *last_segment_index_internal = segment_index;
        } else {
            self.acknowledge_archived_segment_processing(segment_index)
                .await;
        }

        debug!(%segment_index, "Finished processing newly archived segment");
    }

    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
        match self
            .node_client
            .acknowledge_archived_segment_header(segment_index)
            .await
        {
            Ok(()) => {
                debug!(%segment_index, "Acknowledged archived segment");
            }
            Err(error) => {
                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
            }
        };
    }

    async fn keep_up_after_initial_sync<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        let last_segment_index = match self.node_client.farmer_app_info().await {
            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
            Err(error) => {
                error!(
                    %error,
                    "Failed to get farmer app info from node, keeping old cache state without \
                    updates"
                );
                return;
            }
        };

        if last_segment_index <= *last_segment_index_internal {
            return;
        }

        info!(
            "Syncing piece cache to the latest history size, this may pause block production if \
            it takes too long"
        );

        let piece_indices = (*last_segment_index_internal..=last_segment_index)
            .flat_map(|segment_index| segment_index.segment_piece_indexes());

        for piece_index in piece_indices {
            if !self
                .piece_caches
                .read()
                .await
                .should_include_key(self.peer_id, piece_index)
            {
                trace!(%piece_index, "Piece doesn't need to be cached #3");

                continue;
            }

            trace!(%piece_index, "Piece needs to be cached #2");

            let result = piece_getter.get_piece(piece_index).await;

            let piece = match result {
                Ok(Some(piece)) => piece,
                Ok(None) => {
                    debug!(%piece_index, "Couldn't find piece");
                    continue;
                }
                Err(error) => {
                    debug!(
                        %error,
                        %piece_index,
                        "Failed to get piece for piece cache"
                    );
                    continue;
                }
            };

            self.persist_piece_in_cache(piece_index, piece).await;
        }

        info!("Finished syncing piece cache to the latest history size");

        *last_segment_index_internal = last_segment_index;
    }

    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
        let mut caches = self.piece_caches.write().await;
        match caches.should_replace(&key) {
            Some((old_key, offset)) => {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };
                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
                    trace!(
                        %cache_index,
                        %old_piece_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully replaced old cached piece"
                    );
                    caches.push_stored_piece(key, offset);
                }
            }
            None => {
                let Some(offset) = caches.pop_free_offset() else {
                    warn!(
                        %piece_index,
                        "Should have inserted piece into cache, but it didn't happen, this is an \
                        implementation bug"
                    );
                    return;
                };
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };

                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    trace!(
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully stored piece in cache"
                    );
                    if let Some(metrics) = &self.metrics {
                        metrics.piece_cache_capacity_used.inc();
                    }
                    caches.push_stored_piece(key, offset);
                }
            }
        };
    }
}

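/// Collection of plot caches used to opportunistically store additionally downloaded pieces.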
#[derive(Debug)]
struct PlotCaches {
    /// Plot caches used for storing additionally downloaded pieces
    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
    /// Round-robin index of the next plot cache to use for storing a piece
    next_plot_cache: AtomicUsize,
}

impl PlotCaches {
    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
            match cache.is_piece_maybe_stored(key).await {
                Ok(MaybePieceStoredResult::No) => {
                    // Check the next plot cache
                }
                Ok(MaybePieceStoredResult::Vacant) => {
                    return true;
                }
                Ok(MaybePieceStoredResult::Yes) => {
                    return false;
                }
                Err(error) => {
                    warn!(
                        %cache_index,
                        %piece_index,
                        %error,
                        "Failed to check piece stored in cache"
                    );
                }
            }
        }

        false
    }

    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
        let plot_caches = self.caches.read().await;
        let plot_caches_len = plot_caches.len();

        for _ in 0..plot_caches_len {
            let plot_cache_index =
                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;

            match plot_caches[plot_cache_index]
                .try_store_piece(piece_index, piece)
                .await
            {
                Ok(true) => {
                    return true;
                }
                Ok(false) => {
                    continue;
                }
                Err(error) => {
                    error!(
                        %error,
                        %piece_index,
                        %plot_cache_index,
                        "Failed to store additional piece in cache"
                    );
                    continue;
                }
            }
        }

        false
    }
}

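/// Farmer cache that aggregates the piece caches and plot caches of one or more farms.
///
/// Pieces are selected for the piece cache based on the distance of their keys to the local
/// [`PeerId`], so the cache converges on the pieces this peer is responsible for on the network.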
#[derive(Debug, Clone)]
pub struct FarmerCache {
    peer_id: PeerId,
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    worker_sender: mpsc::Sender<WorkerCommand>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl FarmerCache {
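    /// Create a new farmer cache instance together with its background worker.
    ///
    /// The returned [`FarmerCacheWorker`] must be run for the cache to do anything useful.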
    pub fn new<NC>(
        node_client: NC,
        peer_id: PeerId,
        registry: Option<&mut Registry>,
    ) -> (Self, FarmerCacheWorker<NC>)
    where
        NC: NodeClient,
    {
        let caches = Arc::default();
        let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
        let handlers = Arc::new(Handlers::default());

        let plot_caches = Arc::new(PlotCaches {
            caches: AsyncRwLock::default(),
            next_plot_cache: AtomicUsize::new(0),
        });
        let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));

        let instance = Self {
            peer_id,
            piece_caches: Arc::clone(&caches),
            plot_caches: Arc::clone(&plot_caches),
            handlers: Arc::clone(&handlers),
            worker_sender,
            metrics: metrics.clone(),
        };
        let worker = FarmerCacheWorker {
            peer_id,
            node_client,
            piece_caches: caches,
            plot_caches,
            handlers,
            worker_receiver: Some(worker_receiver),
            metrics,
        };

        (instance, worker)
    }

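    /// Get piece from cache by its record key.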
    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
    where
        RecordKey: From<Key>,
    {
        let key = RecordKey::from(key);
        let maybe_piece_found = {
            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
            let caches = self.piece_caches.read().await;

            caches.get_stored_piece(&key).and_then(|offset| {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                Some((
                    piece_offset,
                    cache_index,
                    caches.get_backend(cache_index)?.clone(),
                ))
            })
        };

        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
            match backend.read_piece(piece_offset).await {
                Ok(maybe_piece) => {
                    return match maybe_piece {
                        Some((_piece_index, piece)) => {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            Some(piece)
                        }
                        None => {
                            error!(
                                %cache_index,
                                %piece_offset,
                                ?key,
                                "Piece was expected to be in cache, but wasn't found there"
                            );
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc();
                            }
                            None
                        }
                    };
                }
                Err(error) => {
                    error!(
                        %error,
                        %cache_index,
                        ?key,
                        %piece_offset,
                        "Error while reading piece from cache"
                    );

                    if let Err(error) = self
                        .worker_sender
                        .clone()
                        .send(WorkerCommand::ForgetKey { key })
                        .await
                    {
                        trace!(%error, "Failed to send ForgetKey command to worker");
                    }

                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc();
                    }
                    return None;
                }
            }
        }

        for cache in self.plot_caches.caches.read().await.iter() {
            if let Ok(Some(piece)) = cache.read_piece(&key).await {
                if let Some(metrics) = &self.metrics {
                    metrics.cache_get_hit.inc();
                }
                return Some(piece);
            }
        }

        if let Some(metrics) = &self.metrics {
            metrics.cache_get_miss.inc();
        }
        None
    }

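    /// Get pieces from cache by their piece indices.
    ///
    /// The returned stream yields one entry per requested piece index; entries may arrive in an
    /// order different from the order of `piece_indices`.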
    pub async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
    {
        let mut pieces_to_get_from_plot_cache = Vec::new();

        let pieces_to_read_from_piece_cache = {
            let caches = self.piece_caches.read().await;
            let mut pieces_to_read_from_piece_cache =
                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

            for piece_index in piece_indices {
                let key = RecordKey::from(piece_index.to_multihash());

                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
                    self.peer_id,
                    key.clone(),
                )) {
                    Some(offset) => offset,
                    None => {
                        pieces_to_get_from_plot_cache.push((piece_index, key));
                        continue;
                    }
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;

                match pieces_to_read_from_piece_cache.entry(cache_index) {
                    Entry::Occupied(mut entry) => {
                        let (_backend, pieces) = entry.get_mut();
                        pieces.insert(piece_offset, (piece_index, key));
                    }
                    Entry::Vacant(entry) => {
                        let backend = match caches.get_backend(cache_index) {
                            Some(backend) => backend.clone(),
                            None => {
                                pieces_to_get_from_plot_cache.push((piece_index, key));
                                continue;
                            }
                        };
                        entry
                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
                    }
                }
            }

            pieces_to_read_from_piece_cache
        };

        let (tx, mut rx) = mpsc::unbounded();

        let fut = async move {
            let tx = &tx;

            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
                .into_iter()
                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
                    let mut pieces_stream = match backend
                        .read_pieces(Box::new(
                            pieces_to_get
                                .keys()
                                .copied()
                                .collect::<Vec<_>>()
                                .into_iter(),
                        ))
                        .await
                    {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                %cache_index,
                                "Error while reading pieces from cache"
                            );

                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                            }
                            for (piece_index, _key) in pieces_to_get.into_values() {
                                tx.unbounded_send((piece_index, None)).expect(
                                    "This future isn't polled after receiver is dropped; qed",
                                );
                            }
                            return;
                        }
                    };

                    while let Some(maybe_piece) = pieces_stream.next().await {
                        let result = match maybe_piece {
                            Ok((piece_offset, Some((piece_index, piece)))) => {
                                pieces_to_get.remove(&piece_offset);

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_hit.inc();
                                }
                                (piece_index, Some(piece))
                            }
                            Ok((piece_offset, None)) => {
                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
                                else {
                                    debug!(
                                        %cache_index,
                                        %piece_offset,
                                        "Received piece offset that was not expected"
                                    );
                                    continue;
                                };

                                error!(
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    ?key,
                                    "Piece was expected to be in cache, but wasn't found there"
                                );
                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                (piece_index, None)
                            }
                            Err(error) => {
                                error!(
                                    %error,
                                    %cache_index,
                                    "Error while reading piece from cache"
                                );

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                continue;
                            }
                        };

                        tx.unbounded_send(result)
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }

                    if pieces_to_get.is_empty() {
                        return;
                    }

                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                    }
                    for (piece_offset, (piece_index, key)) in pieces_to_get {
                        error!(
                            %cache_index,
                            %piece_index,
                            %piece_offset,
                            ?key,
                            "Piece cache didn't return an entry for offset"
                        );

                        tx.unbounded_send((piece_index, None))
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }
                })
                .collect::<FuturesUnordered<_>>();
            let reading_from_piece_cache_fut = async move {
                // Simply drain the stream of piece cache reads
                while let Some(()) = reading_from_piece_cache.next().await {}
            };

            let reading_from_plot_cache_fut = async {
                if pieces_to_get_from_plot_cache.is_empty() {
                    return;
                }

                for cache in self.plot_caches.caches.read().await.iter() {
                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

                        if let Ok(Some(piece)) = cache.read_piece(key).await {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            tx.unbounded_send((*piece_index, Some(piece)))
                                .expect("This future isn't polled after receiver is dropped; qed");

                            pieces_to_get_from_plot_cache.swap_remove(offset);
                        }
                    }

                    if pieces_to_get_from_plot_cache.is_empty() {
                        return;
                    }
                }

                if let Some(metrics) = &self.metrics {
                    metrics
                        .cache_get_miss
                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
                }

                for (piece_index, _key) in pieces_to_get_from_plot_cache {
                    tx.unbounded_send((piece_index, None))
                        .expect("This future isn't polled after receiver is dropped; qed");
                }
            };

            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
        };
        let mut fut = Box::pin(fut.fuse());

        stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            Poll::Pending
        })
    }

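    /// Returns the piece indices from `piece_indices` that appear to be present in the piece or
    /// plot caches (best effort).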
    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
            piece_indices
                .iter()
                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
        );

        // Check the piece caches first
        {
            let piece_caches = self.piece_caches.read().await;
            pieces_to_find.retain(|_piece_index, key| {
                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
                !piece_caches.contains_stored_piece(&distance_key)
            });
        }

        if pieces_to_find.is_empty() {
            return piece_indices;
        }

        // Then check the plot caches, but only if they can be read without blocking
        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
            let plot_caches = &plot_caches;
            let not_found = pieces_to_find
                .into_iter()
                .map(|(piece_index, key)| async move {
                    let key = &key;

                    let found = plot_caches
                        .iter()
                        .map(|plot_cache| async {
                            matches!(
                                plot_cache.is_piece_maybe_stored(key).await,
                                Ok(MaybePieceStoredResult::Yes)
                            )
                        })
                        .collect::<FuturesUnordered<_>>()
                        .any(|found| async move { found })
                        .await;

                    if found { None } else { Some(piece_index) }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
                .collect::<HashSet<_>>()
                .await;
            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
        }
        piece_indices
    }

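    /// Find a piece in the piece caches and return the cache ID and offset where it is stored.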
    pub async fn find_piece(
        &self,
        piece_index: PieceIndex,
    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
        let caches = self.piece_caches.read().await;

        self.find_piece_internal(&caches, piece_index)
    }

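    /// Find multiple pieces in the piece caches, returning cache ID and offset for each piece
    /// that was found.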
    pub async fn find_pieces<PieceIndices>(
        &self,
        piece_indices: PieceIndices,
    ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
    where
        PieceIndices: IntoIterator<Item = PieceIndex>,
    {
        let caches = self.piece_caches.read().await;

        piece_indices
            .into_iter()
            .filter_map(|piece_index| {
                self.find_piece_internal(&caches, piece_index)
                    .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
            })
            .collect()
    }

    fn find_piece_internal(
        &self,
        caches: &PieceCachesState,
        piece_index: PieceIndex,
    ) -> Option<(PieceCacheId, PieceCacheOffset)> {
        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());

        let Some(offset) = caches.get_stored_piece(&key) else {
            if let Some(metrics) = &self.metrics {
                metrics.cache_find_miss.inc();
            }

            return None;
        };
        let piece_offset = offset.piece_offset;

        if let Some(backend) = caches.get_backend(offset.cache_index) {
            if let Some(metrics) = &self.metrics {
                metrics.cache_find_hit.inc();
            }
            return Some((*backend.id(), piece_offset));
        }

        if let Some(metrics) = &self.metrics {
            metrics.cache_find_miss.inc();
        }
        None
    }

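    /// Try to store a piece in the plot caches if there is space for it.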
    pub async fn maybe_store_additional_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> bool {
        let key = RecordKey::from(piece_index.to_multihash());

        let should_store = self.plot_caches.should_store(piece_index, &key).await;

        if !should_store {
            return false;
        }

        self.plot_caches
            .store_additional_piece(piece_index, piece)
            .await
    }

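    /// Replace the backing piece caches and plot caches.
    ///
    /// The new piece caches are handed to the worker, which re-initializes and re-synchronizes
    /// the piece cache contents.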
    pub async fn replace_backing_caches(
        &self,
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
        new_plot_caches: Vec<Arc<dyn PlotCache>>,
    ) {
        if let Err(error) = self
            .worker_sender
            .clone()
            .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
            .await
        {
            warn!(%error, "Failed to replace backing caches, worker exited");
        }

        *self.plot_caches.caches.write().await = new_plot_caches;
    }

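    /// Subscribe to piece cache sync progress notifications (in percent).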
    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
        self.handlers.progress.add(callback)
    }
}

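/// Collection of [`FarmerCache`] instances.
///
/// Read operations are served by a randomly chosen cache, while stores of additional pieces are
/// fanned out to all of them.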
#[derive(Debug, Clone)]
pub struct FarmerCaches {
    caches: Arc<[FarmerCache]>,
}

impl From<Arc<[FarmerCache]>> for FarmerCaches {
    fn from(caches: Arc<[FarmerCache]>) -> Self {
        Self { caches }
    }
}

impl From<FarmerCache> for FarmerCaches {
    fn from(cache: FarmerCache) -> Self {
        Self {
            caches: Arc::new([cache]),
        }
    }
}

impl FarmerCaches {
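    /// Get piece from a randomly chosen cache by its record key.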
    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
    where
        RecordKey: From<Key>,
    {
        let farmer_cache = self.caches.choose(&mut rand::rng())?;
        farmer_cache.get_piece(key).await
    }

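    /// Get pieces from a randomly chosen cache by their piece indices.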
    pub async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
    {
        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
            return Either::Left(stream::iter(
                piece_indices
                    .into_iter()
                    .map(|piece_index| (piece_index, None)),
            ));
        };

        Either::Right(farmer_cache.get_pieces(piece_indices).await)
    }

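    /// Returns the piece indices that are present in a randomly chosen cache.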
    pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
        let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
            return Vec::new();
        };

        farmer_cache.has_pieces(piece_indices).await
    }

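    /// Try to store a piece in the plot caches of all farmer caches, returning `true` if at
    /// least one of them stored it.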
    pub async fn maybe_store_additional_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> bool {
        self.caches
            .iter()
            .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
            .collect::<FuturesUnordered<_>>()
            .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
            .await
    }
}

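/// Extract the piece index encoded in the trailing bytes of a piece record key.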
fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
    let len = key.as_ref().len();
    let piece_index_start = len - PieceIndex::SIZE;

    let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
    piece_index_bytes.copy_from_slice(&key.as_ref()[piece_index_start..]);

    PieceIndex::from_bytes(piece_index_bytes)
}