1mod metrics;
7mod piece_cache_state;
8#[cfg(not(miri))]
10#[cfg(test)]
11mod tests;
12
13use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
14use crate::farmer_cache::metrics::FarmerCacheMetrics;
15use crate::farmer_cache::piece_cache_state::PieceCachesState;
16use crate::node_client::NodeClient;
17use crate::utils::run_future_in_dedicated_thread;
18use ab_core_primitives::pieces::{Piece, PieceIndex};
19use ab_core_primitives::segments::SegmentIndex;
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_networking::KeyWithDistance;
22use ab_networking::libp2p::PeerId;
23use ab_networking::libp2p::kad::RecordKey;
24use ab_networking::utils::multihash::ToMultihash;
25use async_lock::RwLock as AsyncRwLock;
26use bytesize::ByteSize;
27use event_listener_primitives::{Bag, HandlerId};
28use futures::channel::mpsc;
29use futures::future::{Either, FusedFuture};
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
32use parking_lot::{Mutex, RwLock};
33use prometheus_client::registry::Registry;
34use rand::prelude::*;
35use rayon::prelude::*;
36use std::collections::hash_map::Entry;
37use std::collections::{HashMap, HashSet};
38use std::future::join;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::task::Poll;
42use std::time::Duration;
43use std::{fmt, mem};
44use tokio::sync::Semaphore;
45use tokio::task::yield_now;
46use tracing::{Instrument, debug, error, info, info_span, trace, warn};
47
/// Capacity of the bounded command channel between `FarmerCache` and its worker.
const WORKER_CHANNEL_CAPACITY: usize = 100;
/// Number of pieces requested from the piece getter in one sync batch
/// (also the number of semaphore permits a batch acquires).
const SYNC_BATCH_SIZE: usize = 256;
/// How many sync batches' worth of permits the downloading semaphore holds,
/// bounding concurrent downloads during initial sync.
const SYNC_CONCURRENT_BATCHES: usize = 4;
/// During sync, the shared piece cache state is re-published every this many
/// downloaded pieces so readers see intermediate progress.
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
/// Poll interval while waiting for the node to report a usable history size
/// during initial synchronization.
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Shared event-handler callback invoked with a reference to the event payload.
type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
/// Bag of registered handlers for one event kind.
type Handler<A> = Bag<HandlerFn<A>, A>;
/// Index of a cache among all farmer piece caches; `u8` caps the number of
/// usable caches (extra caches are ignored with a warning during init).
type CacheIndex = u8;
59
/// Event handlers exposed by the farmer cache.
#[derive(Default, Debug)]
struct Handlers {
    // Called with sync progress in percent (0.0 at start, 100.0 when finished)
    progress: Handler<f32>,
}
64
/// Global location of a piece across all managed piece caches: which cache it
/// lives in and at which offset inside that cache.
#[derive(Debug, Clone, Copy)]
struct FarmerCacheOffset {
    // Which cache backend the piece belongs to
    cache_index: CacheIndex,
    // Offset of the piece within that backend
    piece_offset: PieceCacheOffset,
}
70
71impl FarmerCacheOffset {
72 fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
73 Self {
74 cache_index,
75 piece_offset,
76 }
77 }
78}
79
/// A single piece cache backend together with capacity bookkeeping used while
/// handing out free offsets.
#[derive(Debug, Clone)]
struct CacheBackend {
    backend: Arc<dyn PieceCache>,
    // Number of element slots considered occupied; `next_free` hands out
    // offsets starting from this value
    used_capacity: u32,
    // Maximum number of elements the backend can hold
    total_capacity: u32,
}
86
/// Deref to the inner cache handle so `PieceCache` methods can be called
/// directly on a `CacheBackend`.
impl std::ops::Deref for CacheBackend {
    type Target = Arc<dyn PieceCache>;

    fn deref(&self) -> &Self::Target {
        &self.backend
    }
}
94
95impl CacheBackend {
96 fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
97 Self {
98 backend,
99 used_capacity: 0,
100 total_capacity,
101 }
102 }
103
104 fn next_free(&mut self) -> Option<PieceCacheOffset> {
105 let offset = self.used_capacity;
106 if offset < self.total_capacity {
107 self.used_capacity += 1;
108 Some(PieceCacheOffset(offset))
109 } else {
110 debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
111 None
112 }
113 }
114
115 fn free_size(&self) -> u32 {
116 self.total_capacity - self.used_capacity
117 }
118}
119
/// Result of scanning one cache backend during initialization.
#[derive(Debug)]
struct CacheState {
    // Pieces found in the backend, keyed by distance-aware record key
    cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
    // Offsets that held no piece during the scan
    cache_free_offsets: Vec<FarmerCacheOffset>,
    // The scanned backend with its used/total capacity filled in
    backend: CacheBackend,
}
126
/// Commands sent from `FarmerCache` to its background worker.
#[derive(Debug)]
enum WorkerCommand {
    /// Replace the set of backing piece caches and re-initialize/synchronize them.
    ReplaceBackingCaches {
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    },
    /// Remove a stored piece by record key (sent e.g. after a failed read in
    /// `FarmerCache::get_piece`).
    ForgetKey {
        key: RecordKey,
    },
}
136
/// Background worker that maintains farmer cache contents; must be driven via
/// [`FarmerCacheWorker::run`] for the cache to function.
#[derive(Debug)]
#[must_use = "Farmer cache will not work unless its worker is running"]
pub struct FarmerCacheWorker<NC>
where
    NC: fmt::Debug,
{
    peer_id: PeerId,
    node_client: NC,
    // Piece cache state shared with `FarmerCache`
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    // Plot caches shared with `FarmerCache`
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    // Taken exactly once at the start of `run`
    worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}
152
153impl<NC> FarmerCacheWorker<NC>
154where
155 NC: NodeClient,
156{
    /// Runs the worker until its command channel or the node subscription closes.
    ///
    /// Waits for the first `ReplaceBackingCaches` command to initialize, then
    /// catches up after initial sync and enters the main loop reacting to
    /// further commands and new super segment header notifications.
    pub async fn run<PG>(mut self, piece_getter: PG)
    where
        PG: PieceGetter,
    {
        let mut last_segment_index_internal = SegmentIndex::ZERO;

        let mut worker_receiver = self
            .worker_receiver
            .take()
            .expect("Always set during worker instantiation");

        // The very first command must provide the backing caches; anything
        // else (including channel closure) ends the worker.
        if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
            worker_receiver.next().await
        {
            self.initialize(
                &piece_getter,
                &mut last_segment_index_internal,
                new_piece_caches,
            )
            .await;
        } else {
            return;
        }

        let mut super_segment_headers_notifications = match self
            .node_client
            .subscribe_new_super_segment_headers()
            .await
        {
            Ok(super_segment_headers_notifications) => super_segment_headers_notifications,
            Err(error) => {
                error!(%error, "Failed to subscribe to new super segment headers notifications");
                return;
            }
        };

        // Cover any segments archived while initialization was running
        self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
            .await;

        loop {
            select! {
                maybe_command = worker_receiver.next() => {
                    let Some(command) = maybe_command else {
                        // Channel closed: frontend dropped, stop the worker
                        return;
                    };

                    self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
                }
                maybe_super_segment_header = super_segment_headers_notifications.next().fuse() => {
                    if let Some(super_segment_header) = maybe_super_segment_header {
                        self
                            .process_new_segment_index(
                                &piece_getter,
                                super_segment_header.max_segment_index.as_inner(),
                                &mut last_segment_index_internal
                            )
                            .await;
                    } else {
                        // Subscription ended, stop the worker
                        return;
                    }
                }
            }
        }
    }
230
    /// Dispatches a single worker command.
    ///
    /// `ReplaceBackingCaches` re-runs full initialization; `ForgetKey` drops a
    /// stored piece and returns its offset to the dangling free list.
    async fn handle_command<PG>(
        &self,
        command: WorkerCommand,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        match command {
            WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
                self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
                    .await;
            }
            WorkerCommand::ForgetKey { key } => {
                let mut caches = self.piece_caches.write().await;
                let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
                // Nothing stored under this key — nothing to forget
                let Some(offset) = caches.remove_stored_piece(&key) else {
                    return;
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index).cloned() else {
                    return;
                };

                // The freed slot becomes reusable for future writes
                caches.push_dangling_free_offset(offset);
                // Read-back is purely for logging what was forgotten
                match backend.read_piece_index(piece_offset).await {
                    Ok(Some(piece_index)) => {
                        trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
                    }
                    Ok(None) => {
                        warn!(
                            %cache_index,
                            %piece_offset,
                            "Piece index out of range, this is likely an implementation bug, \
                            not freeing heap element"
                        );
                    }
                    Err(error) => {
                        error!(
                            %error,
                            %cache_index,
                            ?key,
                            %piece_offset,
                            "Error while reading piece from cache"
                        );
                    }
                }
            }
        }
    }
286
    /// Replaces the backing caches with `new_piece_caches` and synchronizes the
    /// piece cache up to the node's current history size.
    ///
    /// Visible phases:
    /// 1. Scan every backend on a dedicated thread to recover pieces already
    ///    stored there.
    /// 2. Ask the node for the last segment index, retrying while it syncs.
    /// 3. Sort candidate piece keys, keep the first `total_capacity()` of them,
    ///    and download the missing ones in concurrent batches, periodically
    ///    publishing intermediate state to readers.
    async fn initialize<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
        new_piece_caches: Vec<Arc<dyn PieceCache>>,
    ) where
        PG: PieceGetter,
    {
        info!("Initializing piece cache");

        // Take the old state out, keeping its allocations for reuse
        let (mut stored_pieces, mut dangling_free_offsets) =
            mem::take(&mut *self.piece_caches.write().await).reuse();

        debug!("Collecting pieces that were in the cache before");

        if let Some(metrics) = &self.metrics {
            metrics.piece_cache_capacity_total.set(0);
            metrics.piece_cache_capacity_used.set(0);
        }

        let peer_id = self.peer_id;

        let piece_caches_number = new_piece_caches.len();
        let maybe_caches_futures = new_piece_caches
            .into_iter()
            .enumerate()
            .filter_map(|(cache_index, new_cache)| {
                let total_capacity = new_cache.max_num_elements();
                let mut backend = CacheBackend::new(new_cache, total_capacity);
                // CacheIndex is u8 — caches beyond its range are dropped
                let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
                    warn!(
                        ?piece_caches_number,
                        "Too many piece caches provided, {cache_index} cache will be ignored",
                    );
                    return None;
                };

                if let Some(metrics) = &self.metrics {
                    metrics
                        .piece_cache_capacity_total
                        .inc_by(total_capacity as i64);
                }

                // Scan the backend's contents off the async executor
                let init_fut = async move {
                    let used_capacity = &mut backend.used_capacity;

                    let mut maybe_contents = match backend.backend.contents().await {
                        Ok(contents) => Some(contents),
                        Err(error) => {
                            warn!(%error, "Failed to get cache contents");

                            None
                        }
                    };

                    let mut cache_stored_pieces = HashMap::new();
                    let mut cache_free_offsets = Vec::new();

                    let Some(mut contents) = maybe_contents.take() else {
                        drop(maybe_contents);

                        // Contents unavailable: report an empty backend
                        return CacheState {
                            cache_stored_pieces,
                            cache_free_offsets,
                            backend,
                        };
                    };

                    while let Some(maybe_element_details) = contents.next().await {
                        let (piece_offset, maybe_piece_index) = match maybe_element_details {
                            Ok(element_details) => element_details,
                            Err(error) => {
                                warn!(%error, "Failed to get cache contents element details");
                                break;
                            }
                        };
                        let offset = FarmerCacheOffset::new(cache_index, piece_offset);
                        match maybe_piece_index {
                            Some(piece_index) => {
                                // Highest occupied offset + 1 becomes the used capacity
                                *used_capacity = piece_offset.0 + 1;
                                let record_key = RecordKey::from(piece_index.to_multihash());
                                let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
                                cache_stored_pieces.insert(key, offset);
                            }
                            None => {
                                cache_free_offsets.push(offset);
                            }
                        }

                        // Cooperative yield during a potentially long scan
                        yield_now().await;
                    }

                    drop(maybe_contents);
                    drop(contents);

                    CacheState {
                        cache_stored_pieces,
                        cache_free_offsets,
                        backend,
                    }
                };

                Some(run_future_in_dedicated_thread(
                    move || init_fut.instrument(info_span!("", %cache_index)),
                    format!("piece-cache.{cache_index}"),
                ))
            })
            .collect::<Result<Vec<_>, _>>();

        let caches_futures = match maybe_caches_futures {
            Ok(caches_futures) => caches_futures,
            Err(error) => {
                error!(%error, "Failed to spawn piece cache reading thread");

                return;
            }
        };

        let mut backends = Vec::with_capacity(caches_futures.len());
        // FuturesOrdered keeps results in cache-index order
        let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();

        while let Some(maybe_cache) = caches_futures.next().await {
            match maybe_cache {
                Ok(cache) => {
                    let backend = cache.backend;
                    for (key, cache_offset) in cache.cache_stored_pieces {
                        // Duplicate key across backends: the replaced offset
                        // becomes a dangling free slot
                        if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
                            dangling_free_offsets.push_front(old_cache_offset);
                        }
                    }
                    // Only free offsets below used capacity are dangling;
                    // offsets at/above it are handed out by `next_free` anyway
                    dangling_free_offsets.extend(
                        cache.cache_free_offsets.into_iter().filter(|free_offset| {
                            free_offset.piece_offset.0 < backend.used_capacity
                        }),
                    );
                    backends.push(backend);
                }
                Err(_cancelled) => {
                    error!("Piece cache reading thread panicked");

                    return;
                }
            };
        }

        let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);

        info!("Synchronizing piece cache");

        // Wait until the node is either done syncing or has some history
        let last_segment_index = loop {
            match self.node_client.farmer_app_info().await {
                Ok(farmer_app_info) => {
                    let last_segment_index =
                        farmer_app_info.protocol_info.history_size.segment_index();
                    if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
                        break last_segment_index;
                    }
                }
                Err(error) => {
                    error!(
                        %error,
                        "Failed to get farmer app info from node, keeping old cache state without \
                        updates"
                    );

                    // Publish whatever was recovered from disk and bail out
                    *self.piece_caches.write().await = caches;
                    return;
                }
            }

            tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
        };

        debug!(%last_segment_index, "Identified last segment index");

        // Build (key, piece_index) pairs for the whole history in parallel
        let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
        let mut piece_indices_to_store = segment_indices
            .into_par_iter()
            .flat_map(|segment_index| {
                segment_index
                    .segment_piece_indexes()
                    .into_par_iter()
                    .map(|piece_index| {
                        (
                            KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
                            piece_index,
                        )
                    })
            })
            .collect::<Vec<_>>();

        // Sort by key so that only the closest keys are kept below
        piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));

        let mut piece_indices_to_store = piece_indices_to_store
            .into_iter()
            .take(caches.total_capacity())
            .collect::<HashMap<_, _>>();

        let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
        // Drops stored pieces we no longer want and removes already-stored
        // keys from the to-download map
        caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);

        if let Some(metrics) = &self.metrics {
            for offset in caches.stored_pieces_offsets() {
                piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
            }

            for cache_used in piece_caches_capacity_used {
                metrics
                    .piece_cache_capacity_used
                    .inc_by(i64::from(cache_used));
            }
        }

        // Publish intermediate state so reads work while sync proceeds
        self.piece_caches.write().await.clone_from(&caches);
        let stored_count = caches.stored_pieces_offsets().len();

        debug!(
            %stored_count,
            count = %piece_indices_to_store.len(),
            "Identified piece indices that should be cached",
        );

        let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
        let piece_indices_to_store = piece_indices_to_store
            .into_values()
            .collect::<Vec<_>>()
            .chunks(SYNC_BATCH_SIZE)
            .map(|chunk| chunk.to_vec())
            .collect::<Vec<_>>();

        // Already-stored pieces count towards progress from the start
        let downloaded_pieces_count = AtomicUsize::new(stored_count);
        let caches = Mutex::new(caches);
        self.handlers.progress.call_simple(&0.0);
        let batch_count = piece_indices_to_store.len();
        let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();

        let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
        // Caches that failed a write are skipped for the rest of the sync
        let ignored_cache_indices = &RwLock::new(HashSet::new());

        let downloading_pieces_stream =
            stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
                let downloaded_pieces_count = &downloaded_pieces_count;
                let caches = &caches;
                let num_pieces = piece_indices.len();

                trace!(
                    %num_pieces,
                    %batch,
                    %batch_count,
                    first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
                    last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
                    downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
                    %pieces_to_download_total,
                    available_permits = %downloading_semaphore.available_permits(),
                    "Started piece cache sync batch",
                );

                async move {
                    // A batch holds SYNC_BATCH_SIZE permits up front...
                    let mut permit = downloading_semaphore
                        .acquire_many(SYNC_BATCH_SIZE as u32)
                        .await
                        .expect("Semaphore is never closed; qed");
                    debug!(%batch, %num_pieces, "Downloading pieces");

                    let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                "Failed to get pieces from piece getter"
                            );
                            return;
                        }
                    };
                    let mut pieces_stream = pieces_stream.enumerate();

                    while let Some((index, (piece_index, result))) = pieces_stream.next().await {
                        debug!(%batch, %index, %piece_index, "Downloaded piece");
                        // ...and releases one permit per processed piece, so
                        // later batches can start acquiring before this one ends
                        let _permit = permit.split(1);

                        let piece = match result {
                            Ok(Some(piece)) => {
                                trace!(%batch, %piece_index, "Downloaded piece successfully");
                                piece
                            }
                            Ok(None) => {
                                debug!(%batch, %piece_index, "Couldn't find piece");
                                continue;
                            }
                            Err(error) => {
                                debug!(
                                    %batch,
                                    %error,
                                    %piece_index,
                                    "Failed to get piece for piece cache"
                                );
                                continue;
                            }
                        };

                        // Short lock scope: grab an offset and backend, then
                        // drop the lock before the async write
                        let (offset, maybe_backend) = {
                            let mut caches = caches.lock();

                            let Some(offset) = caches.pop_free_offset() else {
                                error!(
                                    %batch,
                                    %piece_index,
                                    "Failed to store piece in cache, there was no space"
                                );
                                break;
                            };

                            (offset, caches.get_backend(offset.cache_index).cloned())
                        };

                        let cache_index = offset.cache_index;
                        let piece_offset = offset.piece_offset;

                        let skip_write = ignored_cache_indices.read().contains(&cache_index);
                        if skip_write {
                            trace!(
                                %batch,
                                %cache_index,
                                %piece_index,
                                %piece_offset,
                                "Skipping known problematic cache index"
                            );
                        } else {
                            if let Some(backend) = maybe_backend
                                && let Err(error) =
                                    backend.write_piece(piece_offset, piece_index, &piece).await
                            {
                                error!(
                                    %error,
                                    %batch,
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    "Failed to write piece into cache, ignoring this cache going \
                                    forward"
                                );
                                ignored_cache_indices.write().insert(cache_index);
                                continue;
                            }

                            let key =
                                KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
                            caches.lock().push_stored_piece(key, offset);
                        }

                        let prev_downloaded_pieces_count =
                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
                        if prev_downloaded_pieces_count != pieces_to_download_total {
                            let progress = prev_downloaded_pieces_count as f32
                                / pieces_to_download_total as f32
                                * 100.0;
                            // Periodically publish intermediate state and log progress
                            if prev_downloaded_pieces_count
                                .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
                            {
                                let mut piece_caches = self.piece_caches.write().await;
                                piece_caches.clone_from(&caches.lock());

                                info!(
                                    "Piece cache sync {progress:.2}% complete ({} / {})",
                                    ByteSize::b(
                                        (prev_downloaded_pieces_count * Piece::SIZE) as u64,
                                    )
                                    .display()
                                    .iec(),
                                    ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64,)
                                        .display()
                                        .iec(),
                                );
                            }

                            self.handlers.progress.call_simple(&progress);
                        }
                    }

                    trace!(
                        %num_pieces,
                        %batch,
                        %batch_count,
                        downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
                        %pieces_to_download_total,
                        available_permits = %downloading_semaphore.available_permits(),
                        "Finished piece cache sync batch",
                    );
                }
            }));

        // Concurrency is additionally bounded by the semaphore permits above
        downloading_pieces_stream
            .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
            .for_each(|()| async {})
            .await;

        *self.piece_caches.write().await = caches.into_inner();
        self.handlers.progress.call_simple(&100.0);
        *last_segment_index_internal = last_segment_index;

        info!("Finished piece cache synchronization");
    }
727
    /// Processes newly archived segments in
    /// `(*last_segment_index_internal + 1)..=segment_index`: downloads pieces
    /// that belong in the piece or plot caches and stores them.
    async fn process_new_segment_index<PG>(
        &self,
        piece_getter: &PG,
        segment_index: SegmentIndex,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        debug!(
            %last_segment_index_internal,
            %segment_index,
            "Starting to process new segment index"
        );

        let segment_index_range =
            (*last_segment_index_internal + SegmentIndex::ONE)..=segment_index;

        // Notification for an already-processed segment — nothing to do
        if segment_index_range.is_empty() {
            return;
        }

        debug!(
            ?segment_index_range,
            "Downloading potentially useful pieces"
        );

        let pieces_to_maybe_include = segment_index_range
            .flat_map(|segment_index| segment_index.segment_piece_indexes())
            .map(|piece_index| async move {
                let should_store_in_piece_cache = self
                    .piece_caches
                    .read()
                    .await
                    .should_include_key(self.peer_id, piece_index);

                let key = RecordKey::from(piece_index.to_multihash());
                let should_store_in_plot_cache =
                    self.plot_caches.should_store(piece_index, &key).await;

                if !(should_store_in_piece_cache || should_store_in_plot_cache) {
                    trace!(%piece_index, "Piece doesn't need to be cached #1");

                    return None;
                }

                // Try the node first — it just archived these pieces
                let maybe_piece_result =
                    self.node_client
                        .piece(piece_index)
                        .await
                        .inspect_err(|error| {
                            debug!(
                                %error,
                                %segment_index,
                                %piece_index,
                                "Failed to retrieve piece from node right after archiving"
                            );
                        });

                if let Ok(Some(piece)) = maybe_piece_result {
                    return Some((piece_index, piece));
                }

                // Fall back to the general piece getter
                match piece_getter.get_piece(piece_index).await {
                    Ok(Some(piece)) => Some((piece_index, piece)),
                    Ok(None) => {
                        warn!(
                            %segment_index,
                            %piece_index,
                            "Failed to retrieve piece right after archiving"
                        );

                        None
                    }
                    Err(error) => {
                        warn!(
                            %error,
                            %segment_index,
                            %piece_index,
                            "Failed to retrieve piece right after archiving"
                        );

                        None
                    }
                }
            })
            .collect::<FuturesUnordered<_>>()
            .filter_map(|maybe_piece| async move { maybe_piece })
            .collect::<Vec<_>>()
            .await;

        debug!(%segment_index, "Downloaded potentially useful pieces");

        for (piece_index, piece) in pieces_to_maybe_include {
            if !self
                .plot_caches
                .store_additional_piece(piece_index, &piece)
                .await
            {
                trace!(%piece_index, "Piece could not be cached in plot cache");
            }

            // Re-check: cache state may have changed since download started
            if !self
                .piece_caches
                .read()
                .await
                .should_include_key(self.peer_id, piece_index)
            {
                trace!(%piece_index, "Piece doesn't need to be cached #2");

                continue;
            }

            trace!(%piece_index, "Piece needs to be cached #1");

            self.persist_piece_in_cache(piece_index, piece).await;
        }

        *last_segment_index_internal = segment_index;

        debug!(%segment_index, "Finished processing new segment indices");
    }
856
    /// Catches the piece cache up with segments archived while initial
    /// synchronization was in progress.
    async fn keep_up_after_initial_sync<PG>(
        &self,
        piece_getter: &PG,
        last_segment_index_internal: &mut SegmentIndex,
    ) where
        PG: PieceGetter,
    {
        let last_segment_index = match self.node_client.farmer_app_info().await {
            Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
            Err(error) => {
                error!(
                    %error,
                    "Failed to get farmer app info from node, keeping old cache state without \
                    updates"
                );
                return;
            }
        };

        if last_segment_index <= *last_segment_index_internal {
            return;
        }

        info!(
            "Syncing piece cache to the latest history size, this may pause block production if \
            takes too long"
        );

        // NOTE(review): unlike `process_new_segment_index`, the range starts at
        // `*last_segment_index_internal` inclusive, so the last already-synced
        // segment is re-checked — presumably harmless because
        // `should_include_key` skips pieces that are already cached; confirm.
        let piece_indices = (*last_segment_index_internal..=last_segment_index)
            .flat_map(|segment_index| segment_index.segment_piece_indexes());

        for piece_index in piece_indices {
            if !self
                .piece_caches
                .read()
                .await
                .should_include_key(self.peer_id, piece_index)
            {
                trace!(%piece_index, "Piece doesn't need to be cached #3");

                continue;
            }

            trace!(%piece_index, "Piece needs to be cached #2");

            let result = piece_getter.get_piece(piece_index).await;

            let piece = match result {
                Ok(Some(piece)) => piece,
                Ok(None) => {
                    debug!(%piece_index, "Couldn't find piece");
                    continue;
                }
                Err(error) => {
                    debug!(
                        %error,
                        %piece_index,
                        "Failed to get piece for piece cache"
                    );
                    continue;
                }
            };

            self.persist_piece_in_cache(piece_index, piece).await;
        }

        info!("Finished syncing piece cache to the latest history size");

        *last_segment_index_internal = last_segment_index;
    }
929
    /// Writes one piece into the piece cache, either by evicting an existing
    /// entry (`should_replace`) or by taking a free offset.
    async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
        let mut caches = self.piece_caches.write().await;
        match caches.should_replace(&key) {
            // Cache is full or key is better than an existing one: overwrite
            // the slot of the entry being replaced
            Some((old_key, offset)) => {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };
                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
                    trace!(
                        %cache_index,
                        %old_piece_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully replaced old cached piece"
                    );
                    caches.push_stored_piece(key, offset);
                }
            }
            // No replacement needed: store in the next free slot
            None => {
                let Some(offset) = caches.pop_free_offset() else {
                    warn!(
                        %piece_index,
                        "Should have inserted piece into cache, but it didn't happen, this is an \
                        implementation bug"
                    );
                    return;
                };
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                let Some(backend) = caches.get_backend(cache_index) else {
                    warn!(
                        %cache_index,
                        %piece_index,
                        "Should have a cached backend, but it didn't exist, this is an \
                        implementation bug"
                    );
                    return;
                };

                if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
                    error!(
                        %error,
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Failed to write piece into cache"
                    );
                } else {
                    trace!(
                        %cache_index,
                        %piece_index,
                        %piece_offset,
                        "Successfully stored piece in cache"
                    );
                    if let Some(metrics) = &self.metrics {
                        metrics.piece_cache_capacity_used.inc();
                    }
                    caches.push_stored_piece(key, offset);
                }
            }
        };
    }
1016}
1017
/// Collection of plot caches shared between the frontend and the worker.
#[derive(Debug)]
struct PlotCaches {
    caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
    // Monotonic counter used to rotate which plot cache is tried first
    next_plot_cache: AtomicUsize,
}
1025
impl PlotCaches {
    /// Returns `true` if some plot cache has a vacant slot for this key,
    /// `false` if a cache already stores it or none can take it.
    async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
        for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
            match cache.is_piece_maybe_stored(key).await {
                Ok(MaybePieceStoredResult::No) => {
                    // This cache doesn't have it — fall through to the next one
                }
                Ok(MaybePieceStoredResult::Vacant) => {
                    return true;
                }
                Ok(MaybePieceStoredResult::Yes) => {
                    return false;
                }
                Err(error) => {
                    warn!(
                        %cache_index,
                        %piece_index,
                        %error,
                        "Failed to check piece stored in cache"
                    );
                }
            }
        }

        false
    }

    /// Tries to store a piece in one of the plot caches, starting from a
    /// rotating position; returns whether any cache accepted it.
    async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
        let plot_caches = self.caches.read().await;
        let plot_caches_len = plot_caches.len();

        // At most one attempt per cache; shared counter rotates the start point
        for _ in 0..plot_caches_len {
            let plot_cache_index =
                self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;

            match plot_caches[plot_cache_index]
                .try_store_piece(piece_index, piece)
                .await
            {
                Ok(true) => {
                    return true;
                }
                Ok(false) => {
                    continue;
                }
                Err(error) => {
                    error!(
                        %error,
                        %piece_index,
                        %plot_cache_index,
                        "Failed to store additional piece in cache"
                    );
                    continue;
                }
            }
        }

        false
    }
}
1096
/// Frontend handle to the farmer cache: serves piece reads and sends commands
/// to the paired [`FarmerCacheWorker`]. Cheap to clone (shared `Arc` state).
#[derive(Debug, Clone)]
pub struct FarmerCache {
    peer_id: PeerId,
    // Piece cache state shared with the worker
    piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
    // Plot caches shared with the worker
    plot_caches: Arc<PlotCaches>,
    handlers: Arc<Handlers>,
    // Command channel into the worker
    worker_sender: mpsc::Sender<WorkerCommand>,
    metrics: Option<Arc<FarmerCacheMetrics>>,
}
1119
1120impl FarmerCache {
1121 pub fn new<NC>(
1126 node_client: NC,
1127 peer_id: PeerId,
1128 registry: Option<&mut Registry>,
1129 ) -> (Self, FarmerCacheWorker<NC>)
1130 where
1131 NC: NodeClient,
1132 {
1133 let caches = Arc::default();
1134 let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1135 let handlers = Arc::new(Handlers::default());
1136
1137 let plot_caches = Arc::new(PlotCaches {
1138 caches: AsyncRwLock::default(),
1139 next_plot_cache: AtomicUsize::new(0),
1140 });
1141 let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1142
1143 let instance = Self {
1144 peer_id,
1145 piece_caches: Arc::clone(&caches),
1146 plot_caches: Arc::clone(&plot_caches),
1147 handlers: Arc::clone(&handlers),
1148 worker_sender,
1149 metrics: metrics.clone(),
1150 };
1151 let worker = FarmerCacheWorker {
1152 peer_id,
1153 node_client,
1154 piece_caches: caches,
1155 plot_caches,
1156 handlers,
1157 worker_receiver: Some(worker_receiver),
1158 metrics,
1159 };
1160
1161 (instance, worker)
1162 }
1163
    /// Returns a piece by record key, checking the piece caches first and the
    /// plot caches second; `None` on a miss.
    ///
    /// A read error from a piece cache triggers a `ForgetKey` command so the
    /// worker can drop the broken entry.
    pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
    where
        RecordKey: From<Key>,
    {
        let key = RecordKey::from(key);
        // Resolve offset + backend under the read lock, then read outside it
        let maybe_piece_found = {
            let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
            let caches = self.piece_caches.read().await;

            caches.get_stored_piece(&key).and_then(|offset| {
                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;
                Some((
                    piece_offset,
                    cache_index,
                    caches.get_backend(cache_index)?.clone(),
                ))
            })
        };

        if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
            match backend.read_piece(piece_offset).await {
                Ok(maybe_piece) => {
                    return match maybe_piece {
                        Some((_piece_index, piece)) => {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            Some(piece)
                        }
                        None => {
                            // Index said stored, backend says empty — inconsistency
                            error!(
                                %cache_index,
                                %piece_offset,
                                ?key,
                                "Piece was expected to be in cache, but wasn't found there"
                            );
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc();
                            }
                            None
                        }
                    };
                }
                Err(error) => {
                    error!(
                        %error,
                        %cache_index,
                        ?key,
                        %piece_offset,
                        "Error while reading piece from cache"
                    );

                    // Ask the worker to evict the unreadable entry
                    if let Err(error) = self
                        .worker_sender
                        .clone()
                        .send(WorkerCommand::ForgetKey { key })
                        .await
                    {
                        trace!(%error, "Failed to send ForgetKey command to worker");
                    }

                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc();
                    }
                    return None;
                }
            }
        }

        // Fall back to plot caches
        for cache in self.plot_caches.caches.read().await.iter() {
            if let Ok(Some(piece)) = cache.read_piece(&key).await {
                if let Some(metrics) = &self.metrics {
                    metrics.cache_get_hit.inc();
                }
                return Some(piece);
            }
        }

        if let Some(metrics) = &self.metrics {
            metrics.cache_get_miss.inc();
        }
        None
    }
1249
    /// Get pieces with the provided indices from cache.
    ///
    /// Returns a stream that yields exactly one `(piece_index, maybe_piece)` pair for every
    /// requested index (misses yield `None`). Lookup is two-tiered:
    /// 1. Piece caches: requests are grouped per cache backend and issued as one batched
    ///    `read_pieces()` call per backend, driven concurrently.
    /// 2. Plot caches: indices not tracked by piece caches are probed against each plot cache.
    pub async fn get_pieces<'a, PieceIndices>(
        &'a self,
        piece_indices: PieceIndices,
    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
    where
        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
    {
        // Indices that piece caches can't serve fall through to the plot caches below
        let mut pieces_to_get_from_plot_cache = Vec::new();

        let pieces_to_read_from_piece_cache = {
            // Read lock is held only for this scope while we plan the reads
            let caches = self.piece_caches.read().await;
            // Grouping by backend: cache index -> (backend, piece offset -> (piece index, key))
            let mut pieces_to_read_from_piece_cache =
                HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

            for piece_index in piece_indices {
                let key = RecordKey::from(piece_index.to_multihash());

                let offset = match caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
                    self.peer_id,
                    key.clone(),
                )) {
                    Some(offset) => offset,
                    None => {
                        // Not tracked by piece caches; try plot caches later
                        pieces_to_get_from_plot_cache.push((piece_index, key));
                        continue;
                    }
                };

                let cache_index = offset.cache_index;
                let piece_offset = offset.piece_offset;

                match pieces_to_read_from_piece_cache.entry(cache_index) {
                    Entry::Occupied(mut entry) => {
                        let (_backend, pieces) = entry.get_mut();
                        pieces.insert(piece_offset, (piece_index, key));
                    }
                    Entry::Vacant(entry) => {
                        let backend = match caches.get_backend(cache_index) {
                            Some(backend) => backend.clone(),
                            None => {
                                // Tracked offset without a live backend; fall back to plot caches
                                pieces_to_get_from_plot_cache.push((piece_index, key));
                                continue;
                            }
                        };
                        entry
                            .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
                    }
                }
            }

            pieces_to_read_from_piece_cache
        };

        // Results are produced by the driver future below and drained by the returned stream;
        // both ends live inside the same stream object, so they are dropped together (this is
        // what the `unbounded_send(..).expect(..)` invariants below rely on)
        let (tx, mut rx) = mpsc::unbounded();

        let fut = async move {
            let tx = &tx;

            // One future per cache backend, each doing a single batched read of all offsets
            // requested from that backend
            let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
                .into_iter()
                .map(|(cache_index, (backend, mut pieces_to_get))| async move {
                    let mut pieces_stream = match backend
                        .read_pieces(Box::new(
                            pieces_to_get
                                .keys()
                                .copied()
                                .collect::<Vec<_>>()
                                .into_iter(),
                        ))
                        .await
                    {
                        Ok(pieces_stream) => pieces_stream,
                        Err(error) => {
                            error!(
                                %error,
                                %cache_index,
                                "Error while reading pieces from cache"
                            );

                            // Whole batch failed: report every requested index as missing
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                            }
                            for (piece_index, _key) in pieces_to_get.into_values() {
                                tx.unbounded_send((piece_index, None)).expect(
                                    "This future isn't polled after receiver is dropped; qed",
                                );
                            }
                            return;
                        }
                    };

                    // `pieces_to_get` doubles as an accounting map: entries are removed as the
                    // backend returns them, anything left afterwards was never answered
                    while let Some(maybe_piece) = pieces_stream.next().await {
                        let result = match maybe_piece {
                            Ok((piece_offset, Some((piece_index, piece)))) => {
                                pieces_to_get.remove(&piece_offset);

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_hit.inc();
                                }
                                (piece_index, Some(piece))
                            }
                            Ok((piece_offset, None)) => {
                                // Offset was tracked as stored, but backend has no piece there
                                let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
                                else {
                                    debug!(
                                        %cache_index,
                                        %piece_offset,
                                        "Received piece offset that was not expected"
                                    );
                                    continue;
                                };

                                error!(
                                    %cache_index,
                                    %piece_index,
                                    %piece_offset,
                                    ?key,
                                    "Piece was expected to be in cache, but wasn't found there"
                                );
                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                (piece_index, None)
                            }
                            Err(error) => {
                                // Read error without an offset: nothing to remove from the
                                // accounting map, the leftover handling below will report it
                                error!(
                                    %error,
                                    %cache_index,
                                    "Error while reading piece from cache"
                                );

                                if let Some(metrics) = &self.metrics {
                                    metrics.cache_get_error.inc();
                                }
                                continue;
                            }
                        };

                        tx.unbounded_send(result)
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }

                    if pieces_to_get.is_empty() {
                        return;
                    }

                    // Backend stream ended without answering these offsets; report as misses
                    if let Some(metrics) = &self.metrics {
                        metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
                    }
                    for (piece_offset, (piece_index, key)) in pieces_to_get {
                        error!(
                            %cache_index,
                            %piece_index,
                            %piece_offset,
                            ?key,
                            "Piece cache didn't return an entry for offset"
                        );

                        tx.unbounded_send((piece_index, None))
                            .expect("This future isn't polled after receiver is dropped; qed");
                    }
                })
                .collect::<FuturesUnordered<_>>();
            // Drive all per-backend reads to completion, in whatever order they finish
            let reading_from_piece_cache_fut = async move {
                while let Some(()) = reading_from_piece_cache.next().await {
                }
            };

            let reading_from_plot_cache_fut = async {
                if pieces_to_get_from_plot_cache.is_empty() {
                    return;
                }

                for cache in self.plot_caches.caches.read().await.iter() {
                    // Iterate in reverse so `swap_remove` doesn't disturb unvisited entries
                    for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
                        let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];

                        if let Ok(Some(piece)) = cache.read_piece(key).await {
                            if let Some(metrics) = &self.metrics {
                                metrics.cache_get_hit.inc();
                            }
                            tx.unbounded_send((*piece_index, Some(piece)))
                                .expect("This future isn't polled after receiver is dropped; qed");

                            pieces_to_get_from_plot_cache.swap_remove(offset);
                        }
                    }

                    if pieces_to_get_from_plot_cache.is_empty() {
                        return;
                    }
                }

                // Not found in any plot cache either
                if let Some(metrics) = &self.metrics {
                    metrics
                        .cache_get_miss
                        .inc_by(pieces_to_get_from_plot_cache.len() as u64);
                }

                for (piece_index, _key) in pieces_to_get_from_plot_cache {
                    tx.unbounded_send((piece_index, None))
                        .expect("This future isn't polled after receiver is dropped; qed");
                }
            };

            join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
        };
        let mut fut = Box::pin(fut.fuse());

        // Hand-rolled stream: each poll first advances the driver future (which pushes results
        // into the channel), then drains the channel; once the driver terminates and the channel
        // is empty, `rx` yields `None` and the stream ends
        stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            Poll::Pending
        })
    }
1490
    /// Out of the provided piece indices, returns those that are currently cached.
    ///
    /// A piece counts as cached when it is tracked by the piece caches, or when any plot cache
    /// reports it as maybe stored (`MaybePieceStoredResult::Yes`).
    pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
        // Indices still unaccounted for, mapped to their record keys
        let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
            piece_indices
                .iter()
                .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
        );

        // First tier: cheap synchronous lookup in piece caches; the read lock is held only for
        // this scope
        {
            let piece_caches = self.piece_caches.read().await;
            pieces_to_find.retain(|_piece_index, key| {
                let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
                !piece_caches.contains_stored_piece(&distance_key)
            });
        }

        // Everything was found in piece caches
        if pieces_to_find.is_empty() {
            return piece_indices;
        }

        // Second tier: plot caches, but only opportunistically — if the lock is contended the
        // remaining pieces are NOT filtered out of the result.
        // NOTE(review): with a contended lock this over-reports cached pieces; confirm callers
        // tolerate false positives here.
        if let Some(plot_caches) = self.plot_caches.caches.try_read() {
            let plot_caches = &plot_caches;
            let not_found = pieces_to_find
                .into_iter()
                .map(|(piece_index, key)| async move {
                    // Probe all plot caches concurrently; `any` short-circuits once one
                    // reports the piece as maybe stored
                    let found = plot_caches
                        .iter()
                        .map(|plot_cache| async {
                            matches!(
                                plot_cache.is_piece_maybe_stored(key).await,
                                Ok(MaybePieceStoredResult::Yes)
                            )
                        })
                        .collect::<FuturesUnordered<_>>()
                        .any(|found| async move { found })
                        .await;

                    if found { None } else { Some(piece_index) }
                })
                .collect::<FuturesUnordered<_>>()
                .filter_map(|maybe_piece_index| async move { maybe_piece_index })
                .collect::<HashSet<_>>()
                .await;
            // Keep only pieces that were found somewhere, preserving the input order
            piece_indices.retain(|piece_index| !not_found.contains(piece_index));
        }
        piece_indices
    }
1543
1544 pub async fn find_piece(
1546 &self,
1547 piece_index: PieceIndex,
1548 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1549 let caches = self.piece_caches.read().await;
1550
1551 self.find_piece_internal(&caches, piece_index)
1552 }
1553
1554 pub async fn find_pieces<PieceIndices>(
1556 &self,
1557 piece_indices: PieceIndices,
1558 ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1559 where
1560 PieceIndices: IntoIterator<Item = PieceIndex>,
1561 {
1562 let caches = self.piece_caches.read().await;
1563
1564 piece_indices
1565 .into_iter()
1566 .filter_map(|piece_index| {
1567 self.find_piece_internal(&caches, piece_index)
1568 .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1569 })
1570 .collect()
1571 }
1572
1573 fn find_piece_internal(
1574 &self,
1575 caches: &PieceCachesState,
1576 piece_index: PieceIndex,
1577 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1578 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1579
1580 let Some(offset) = caches.get_stored_piece(&key) else {
1581 if let Some(metrics) = &self.metrics {
1582 metrics.cache_find_miss.inc();
1583 }
1584
1585 return None;
1586 };
1587 let piece_offset = offset.piece_offset;
1588
1589 if let Some(backend) = caches.get_backend(offset.cache_index) {
1590 if let Some(metrics) = &self.metrics {
1591 metrics.cache_find_hit.inc();
1592 }
1593 return Some((*backend.id(), piece_offset));
1594 }
1595
1596 if let Some(metrics) = &self.metrics {
1597 metrics.cache_find_miss.inc();
1598 }
1599 None
1600 }
1601
1602 pub async fn maybe_store_additional_piece(
1606 &self,
1607 piece_index: PieceIndex,
1608 piece: &Piece,
1609 ) -> bool {
1610 let key = RecordKey::from(piece_index.to_multihash());
1611
1612 let should_store = self.plot_caches.should_store(piece_index, &key).await;
1613
1614 if !should_store {
1615 return false;
1616 }
1617
1618 self.plot_caches
1619 .store_additional_piece(piece_index, piece)
1620 .await
1621 }
1622
1623 pub async fn replace_backing_caches(
1625 &self,
1626 new_piece_caches: Vec<Arc<dyn PieceCache>>,
1627 new_plot_caches: Vec<Arc<dyn PlotCache>>,
1628 ) {
1629 if let Err(error) = self
1630 .worker_sender
1631 .clone()
1632 .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1633 .await
1634 {
1635 warn!(%error, "Failed to replace backing caches, worker exited");
1636 }
1637
1638 *self.plot_caches.caches.write().await = new_plot_caches;
1639 }
1640
    /// Subscribe to cache sync progress notifications.
    ///
    /// The callback receives an `f32` progress value (presumably a percentage — confirm against
    /// the worker that fires this handler). The returned [`HandlerId`] keeps the subscription
    /// alive; dropping it unsubscribes.
    pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
        self.handlers.progress.add(callback)
    }
1645}
1646
/// Collection of [`FarmerCache`] instances; operations pick one cache at random (see the
/// `impl` below), spreading load across them.
#[derive(Debug, Clone)]
pub struct FarmerCaches {
    // Shared immutable slice: cloning `FarmerCaches` only bumps a refcount
    caches: Arc<[FarmerCache]>,
}
1652
impl From<Arc<[FarmerCache]>> for FarmerCaches {
    /// Wrap an already-shared slice of caches without copying it.
    fn from(caches: Arc<[FarmerCache]>) -> Self {
        Self { caches }
    }
}
1658
impl From<FarmerCache> for FarmerCaches {
    /// Convenience conversion for the common single-cache case.
    fn from(cache: FarmerCache) -> Self {
        Self {
            caches: Arc::new([cache]),
        }
    }
}
1666
1667impl FarmerCaches {
1668 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1670 where
1671 RecordKey: From<Key>,
1672 {
1673 let farmer_cache = self.caches.choose(&mut rand::rng())?;
1674 farmer_cache.get_piece(key).await
1675 }
1676
1677 pub async fn get_pieces<'a, PieceIndices>(
1681 &'a self,
1682 piece_indices: PieceIndices,
1683 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1684 where
1685 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1686 {
1687 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1688 return Either::Left(stream::iter(
1689 piece_indices
1690 .into_iter()
1691 .map(|piece_index| (piece_index, None)),
1692 ));
1693 };
1694
1695 Either::Right(farmer_cache.get_pieces(piece_indices).await)
1696 }
1697
1698 pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1700 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1701 return Vec::new();
1702 };
1703
1704 farmer_cache.has_pieces(piece_indices).await
1705 }
1706
1707 pub async fn maybe_store_additional_piece(
1711 &self,
1712 piece_index: PieceIndex,
1713 piece: &Piece,
1714 ) -> bool {
1715 self.caches
1717 .iter()
1718 .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1719 .collect::<FuturesUnordered<_>>()
1720 .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1721 .await
1722 }
1723}
1724
1725fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1727 let len = key.as_ref().len();
1728 let s = len - PieceIndex::SIZE;
1729
1730 let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1731 piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1732
1733 PieceIndex::from_bytes(piece_index_bytes)
1734}