1mod metrics;
7mod piece_cache_state;
8#[cfg(not(miri))]
10#[cfg(test)]
11mod tests;
12
13use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
14use crate::farmer_cache::metrics::FarmerCacheMetrics;
15use crate::farmer_cache::piece_cache_state::PieceCachesState;
16use crate::node_client::NodeClient;
17use crate::utils::run_future_in_dedicated_thread;
18use ab_core_primitives::pieces::{Piece, PieceIndex};
19use ab_core_primitives::segments::SegmentIndex;
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_networking::KeyWithDistance;
22use ab_networking::libp2p::PeerId;
23use ab_networking::libp2p::kad::RecordKey;
24use ab_networking::utils::multihash::ToMultihash;
25use async_lock::RwLock as AsyncRwLock;
26use bytesize::ByteSize;
27use event_listener_primitives::{Bag, HandlerId};
28use futures::channel::mpsc;
29use futures::future::{Either, FusedFuture};
30use futures::stream::{FuturesOrdered, FuturesUnordered};
31use futures::{FutureExt, SinkExt, Stream, StreamExt, select, stream};
32use parking_lot::{Mutex, RwLock};
33use prometheus_client::registry::Registry;
34use rand::prelude::*;
35use rayon::prelude::*;
36use std::collections::hash_map::Entry;
37use std::collections::{HashMap, HashSet};
38use std::future::join;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::task::Poll;
42use std::time::Duration;
43use std::{fmt, mem};
44use tokio::sync::Semaphore;
45use tokio::task::yield_now;
46use tracing::{Instrument, debug, error, info, info_span, trace, warn};
47
48const WORKER_CHANNEL_CAPACITY: usize = 100;
49const SYNC_BATCH_SIZE: usize = 256;
50const SYNC_CONCURRENT_BATCHES: usize = 4;
51const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
54const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
55
56type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
57type Handler<A> = Bag<HandlerFn<A>, A>;
58type CacheIndex = u8;
59
60#[derive(Default, Debug)]
61struct Handlers {
62 progress: Handler<f32>,
63}
64
65#[derive(Debug, Clone, Copy)]
66struct FarmerCacheOffset {
67 cache_index: CacheIndex,
68 piece_offset: PieceCacheOffset,
69}
70
71impl FarmerCacheOffset {
72 fn new(cache_index: CacheIndex, piece_offset: PieceCacheOffset) -> Self {
73 Self {
74 cache_index,
75 piece_offset,
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81struct CacheBackend {
82 backend: Arc<dyn PieceCache>,
83 used_capacity: u32,
84 total_capacity: u32,
85}
86
87impl std::ops::Deref for CacheBackend {
88 type Target = Arc<dyn PieceCache>;
89
90 fn deref(&self) -> &Self::Target {
91 &self.backend
92 }
93}
94
95impl CacheBackend {
96 fn new(backend: Arc<dyn PieceCache>, total_capacity: u32) -> Self {
97 Self {
98 backend,
99 used_capacity: 0,
100 total_capacity,
101 }
102 }
103
104 fn next_free(&mut self) -> Option<PieceCacheOffset> {
105 let offset = self.used_capacity;
106 if offset < self.total_capacity {
107 self.used_capacity += 1;
108 Some(PieceCacheOffset(offset))
109 } else {
110 debug!(?offset, total_capacity = ?self.total_capacity, "No free space in cache backend");
111 None
112 }
113 }
114
115 fn free_size(&self) -> u32 {
116 self.total_capacity - self.used_capacity
117 }
118}
119
120#[derive(Debug)]
121struct CacheState {
122 cache_stored_pieces: HashMap<KeyWithDistance, FarmerCacheOffset>,
123 cache_free_offsets: Vec<FarmerCacheOffset>,
124 backend: CacheBackend,
125}
126
127#[derive(Debug)]
128enum WorkerCommand {
129 ReplaceBackingCaches {
130 new_piece_caches: Vec<Arc<dyn PieceCache>>,
131 },
132 ForgetKey {
133 key: RecordKey,
134 },
135}
136
137#[derive(Debug)]
139#[must_use = "Farmer cache will not work unless its worker is running"]
140pub struct FarmerCacheWorker<NC>
141where
142 NC: fmt::Debug,
143{
144 peer_id: PeerId,
145 node_client: NC,
146 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
147 plot_caches: Arc<PlotCaches>,
148 handlers: Arc<Handlers>,
149 worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
150 metrics: Option<Arc<FarmerCacheMetrics>>,
151}
152
153impl<NC> FarmerCacheWorker<NC>
154where
155 NC: NodeClient,
156{
157 pub async fn run<PG>(mut self, piece_getter: PG)
161 where
162 PG: PieceGetter,
163 {
164 let mut last_segment_index_internal = SegmentIndex::ZERO;
166
167 let mut worker_receiver = self
168 .worker_receiver
169 .take()
170 .expect("Always set during worker instantiation");
171
172 if let Some(WorkerCommand::ReplaceBackingCaches { new_piece_caches }) =
173 worker_receiver.next().await
174 {
175 self.initialize(
176 &piece_getter,
177 &mut last_segment_index_internal,
178 new_piece_caches,
179 )
180 .await;
181 } else {
182 return;
184 }
185
186 let mut super_segment_headers_notifications = match self
187 .node_client
188 .subscribe_new_super_segment_headers()
189 .await
190 {
191 Ok(super_segment_headers_notifications) => super_segment_headers_notifications,
192 Err(error) => {
193 error!(%error, "Failed to subscribe to new super segment headers notifications");
194 return;
195 }
196 };
197
198 self.keep_up_after_initial_sync(&piece_getter, &mut last_segment_index_internal)
200 .await;
201
202 loop {
203 select! {
204 maybe_command = worker_receiver.next() => {
205 let Some(command) = maybe_command else {
206 return;
208 };
209
210 self.handle_command(command, &piece_getter, &mut last_segment_index_internal).await;
211 }
212 maybe_super_segment_header = super_segment_headers_notifications.next().fuse() => {
213 if let Some(super_segment_header) = maybe_super_segment_header {
214 self
215 .process_new_segment_index(
216 &piece_getter,
217 super_segment_header.max_segment_index.as_inner(),
218 &mut last_segment_index_internal
219 )
220 .await;
221 } else {
222 return;
225 }
226 }
227 }
228 }
229 }
230
231 async fn handle_command<PG>(
232 &self,
233 command: WorkerCommand,
234 piece_getter: &PG,
235 last_segment_index_internal: &mut SegmentIndex,
236 ) where
237 PG: PieceGetter,
238 {
239 match command {
240 WorkerCommand::ReplaceBackingCaches { new_piece_caches } => {
241 self.initialize(piece_getter, last_segment_index_internal, new_piece_caches)
242 .await;
243 }
244 WorkerCommand::ForgetKey { key } => {
246 let mut caches = self.piece_caches.write().await;
247 let key = KeyWithDistance::new_with_record_key(self.peer_id, key);
248 let Some(offset) = caches.remove_stored_piece(&key) else {
249 return;
251 };
252
253 let cache_index = offset.cache_index;
254 let piece_offset = offset.piece_offset;
255 let Some(backend) = caches.get_backend(cache_index).cloned() else {
256 return;
258 };
259
260 caches.push_dangling_free_offset(offset);
261 match backend.read_piece_index(piece_offset).await {
262 Ok(Some(piece_index)) => {
263 trace!(%piece_index, %cache_index, %piece_offset, "Forget piece");
264 }
265 Ok(None) => {
266 warn!(
267 %cache_index,
268 %piece_offset,
269 "Piece index out of range, this is likely an implementation bug, \
270 not freeing heap element"
271 );
272 }
273 Err(error) => {
274 error!(
275 %error,
276 %cache_index,
277 ?key,
278 %piece_offset,
279 "Error while reading piece from cache"
280 );
281 }
282 }
283 }
284 }
285 }
286
287 async fn initialize<PG>(
288 &self,
289 piece_getter: &PG,
290 last_segment_index_internal: &mut SegmentIndex,
291 new_piece_caches: Vec<Arc<dyn PieceCache>>,
292 ) where
293 PG: PieceGetter,
294 {
295 info!("Initializing piece cache");
296
297 let (mut stored_pieces, mut dangling_free_offsets) =
299 mem::take(&mut *self.piece_caches.write().await).reuse();
300
301 debug!("Collecting pieces that were in the cache before");
302
303 if let Some(metrics) = &self.metrics {
304 metrics.piece_cache_capacity_total.set(0);
305 metrics.piece_cache_capacity_used.set(0);
306 }
307
308 let peer_id = self.peer_id;
309
310 let piece_caches_number = new_piece_caches.len();
312 let maybe_caches_futures = new_piece_caches
313 .into_iter()
314 .enumerate()
315 .filter_map(|(cache_index, new_cache)| {
316 let total_capacity = new_cache.max_num_elements();
317 let mut backend = CacheBackend::new(new_cache, total_capacity);
318 let Ok(cache_index) = CacheIndex::try_from(cache_index) else {
319 warn!(
320 ?piece_caches_number,
321 "Too many piece caches provided, {cache_index} cache will be ignored",
322 );
323 return None;
324 };
325
326 if let Some(metrics) = &self.metrics {
327 metrics
328 .piece_cache_capacity_total
329 .inc_by(i64::from(total_capacity));
330 }
331
332 let init_fut = async move {
333 let used_capacity = &mut backend.used_capacity;
334
335 let mut maybe_contents = match backend.backend.contents().await {
339 Ok(contents) => Some(contents),
340 Err(error) => {
341 warn!(%error, "Failed to get cache contents");
342
343 None
344 }
345 };
346
347 let mut cache_stored_pieces = HashMap::new();
348 let mut cache_free_offsets = Vec::new();
349
350 let Some(mut contents) = maybe_contents.take() else {
351 drop(maybe_contents);
352
353 return CacheState {
354 cache_stored_pieces,
355 cache_free_offsets,
356 backend,
357 };
358 };
359
360 while let Some(maybe_element_details) = contents.next().await {
361 let (piece_offset, maybe_piece_index) = match maybe_element_details {
362 Ok(element_details) => element_details,
363 Err(error) => {
364 warn!(%error, "Failed to get cache contents element details");
365 break;
366 }
367 };
368 let offset = FarmerCacheOffset::new(cache_index, piece_offset);
369 match maybe_piece_index {
370 Some(piece_index) => {
371 *used_capacity = piece_offset.0 + 1;
372 let record_key = RecordKey::from(piece_index.to_multihash());
373 let key = KeyWithDistance::new_with_record_key(peer_id, record_key);
374 cache_stored_pieces.insert(key, offset);
375 }
376 None => {
377 cache_free_offsets.push(offset);
380 }
381 }
382
383 yield_now().await;
385 }
386
387 drop(maybe_contents);
388 drop(contents);
389
390 CacheState {
391 cache_stored_pieces,
392 cache_free_offsets,
393 backend,
394 }
395 };
396
397 Some(run_future_in_dedicated_thread(
398 move || init_fut.instrument(info_span!("", %cache_index)),
399 format!("piece-cache.{cache_index}"),
400 ))
401 })
402 .collect::<Result<Vec<_>, _>>();
403
404 let caches_futures = match maybe_caches_futures {
405 Ok(caches_futures) => caches_futures,
406 Err(error) => {
407 error!(%error, "Failed to spawn piece cache reading thread");
408
409 return;
410 }
411 };
412
413 let mut backends = Vec::with_capacity(caches_futures.len());
414 let mut caches_futures = caches_futures.into_iter().collect::<FuturesOrdered<_>>();
415
416 while let Some(maybe_cache) = caches_futures.next().await {
417 match maybe_cache {
418 Ok(cache) => {
419 let backend = cache.backend;
420 for (key, cache_offset) in cache.cache_stored_pieces {
421 if let Some(old_cache_offset) = stored_pieces.insert(key, cache_offset) {
422 dangling_free_offsets.push_front(old_cache_offset);
423 }
424 }
425 dangling_free_offsets.extend(
426 cache.cache_free_offsets.into_iter().filter(|free_offset| {
427 free_offset.piece_offset.0 < backend.used_capacity
428 }),
429 );
430 backends.push(backend);
431 }
432 Err(_cancelled) => {
433 error!("Piece cache reading thread panicked");
434
435 return;
436 }
437 }
438 }
439
440 let mut caches = PieceCachesState::new(stored_pieces, dangling_free_offsets, backends);
441
442 info!("Synchronizing piece cache");
443
444 let last_segment_index = loop {
445 match self.node_client.farmer_app_info().await {
446 Ok(farmer_app_info) => {
447 let last_segment_index =
448 farmer_app_info.protocol_info.history_size.segment_index();
449 if !farmer_app_info.syncing || last_segment_index > SegmentIndex::ZERO {
457 break last_segment_index;
458 }
459 }
460 Err(error) => {
461 error!(
462 %error,
463 "Failed to get farmer app info from node, keeping old cache state without \
464 updates"
465 );
466
467 *self.piece_caches.write().await = caches;
469 return;
470 }
471 }
472
473 tokio::time::sleep(INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL).await;
474 };
475
476 debug!(%last_segment_index, "Identified last segment index");
477
478 let segment_indices = Vec::from_iter(SegmentIndex::ZERO..=last_segment_index);
480 let mut piece_indices_to_store = segment_indices
483 .into_par_iter()
484 .flat_map(|segment_index| {
485 segment_index
486 .segment_piece_indexes()
487 .into_par_iter()
488 .map(|piece_index| {
489 (
490 KeyWithDistance::new(self.peer_id, piece_index.to_multihash()),
491 piece_index,
492 )
493 })
494 })
495 .collect::<Vec<_>>();
496
497 piece_indices_to_store.par_sort_unstable_by(|(a_key, _), (b_key, _)| a_key.cmp(b_key));
500
501 let mut piece_indices_to_store = piece_indices_to_store
503 .into_iter()
504 .take(caches.total_capacity())
505 .collect::<HashMap<_, _>>();
506
507 let mut piece_caches_capacity_used = vec![0u32; caches.backends().len()];
508 caches.free_unneeded_stored_pieces(&mut piece_indices_to_store);
512
513 if let Some(metrics) = &self.metrics {
514 for offset in caches.stored_pieces_offsets() {
515 piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
516 }
517
518 for cache_used in piece_caches_capacity_used {
519 metrics
520 .piece_cache_capacity_used
521 .inc_by(i64::from(cache_used));
522 }
523 }
524
525 self.piece_caches.write().await.clone_from(&caches);
527 let stored_count = caches.stored_pieces_offsets().len();
528
529 debug!(
530 %stored_count,
531 count = %piece_indices_to_store.len(),
532 "Identified piece indices that should be cached",
533 );
534
535 let pieces_to_download_total = piece_indices_to_store.len() + stored_count;
536 let piece_indices_to_store = piece_indices_to_store
537 .into_values()
538 .collect::<Vec<_>>()
539 .chunks(SYNC_BATCH_SIZE)
543 .map(<[PieceIndex]>::to_vec)
544 .collect::<Vec<_>>();
545
546 let downloaded_pieces_count = AtomicUsize::new(stored_count);
547 let caches = Mutex::new(caches);
548 self.handlers.progress.call_simple(&0.0);
549 let batch_count = piece_indices_to_store.len();
550 let piece_indices_to_store = piece_indices_to_store.into_iter().enumerate();
551
552 let downloading_semaphore = &Semaphore::new(SYNC_BATCH_SIZE * SYNC_CONCURRENT_BATCHES);
553 let ignored_cache_indices = &RwLock::new(HashSet::new());
554
555 let downloading_pieces_stream =
556 stream::iter(piece_indices_to_store.map(|(batch, piece_indices)| {
557 let downloaded_pieces_count = &downloaded_pieces_count;
558 let caches = &caches;
559 let num_pieces = piece_indices.len();
560
561 trace!(
562 %num_pieces,
563 %batch,
564 %batch_count,
565 first_piece_index = ?piece_indices.first().expect("chunks are never empty"),
566 last_piece_index = ?piece_indices.last().expect("chunks are never empty"),
567 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
568 %pieces_to_download_total,
569 available_permits = %downloading_semaphore.available_permits(),
570 "Started piece cache sync batch",
571 );
572
573 async move {
574 let mut permit = downloading_semaphore
575 .acquire_many(SYNC_BATCH_SIZE as u32)
576 .await
577 .expect("Semaphore is never closed; qed");
578 debug!(%batch, %num_pieces, "Downloading pieces");
579
580 let pieces_stream = match piece_getter.get_pieces(piece_indices).await {
581 Ok(pieces_stream) => pieces_stream,
582 Err(error) => {
583 error!(
584 %error,
585 "Failed to get pieces from piece getter"
586 );
587 return;
588 }
589 };
590 let mut pieces_stream = pieces_stream.enumerate();
591
592 while let Some((index, (piece_index, result))) = pieces_stream.next().await {
593 debug!(%batch, %index, %piece_index, "Downloaded piece");
594 let _permit = permit.split(1);
596
597 let piece = match result {
598 Ok(Some(piece)) => {
599 trace!(%batch, %piece_index, "Downloaded piece successfully");
600 piece
601 }
602 Ok(None) => {
603 debug!(%batch, %piece_index, "Couldn't find piece");
604 continue;
605 }
606 Err(error) => {
607 debug!(
608 %batch,
609 %error,
610 %piece_index,
611 "Failed to get piece for piece cache"
612 );
613 continue;
614 }
615 };
616
617 let (offset, maybe_backend) = {
618 let mut caches = caches.lock();
619
620 let Some(offset) = caches.pop_free_offset() else {
622 error!(
623 %batch,
624 %piece_index,
625 "Failed to store piece in cache, there was no space"
626 );
627 break;
628 };
629
630 (offset, caches.get_backend(offset.cache_index).cloned())
631 };
632
633 let cache_index = offset.cache_index;
634 let piece_offset = offset.piece_offset;
635
636 let skip_write = ignored_cache_indices.read().contains(&cache_index);
637 if skip_write {
638 trace!(
639 %batch,
640 %cache_index,
641 %piece_index,
642 %piece_offset,
643 "Skipping known problematic cache index"
644 );
645 } else {
646 if let Some(backend) = maybe_backend
647 && let Err(error) =
648 backend.write_piece(piece_offset, piece_index, &piece).await
649 {
650 error!(
651 %error,
652 %batch,
653 %cache_index,
654 %piece_index,
655 %piece_offset,
656 "Failed to write piece into cache, ignoring this cache going \
657 forward"
658 );
659 ignored_cache_indices.write().insert(cache_index);
660 continue;
661 }
662
663 let key =
664 KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
665 caches.lock().push_stored_piece(key, offset);
666 }
667
668 let prev_downloaded_pieces_count =
669 downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
670 if prev_downloaded_pieces_count != pieces_to_download_total {
673 let progress = prev_downloaded_pieces_count as f32
674 / pieces_to_download_total as f32
675 * 100.0;
676 if prev_downloaded_pieces_count
677 .is_multiple_of(INTERMEDIATE_CACHE_UPDATE_INTERVAL)
678 {
679 let mut piece_caches = self.piece_caches.write().await;
680 piece_caches.clone_from(&caches.lock());
681
682 info!(
683 "Piece cache sync {progress:.2}% complete ({} / {})",
684 ByteSize::b(
685 (prev_downloaded_pieces_count * Piece::SIZE) as u64,
686 )
687 .display()
688 .iec(),
689 ByteSize::b((pieces_to_download_total * Piece::SIZE) as u64,)
690 .display()
691 .iec(),
692 );
693 }
694
695 self.handlers.progress.call_simple(&progress);
696 }
697 }
698
699 trace!(
700 %num_pieces,
701 %batch,
702 %batch_count,
703 downloaded_pieces_count = %downloaded_pieces_count.load(Ordering::Relaxed),
704 %pieces_to_download_total,
705 available_permits = %downloading_semaphore.available_permits(),
706 "Finished piece cache sync batch",
707 );
708 }
709 }));
710
711 downloading_pieces_stream
714 .buffer_unordered(SYNC_CONCURRENT_BATCHES * 10)
717 .for_each(|()| async {})
719 .await;
720
721 *self.piece_caches.write().await = caches.into_inner();
722 self.handlers.progress.call_simple(&100.0);
723 *last_segment_index_internal = last_segment_index;
724
725 info!("Finished piece cache synchronization");
726 }
727
728 async fn process_new_segment_index<PG>(
729 &self,
730 piece_getter: &PG,
731 segment_index: SegmentIndex,
732 last_segment_index_internal: &mut SegmentIndex,
733 ) where
734 PG: PieceGetter,
735 {
736 debug!(
737 %last_segment_index_internal,
738 %segment_index,
739 "Starting to process new segment index"
740 );
741
742 let segment_index_range =
743 (*last_segment_index_internal + SegmentIndex::ONE)..=segment_index;
744
745 if segment_index_range.is_empty() {
746 return;
747 }
748
749 debug!(
750 ?segment_index_range,
751 "Downloading potentially useful pieces"
752 );
753
754 let pieces_to_maybe_include = segment_index_range
760 .flat_map(|segment_index| segment_index.segment_piece_indexes())
761 .map(|piece_index| async move {
762 let should_store_in_piece_cache = self
763 .piece_caches
764 .read()
765 .await
766 .should_include_key(self.peer_id, piece_index);
767
768 let key = RecordKey::from(piece_index.to_multihash());
769 let should_store_in_plot_cache =
770 self.plot_caches.should_store(piece_index, &key).await;
771
772 if !(should_store_in_piece_cache || should_store_in_plot_cache) {
773 trace!(%piece_index, "Piece doesn't need to be cached #1");
774
775 return None;
776 }
777
778 let maybe_piece_result =
779 self.node_client
780 .piece(piece_index)
781 .await
782 .inspect_err(|error| {
783 debug!(
784 %error,
785 %segment_index,
786 %piece_index,
787 "Failed to retrieve piece from node right after archiving"
788 );
789 });
790
791 if let Ok(Some(piece)) = maybe_piece_result {
792 return Some((piece_index, piece));
793 }
794
795 match piece_getter.get_piece(piece_index).await {
796 Ok(Some(piece)) => Some((piece_index, piece)),
797 Ok(None) => {
798 warn!(
799 %segment_index,
800 %piece_index,
801 "Failed to retrieve piece right after archiving"
802 );
803
804 None
805 }
806 Err(error) => {
807 warn!(
808 %error,
809 %segment_index,
810 %piece_index,
811 "Failed to retrieve piece right after archiving"
812 );
813
814 None
815 }
816 }
817 })
818 .collect::<FuturesUnordered<_>>()
819 .filter_map(|maybe_piece| async move { maybe_piece })
820 .collect::<Vec<_>>()
821 .await;
822
823 debug!(%segment_index, "Downloaded potentially useful pieces");
824
825 for (piece_index, piece) in pieces_to_maybe_include {
828 if !self
829 .plot_caches
830 .store_additional_piece(piece_index, &piece)
831 .await
832 {
833 trace!(%piece_index, "Piece could not be cached in plot cache");
834 }
835
836 if !self
837 .piece_caches
838 .read()
839 .await
840 .should_include_key(self.peer_id, piece_index)
841 {
842 trace!(%piece_index, "Piece doesn't need to be cached #2");
843
844 continue;
845 }
846
847 trace!(%piece_index, "Piece needs to be cached #1");
848
849 self.persist_piece_in_cache(piece_index, piece).await;
850 }
851
852 *last_segment_index_internal = segment_index;
853
854 debug!(%segment_index, "Finished processing new segment indices");
855 }
856
857 async fn keep_up_after_initial_sync<PG>(
858 &self,
859 piece_getter: &PG,
860 last_segment_index_internal: &mut SegmentIndex,
861 ) where
862 PG: PieceGetter,
863 {
864 let last_segment_index = match self.node_client.farmer_app_info().await {
865 Ok(farmer_app_info) => farmer_app_info.protocol_info.history_size.segment_index(),
866 Err(error) => {
867 error!(
868 %error,
869 "Failed to get farmer app info from node, keeping old cache state without \
870 updates"
871 );
872 return;
873 }
874 };
875
876 if last_segment_index <= *last_segment_index_internal {
877 return;
878 }
879
880 info!(
881 "Syncing piece cache to the latest history size, this may pause block production if \
882 takes too long"
883 );
884
885 let piece_indices = (*last_segment_index_internal..=last_segment_index)
887 .flat_map(|segment_index| segment_index.segment_piece_indexes());
888
889 for piece_index in piece_indices {
891 if !self
892 .piece_caches
893 .read()
894 .await
895 .should_include_key(self.peer_id, piece_index)
896 {
897 trace!(%piece_index, "Piece doesn't need to be cached #3");
898
899 continue;
900 }
901
902 trace!(%piece_index, "Piece needs to be cached #2");
903
904 let result = piece_getter.get_piece(piece_index).await;
905
906 let piece = match result {
907 Ok(Some(piece)) => piece,
908 Ok(None) => {
909 debug!(%piece_index, "Couldn't find piece");
910 continue;
911 }
912 Err(error) => {
913 debug!(
914 %error,
915 %piece_index,
916 "Failed to get piece for piece cache"
917 );
918 continue;
919 }
920 };
921
922 self.persist_piece_in_cache(piece_index, piece).await;
923 }
924
925 info!("Finished syncing piece cache to the latest history size");
926
927 *last_segment_index_internal = last_segment_index;
928 }
929
930 async fn persist_piece_in_cache(&self, piece_index: PieceIndex, piece: Piece) {
933 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
934 let mut caches = self.piece_caches.write().await;
935 if let Some((old_key, offset)) = caches.should_replace(&key) {
936 let cache_index = offset.cache_index;
938 let piece_offset = offset.piece_offset;
939 let Some(backend) = caches.get_backend(cache_index) else {
940 warn!(
942 %cache_index,
943 %piece_index,
944 "Should have a cached backend, but it didn't exist, this is an \
945 implementation bug"
946 );
947 return;
948 };
949 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
950 error!(
951 %error,
952 %cache_index,
953 %piece_index,
954 %piece_offset,
955 "Failed to write piece into cache"
956 );
957 } else {
958 let old_piece_index = decode_piece_index_from_record_key(old_key.record_key());
959 trace!(
960 %cache_index,
961 %old_piece_index,
962 %piece_index,
963 %piece_offset,
964 "Successfully replaced old cached piece"
965 );
966 caches.push_stored_piece(key, offset);
967 }
968 } else {
969 let Some(offset) = caches.pop_free_offset() else {
972 warn!(
973 %piece_index,
974 "Should have inserted piece into cache, but it didn't happen, this is an \
975 implementation bug"
976 );
977 return;
978 };
979 let cache_index = offset.cache_index;
980 let piece_offset = offset.piece_offset;
981 let Some(backend) = caches.get_backend(cache_index) else {
982 warn!(
984 %cache_index,
985 %piece_index,
986 "Should have a cached backend, but it didn't exist, this is an \
987 implementation bug"
988 );
989 return;
990 };
991
992 if let Err(error) = backend.write_piece(piece_offset, piece_index, &piece).await {
993 error!(
994 %error,
995 %cache_index,
996 %piece_index,
997 %piece_offset,
998 "Failed to write piece into cache"
999 );
1000 } else {
1001 trace!(
1002 %cache_index,
1003 %piece_index,
1004 %piece_offset,
1005 "Successfully stored piece in cache"
1006 );
1007 if let Some(metrics) = &self.metrics {
1008 metrics.piece_cache_capacity_used.inc();
1009 }
1010 caches.push_stored_piece(key, offset);
1011 }
1012 }
1013 }
1014}
1015
1016#[derive(Debug)]
1017struct PlotCaches {
1018 caches: AsyncRwLock<Vec<Arc<dyn PlotCache>>>,
1020 next_plot_cache: AtomicUsize,
1022}
1023
1024impl PlotCaches {
1025 async fn should_store(&self, piece_index: PieceIndex, key: &RecordKey) -> bool {
1031 for (cache_index, cache) in self.caches.read().await.iter().enumerate() {
1032 match cache.is_piece_maybe_stored(key).await {
1033 Ok(MaybePieceStoredResult::No) => {
1034 }
1036 Ok(MaybePieceStoredResult::Vacant) => {
1037 return true;
1038 }
1039 Ok(MaybePieceStoredResult::Yes) => {
1040 return false;
1042 }
1043 Err(error) => {
1044 warn!(
1045 %cache_index,
1046 %piece_index,
1047 %error,
1048 "Failed to check piece stored in cache"
1049 );
1050 }
1051 }
1052 }
1053
1054 false
1055 }
1056
1057 async fn store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) -> bool {
1061 let plot_caches = self.caches.read().await;
1062 let plot_caches_len = plot_caches.len();
1063
1064 for _ in 0..plot_caches_len {
1066 let plot_cache_index =
1067 self.next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
1068
1069 match plot_caches[plot_cache_index]
1070 .try_store_piece(piece_index, piece)
1071 .await
1072 {
1073 Ok(true) => {
1074 return true;
1075 }
1076 Ok(false) => {
1077 }
1079 Err(error) => {
1080 error!(
1081 %error,
1082 %piece_index,
1083 %plot_cache_index,
1084 "Failed to store additional piece in cache"
1085 );
1086 }
1087 }
1088 }
1089
1090 false
1091 }
1092}
1093
1094#[derive(Debug, Clone)]
1105pub struct FarmerCache {
1106 peer_id: PeerId,
1107 piece_caches: Arc<AsyncRwLock<PieceCachesState>>,
1109 plot_caches: Arc<PlotCaches>,
1111 handlers: Arc<Handlers>,
1112 worker_sender: mpsc::Sender<WorkerCommand>,
1114 metrics: Option<Arc<FarmerCacheMetrics>>,
1115}
1116
1117impl FarmerCache {
1118 pub fn new<NC>(
1123 node_client: NC,
1124 peer_id: PeerId,
1125 registry: Option<&mut Registry>,
1126 ) -> (Self, FarmerCacheWorker<NC>)
1127 where
1128 NC: NodeClient,
1129 {
1130 let caches = Arc::default();
1131 let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);
1132 let handlers = Arc::new(Handlers::default());
1133
1134 let plot_caches = Arc::new(PlotCaches {
1135 caches: AsyncRwLock::default(),
1136 next_plot_cache: AtomicUsize::new(0),
1137 });
1138 let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));
1139
1140 let instance = Self {
1141 peer_id,
1142 piece_caches: Arc::clone(&caches),
1143 plot_caches: Arc::clone(&plot_caches),
1144 handlers: Arc::clone(&handlers),
1145 worker_sender,
1146 metrics: metrics.clone(),
1147 };
1148 let worker = FarmerCacheWorker {
1149 peer_id,
1150 node_client,
1151 piece_caches: caches,
1152 plot_caches,
1153 handlers,
1154 worker_receiver: Some(worker_receiver),
1155 metrics,
1156 };
1157
1158 (instance, worker)
1159 }
1160
1161 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1163 where
1164 RecordKey: From<Key>,
1165 {
1166 let key = RecordKey::from(key);
1167 let maybe_piece_found = {
1168 let key = KeyWithDistance::new_with_record_key(self.peer_id, key.clone());
1169 let caches = self.piece_caches.read().await;
1170
1171 caches.get_stored_piece(&key).and_then(|offset| {
1172 let cache_index = offset.cache_index;
1173 let piece_offset = offset.piece_offset;
1174 Some((
1175 piece_offset,
1176 cache_index,
1177 caches.get_backend(cache_index)?.clone(),
1178 ))
1179 })
1180 };
1181
1182 if let Some((piece_offset, cache_index, backend)) = maybe_piece_found {
1183 match backend.read_piece(piece_offset).await {
1184 Ok(maybe_piece) => {
1185 return if let Some((_piece_index, piece)) = maybe_piece {
1186 if let Some(metrics) = &self.metrics {
1187 metrics.cache_get_hit.inc();
1188 }
1189 Some(piece)
1190 } else {
1191 error!(
1192 %cache_index,
1193 %piece_offset,
1194 ?key,
1195 "Piece was expected to be in cache, but wasn't found there"
1196 );
1197 if let Some(metrics) = &self.metrics {
1198 metrics.cache_get_error.inc();
1199 }
1200 None
1201 };
1202 }
1203 Err(error) => {
1204 error!(
1205 %error,
1206 %cache_index,
1207 ?key,
1208 %piece_offset,
1209 "Error while reading piece from cache"
1210 );
1211
1212 if let Err(error) = self
1213 .worker_sender
1214 .clone()
1215 .send(WorkerCommand::ForgetKey { key })
1216 .await
1217 {
1218 trace!(%error, "Failed to send ForgetKey command to worker");
1219 }
1220
1221 if let Some(metrics) = &self.metrics {
1222 metrics.cache_get_error.inc();
1223 }
1224 return None;
1225 }
1226 }
1227 }
1228
1229 for cache in self.plot_caches.caches.read().await.iter() {
1230 if let Ok(Some(piece)) = cache.read_piece(&key).await {
1231 if let Some(metrics) = &self.metrics {
1232 metrics.cache_get_hit.inc();
1233 }
1234 return Some(piece);
1235 }
1236 }
1237
1238 if let Some(metrics) = &self.metrics {
1239 metrics.cache_get_miss.inc();
1240 }
1241 None
1242 }
1243
1244 pub async fn get_pieces<'a, PieceIndices>(
1248 &'a self,
1249 piece_indices: PieceIndices,
1250 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1251 where
1252 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1253 {
1254 let mut pieces_to_get_from_plot_cache = Vec::new();
1255
1256 let pieces_to_read_from_piece_cache = {
1257 let caches = self.piece_caches.read().await;
1258 let mut pieces_to_read_from_piece_cache =
1260 HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();
1261
1262 for piece_index in piece_indices {
1263 let key = RecordKey::from(piece_index.to_multihash());
1264
1265 let Some(offset) = caches.get_stored_piece(&KeyWithDistance::new_with_record_key(
1266 self.peer_id,
1267 key.clone(),
1268 )) else {
1269 pieces_to_get_from_plot_cache.push((piece_index, key));
1270 continue;
1271 };
1272
1273 let cache_index = offset.cache_index;
1274 let piece_offset = offset.piece_offset;
1275
1276 match pieces_to_read_from_piece_cache.entry(cache_index) {
1277 Entry::Occupied(mut entry) => {
1278 let (_backend, pieces) = entry.get_mut();
1279 pieces.insert(piece_offset, (piece_index, key));
1280 }
1281 Entry::Vacant(entry) => {
1282 let backend = if let Some(backend) = caches.get_backend(cache_index) {
1283 backend.clone()
1284 } else {
1285 pieces_to_get_from_plot_cache.push((piece_index, key));
1286 continue;
1287 };
1288 entry
1289 .insert((backend, HashMap::from([(piece_offset, (piece_index, key))])));
1290 }
1291 }
1292 }
1293
1294 pieces_to_read_from_piece_cache
1295 };
1296
1297 let (tx, mut rx) = mpsc::unbounded();
1298
1299 let fut = async move {
1300 let tx = &tx;
1301
1302 let mut reading_from_piece_cache = pieces_to_read_from_piece_cache
1303 .into_iter()
1304 .map(|(cache_index, (backend, mut pieces_to_get))| async move {
1305 let mut pieces_stream = match backend
1306 .read_pieces(Box::new(
1307 pieces_to_get
1308 .keys()
1309 .copied()
1310 .collect::<Vec<_>>()
1311 .into_iter(),
1312 ))
1313 .await
1314 {
1315 Ok(pieces_stream) => pieces_stream,
1316 Err(error) => {
1317 error!(
1318 %error,
1319 %cache_index,
1320 "Error while reading pieces from cache"
1321 );
1322
1323 if let Some(metrics) = &self.metrics {
1324 metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1325 }
1326 for (piece_index, _key) in pieces_to_get.into_values() {
1327 tx.unbounded_send((piece_index, None)).expect(
1328 "This future isn't polled after receiver is dropped; qed",
1329 );
1330 }
1331 return;
1332 }
1333 };
1334
1335 while let Some(maybe_piece) = pieces_stream.next().await {
1336 let result = match maybe_piece {
1337 Ok((piece_offset, Some((piece_index, piece)))) => {
1338 pieces_to_get.remove(&piece_offset);
1339
1340 if let Some(metrics) = &self.metrics {
1341 metrics.cache_get_hit.inc();
1342 }
1343 (piece_index, Some(piece))
1344 }
1345 Ok((piece_offset, None)) => {
1346 let Some((piece_index, key)) = pieces_to_get.remove(&piece_offset)
1347 else {
1348 debug!(
1349 %cache_index,
1350 %piece_offset,
1351 "Received piece offset that was not expected"
1352 );
1353 continue;
1354 };
1355
1356 error!(
1357 %cache_index,
1358 %piece_index,
1359 %piece_offset,
1360 ?key,
1361 "Piece was expected to be in cache, but wasn't found there"
1362 );
1363 if let Some(metrics) = &self.metrics {
1364 metrics.cache_get_error.inc();
1365 }
1366 (piece_index, None)
1367 }
1368 Err(error) => {
1369 error!(
1370 %error,
1371 %cache_index,
1372 "Error while reading piece from cache"
1373 );
1374
1375 if let Some(metrics) = &self.metrics {
1376 metrics.cache_get_error.inc();
1377 }
1378 continue;
1379 }
1380 };
1381
1382 tx.unbounded_send(result)
1383 .expect("This future isn't polled after receiver is dropped; qed");
1384 }
1385
1386 if pieces_to_get.is_empty() {
1387 return;
1388 }
1389
1390 if let Some(metrics) = &self.metrics {
1391 metrics.cache_get_error.inc_by(pieces_to_get.len() as u64);
1392 }
1393 for (piece_offset, (piece_index, key)) in pieces_to_get {
1394 error!(
1395 %cache_index,
1396 %piece_index,
1397 %piece_offset,
1398 ?key,
1399 "Piece cache didn't return an entry for offset"
1400 );
1401
1402 tx.unbounded_send((piece_index, None))
1405 .expect("This future isn't polled after receiver is dropped; qed");
1406 }
1407 })
1408 .collect::<FuturesUnordered<_>>();
1409 let reading_from_piece_cache_fut = async move {
1415 while let Some(()) = reading_from_piece_cache.next().await {
1416 }
1418 };
1419
1420 let reading_from_plot_cache_fut = async {
1421 if pieces_to_get_from_plot_cache.is_empty() {
1422 return;
1423 }
1424
1425 for cache in self.plot_caches.caches.read().await.iter() {
1426 for offset in (0..pieces_to_get_from_plot_cache.len()).rev() {
1429 let (piece_index, key) = &pieces_to_get_from_plot_cache[offset];
1430
1431 if let Ok(Some(piece)) = cache.read_piece(key).await {
1432 if let Some(metrics) = &self.metrics {
1433 metrics.cache_get_hit.inc();
1434 }
1435 tx.unbounded_send((*piece_index, Some(piece)))
1436 .expect("This future isn't polled after receiver is dropped; qed");
1437
1438 pieces_to_get_from_plot_cache.swap_remove(offset);
1441 }
1442 }
1443
1444 if pieces_to_get_from_plot_cache.is_empty() {
1445 return;
1446 }
1447 }
1448
1449 if let Some(metrics) = &self.metrics {
1450 metrics
1451 .cache_get_miss
1452 .inc_by(pieces_to_get_from_plot_cache.len() as u64);
1453 }
1454
1455 for (piece_index, _key) in pieces_to_get_from_plot_cache {
1456 tx.unbounded_send((piece_index, None))
1457 .expect("This future isn't polled after receiver is dropped; qed");
1458 }
1459 };
1460
1461 join!(reading_from_piece_cache_fut, reading_from_plot_cache_fut).await
1462 };
1463 let mut fut = Box::pin(fut.fuse());
1464
1465 stream::poll_fn(move |cx| {
1467 if !fut.is_terminated() {
1468 let _: Poll<_> = fut.poll_unpin(cx);
1470 }
1471
1472 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
1473 return Poll::Ready(maybe_result);
1474 }
1475
1476 Poll::Pending
1478 })
1479 }
1480
1481 pub async fn has_pieces(&self, mut piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1483 let mut pieces_to_find = HashMap::<PieceIndex, RecordKey>::from_iter(
1484 piece_indices
1485 .iter()
1486 .map(|piece_index| (*piece_index, RecordKey::from(piece_index.to_multihash()))),
1487 );
1488
1489 {
1491 let piece_caches = self.piece_caches.read().await;
1492 pieces_to_find.retain(|_piece_index, key| {
1493 let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
1494 !piece_caches.contains_stored_piece(&distance_key)
1495 });
1496 }
1497
1498 if pieces_to_find.is_empty() {
1500 return piece_indices;
1501 }
1502
1503 if let Some(plot_caches) = self.plot_caches.caches.try_read() {
1505 let plot_caches = &plot_caches;
1506 let not_found = pieces_to_find
1507 .into_iter()
1508 .map(|(piece_index, key)| async move {
1509 let key = &key;
1510
1511 let found = plot_caches
1512 .iter()
1513 .map(|plot_cache| async {
1514 matches!(
1515 plot_cache.is_piece_maybe_stored(key).await,
1516 Ok(MaybePieceStoredResult::Yes)
1517 )
1518 })
1519 .collect::<FuturesUnordered<_>>()
1520 .any(|found| async move { found })
1521 .await;
1522
1523 if found { None } else { Some(piece_index) }
1524 })
1525 .collect::<FuturesUnordered<_>>()
1526 .filter_map(|maybe_piece_index| async move { maybe_piece_index })
1527 .collect::<HashSet<_>>()
1528 .await;
1529 piece_indices.retain(|piece_index| !not_found.contains(piece_index));
1530 }
1531 piece_indices
1532 }
1533
1534 pub async fn find_piece(
1536 &self,
1537 piece_index: PieceIndex,
1538 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1539 let caches = self.piece_caches.read().await;
1540
1541 self.find_piece_internal(&caches, piece_index)
1542 }
1543
1544 pub async fn find_pieces<PieceIndices>(
1546 &self,
1547 piece_indices: PieceIndices,
1548 ) -> Vec<(PieceIndex, PieceCacheId, PieceCacheOffset)>
1549 where
1550 PieceIndices: IntoIterator<Item = PieceIndex>,
1551 {
1552 let caches = self.piece_caches.read().await;
1553
1554 piece_indices
1555 .into_iter()
1556 .filter_map(|piece_index| {
1557 self.find_piece_internal(&caches, piece_index)
1558 .map(|(cache_id, piece_offset)| (piece_index, cache_id, piece_offset))
1559 })
1560 .collect()
1561 }
1562
1563 fn find_piece_internal(
1564 &self,
1565 caches: &PieceCachesState,
1566 piece_index: PieceIndex,
1567 ) -> Option<(PieceCacheId, PieceCacheOffset)> {
1568 let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
1569
1570 let Some(offset) = caches.get_stored_piece(&key) else {
1571 if let Some(metrics) = &self.metrics {
1572 metrics.cache_find_miss.inc();
1573 }
1574
1575 return None;
1576 };
1577 let piece_offset = offset.piece_offset;
1578
1579 if let Some(backend) = caches.get_backend(offset.cache_index) {
1580 if let Some(metrics) = &self.metrics {
1581 metrics.cache_find_hit.inc();
1582 }
1583 return Some((*backend.id(), piece_offset));
1584 }
1585
1586 if let Some(metrics) = &self.metrics {
1587 metrics.cache_find_miss.inc();
1588 }
1589 None
1590 }
1591
1592 pub async fn maybe_store_additional_piece(
1596 &self,
1597 piece_index: PieceIndex,
1598 piece: &Piece,
1599 ) -> bool {
1600 let key = RecordKey::from(piece_index.to_multihash());
1601
1602 let should_store = self.plot_caches.should_store(piece_index, &key).await;
1603
1604 if !should_store {
1605 return false;
1606 }
1607
1608 self.plot_caches
1609 .store_additional_piece(piece_index, piece)
1610 .await
1611 }
1612
1613 pub async fn replace_backing_caches(
1615 &self,
1616 new_piece_caches: Vec<Arc<dyn PieceCache>>,
1617 new_plot_caches: Vec<Arc<dyn PlotCache>>,
1618 ) {
1619 if let Err(error) = self
1620 .worker_sender
1621 .clone()
1622 .send(WorkerCommand::ReplaceBackingCaches { new_piece_caches })
1623 .await
1624 {
1625 warn!(%error, "Failed to replace backing caches, worker exited");
1626 }
1627
1628 *self.plot_caches.caches.write().await = new_plot_caches;
1629 }
1630
1631 pub fn on_sync_progress(&self, callback: HandlerFn<f32>) -> HandlerId {
1633 self.handlers.progress.add(callback)
1634 }
1635}
1636
1637#[derive(Debug, Clone)]
1639pub struct FarmerCaches {
1640 caches: Arc<[FarmerCache]>,
1641}
1642
1643impl From<Arc<[FarmerCache]>> for FarmerCaches {
1644 fn from(caches: Arc<[FarmerCache]>) -> Self {
1645 Self { caches }
1646 }
1647}
1648
1649impl From<FarmerCache> for FarmerCaches {
1650 fn from(cache: FarmerCache) -> Self {
1651 Self {
1652 caches: Arc::new([cache]),
1653 }
1654 }
1655}
1656
1657impl FarmerCaches {
1658 pub async fn get_piece<Key>(&self, key: Key) -> Option<Piece>
1660 where
1661 RecordKey: From<Key>,
1662 {
1663 let farmer_cache = self.caches.choose(&mut rand::rng())?;
1664 farmer_cache.get_piece(key).await
1665 }
1666
1667 pub async fn get_pieces<'a, PieceIndices>(
1671 &'a self,
1672 piece_indices: PieceIndices,
1673 ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
1674 where
1675 PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
1676 {
1677 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1678 return Either::Left(stream::iter(
1679 piece_indices
1680 .into_iter()
1681 .map(|piece_index| (piece_index, None)),
1682 ));
1683 };
1684
1685 Either::Right(farmer_cache.get_pieces(piece_indices).await)
1686 }
1687
1688 pub async fn has_pieces(&self, piece_indices: Vec<PieceIndex>) -> Vec<PieceIndex> {
1690 let Some(farmer_cache) = self.caches.choose(&mut rand::rng()) else {
1691 return Vec::new();
1692 };
1693
1694 farmer_cache.has_pieces(piece_indices).await
1695 }
1696
1697 pub async fn maybe_store_additional_piece(
1701 &self,
1702 piece_index: PieceIndex,
1703 piece: &Piece,
1704 ) -> bool {
1705 self.caches
1707 .iter()
1708 .map(|farmer_cache| farmer_cache.maybe_store_additional_piece(piece_index, piece))
1709 .collect::<FuturesUnordered<_>>()
1710 .fold::<bool, _, _>(false, |acc, stored| async move { acc || stored })
1711 .await
1712 }
1713}
1714
1715fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
1717 let len = key.as_ref().len();
1718 let s = len - PieceIndex::SIZE;
1719
1720 let mut piece_index_bytes = [0u8; PieceIndex::SIZE];
1721 piece_index_bytes.copy_from_slice(&key.as_ref()[s..]);
1722
1723 PieceIndex::from_bytes(piece_index_bytes)
1724}