Skip to main content

ab_farmer_components/
plotting.rs

1//! Plotting utilities
2//!
3//! This module contains functions and data structures that can be used for plotting purposes
4//! (primarily with CPU).
5//!
6//! Plotted sectors can be written to plot and later [`read`](crate::reading) and/or
7//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
8
9use crate::FarmerProtocolInfo;
10use crate::sector::{
11    FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12    SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_core_primitives::solutions::ShardCommitmentHash;
21use ab_data_retrieval::piece_getter::PieceGetter;
22use ab_erasure_coding::ErasureCoding;
23use ab_proof_of_space::{Table, TableGenerator};
24use async_lock::{Mutex as AsyncMutex, Semaphore};
25use backon::{ExponentialBuilder, Retryable};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use std::collections::HashMap;
31use std::simd::Simd;
32use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::time::Duration;
35use thiserror::Error;
36use tracing::{debug, trace, warn};
37
/// Number of permits of the recovery semaphore created in `download_sector_internal`, i.e. how
/// many piece reconstructions are allowed to run at the same time during a download attempt.
38const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
39
40/// Information about a sector that was plotted.
///
/// Produced by [`encode_sector`] (as part of [`EncodedSector`]) and returned by [`plot_sector`].
41#[derive(Debug, Clone, Encode, Decode)]
42pub struct PlottedSector {
43    /// Sector ID
44    pub sector_id: SectorId,
45    /// Sector index
46    pub sector_index: SectorIndex,
47    /// Checksummed sector metadata
48    pub sector_metadata: SectorMetadataChecksummed,
49    /// Indexes of pieces that were plotted, one per piece offset in the sector
50    pub piece_indexes: Vec<PieceIndex>,
51}
52
53/// Plotting error
54#[derive(Debug, Error)]
55pub enum PlottingError {
56    /// Records encoder returned an error while encoding sector records
57    #[error("Records encoder error: {error}")]
58    RecordsEncoderError {
59        /// Lower-level error
60        error: anyhow::Error,
61    },
62    /// Sector output is neither empty nor of the expected sector size
63    #[error("Bad sector output size: provided {provided}, expected {expected}")]
64    BadSectorOutputSize {
65        /// Actual size
66        provided: usize,
67        /// Expected size
68        expected: usize,
69    },
70    /// Can't recover missing piece
71    #[error("Can't recover missing piece {piece_index}: {error}")]
72    PieceRecoveryFailed {
73        /// Piece index
74        piece_index: PieceIndex,
75        /// Lower-level error
76        error: anyhow::Error,
77    },
78    /// Failed to retrieve pieces
79    #[error("Failed to retrieve pieces: {error}")]
80    FailedToRetrievePieces {
81        /// Lower-level error
82        error: anyhow::Error,
83    },
84    /// Plotting was aborted early on request (see `abort_early` flag in plotting options)
85    #[error("Abort early")]
86    AbortEarly,
87}
88
89/// Options for plotting a sector.
90///
91/// Sector output should be either empty (in which case it'll be resized to correct size
92/// automatically) or correctly sized from the beginning or else error will
93/// be returned.
94#[derive(Debug)]
95pub struct PlotSectorOptions<'a, RE, PG> {
96    /// Hash of the public key corresponding to sector
97    pub public_key_hash: &'a Blake3Hash,
98    /// Root of the Merkle Tree of shard commitments
99    pub shard_commitments_root: &'a ShardCommitmentHash,
100    /// Sector index
101    pub sector_index: SectorIndex,
102    /// Getter for pieces of archival history
103    pub piece_getter: &'a PG,
104    /// Farmer protocol info
105    pub farmer_protocol_info: FarmerProtocolInfo,
106    /// Erasure coding instance
107    pub erasure_coding: &'a ErasureCoding,
108    /// How many pieces should sector contain
109    pub pieces_in_sector: u16,
110    /// Where plotted sector should be written, vector must either be empty (in which case it'll be
111    /// resized to correct size automatically) or correctly sized from the beginning
112    pub sector_output: &'a mut Vec<u8>,
113    /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
114    /// usage of the plotting process, permit will be held until the end of the plotting process
115    pub downloading_semaphore: Option<Arc<Semaphore>>,
116    /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
117    /// allow one permit at a time for efficient CPU utilization
118    pub encoding_semaphore: Option<&'a Semaphore>,
119    /// Records encoder used to encode downloaded records
120    pub records_encoder: &'a mut RE,
121    /// Whether encoding should be aborted early
122    pub abort_early: &'a AtomicBool,
123}
124
125/// Plot a single sector.
126///
127/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions.
128///
129/// NOTE: Even though this function is async, it has blocking code inside and must be running in a
130/// separate thread in order to prevent blocking an executor.
131pub async fn plot_sector<RE, PG>(
132    options: PlotSectorOptions<'_, RE, PG>,
133) -> Result<PlottedSector, PlottingError>
134where
135    RE: RecordsEncoder,
136    PG: PieceGetter + Send + Sync,
137{
138    let PlotSectorOptions {
139        public_key_hash,
140        shard_commitments_root,
141        sector_index,
142        piece_getter,
143        farmer_protocol_info,
144        erasure_coding,
145        pieces_in_sector,
146        sector_output,
147        downloading_semaphore,
148        encoding_semaphore,
149        records_encoder,
150        abort_early,
151    } = options;
152
153    let _downloading_permit = match downloading_semaphore {
154        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
155        None => None,
156    };
157
158    let download_sector_fut = download_sector(DownloadSectorOptions {
159        public_key_hash,
160        shard_commitments_root,
161        sector_index,
162        piece_getter,
163        farmer_protocol_info,
164        erasure_coding,
165        pieces_in_sector,
166    });
167
168    let _encoding_permit = match encoding_semaphore {
169        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
170        None => None,
171    };
172
173    let encoded_sector = encode_sector(
174        download_sector_fut.await?,
175        EncodeSectorOptions::<RE> {
176            sector_index,
177            records_encoder,
178            abort_early,
179        },
180    )?;
181
182    if abort_early.load(Ordering::Acquire) {
183        return Err(PlottingError::AbortEarly);
184    }
185
186    write_sector(&encoded_sector, sector_output)?;
187
188    Ok(encoded_sector.plotted_sector)
189}
190
191/// Opaque result of sector downloading, ready to be encoded with [`encode_sector`]
192#[derive(Debug)]
193pub struct DownloadedSector {
    /// ID of the downloaded sector
194    sector_id: SectorId,
    /// Piece indices derived for each piece offset of the sector
195    piece_indices: Vec<PieceIndex>,
    /// Records and record metadata filled from downloaded pieces
196    raw_sector: RawSector,
    /// History size the sector was created with
197    history_size: HistorySize,
198}
199
200/// Options for sector downloading, see [`download_sector`]
201#[derive(Debug)]
202pub struct DownloadSectorOptions<'a, PG> {
203    /// Hash of the public key corresponding to sector
204    pub public_key_hash: &'a Blake3Hash,
205    /// Root of the Merkle Tree of shard commitments
206    pub shard_commitments_root: &'a ShardCommitmentHash,
207    /// Sector index
208    pub sector_index: SectorIndex,
209    /// Getter for pieces of archival history
210    pub piece_getter: &'a PG,
211    /// Farmer protocol info
212    pub farmer_protocol_info: FarmerProtocolInfo,
213    /// Erasure coding instance, used when missing pieces have to be reconstructed
214    pub erasure_coding: &'a ErasureCoding,
215    /// How many pieces should sector contain
216    pub pieces_in_sector: u16,
217}
218
219/// Download sector for plotting.
220///
221/// This will identify necessary pieces and download them using provided piece getter, after which
222/// they can be encoded using [`encode_sector`] and written to the plot.
223pub async fn download_sector<PG>(
224    options: DownloadSectorOptions<'_, PG>,
225) -> Result<DownloadedSector, PlottingError>
226where
227    PG: PieceGetter + Send + Sync,
228{
229    let DownloadSectorOptions {
230        public_key_hash,
231        shard_commitments_root,
232        sector_index,
233        piece_getter,
234        farmer_protocol_info,
235        erasure_coding,
236        pieces_in_sector,
237    } = options;
238
239    let sector_id = SectorId::new(
240        public_key_hash,
241        shard_commitments_root,
242        sector_index,
243        farmer_protocol_info.history_size,
244    );
245
246    let piece_indices = (PieceOffset::ZERO..)
247        .take(pieces_in_sector.into())
248        .map(|piece_offset| {
249            sector_id.derive_piece_index(
250                piece_offset,
251                farmer_protocol_info.history_size,
252                farmer_protocol_info.max_pieces_in_sector,
253                farmer_protocol_info.recent_segments,
254                farmer_protocol_info.recent_history_fraction,
255            )
256        })
257        .collect::<Vec<_>>();
258
259    let raw_sector = {
260        let mut raw_sector = RawSector::new(pieces_in_sector);
261        let mut pieces_to_download =
262            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
263        for (piece_index, (record, metadata)) in piece_indices
264            .iter()
265            .copied()
266            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
267        {
268            pieces_to_download
269                .entry(piece_index)
270                .or_default()
271                .push((record, metadata));
272        }
273        // This map will be mutated, removing piece indices we have already processed
274        let pieces_to_download = AsyncMutex::new(pieces_to_download);
275
276        (|| async {
277            let mut pieces_to_download = pieces_to_download.lock().await;
278
279            if let Err(error) =
280                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
281                    .await
282            {
283                warn!(
284                    %sector_index,
285                    %error,
286                    %pieces_in_sector,
287                    remaining_pieces = %pieces_to_download.len(),
288                    "Sector downloading attempt failed, will retry later"
289                );
290
291                return Err(error);
292            }
293
294            debug!(%sector_index, "Sector downloaded successfully");
295
296            Ok(())
297        })
298        .retry(
299            ExponentialBuilder::default()
300                .with_min_delay(Duration::from_secs(15))
301                .with_max_delay(Duration::from_mins(10))
302                // Try until we get a valid piece
303                .without_max_times(),
304        )
305        .await?;
306
307        raw_sector
308    };
309
310    Ok(DownloadedSector {
311        sector_id,
312        piece_indices,
313        raw_sector,
314        history_size: farmer_protocol_info.history_size,
315    })
316}
317
318/// Records encoder for plotting purposes
319pub trait RecordsEncoder {
320    /// Encode provided sector records.
    ///
    /// `records` are passed mutably so that an implementation can encode them in place, the
    /// returned [`SectorContentsMap`] describes the contents of the encoded sector.
    ///
    /// `abort_early` is a flag an implementation can check to stop encoding before all records
    /// were processed.
321    fn encode_records(
322        &mut self,
323        sector_id: &SectorId,
324        records: &mut [Record],
325        abort_early: &AtomicBool,
326    ) -> anyhow::Result<SectorContentsMap>;
327}
328
329/// CPU implementation of [`RecordsEncoder`]
330#[derive(Debug)]
331pub struct CpuRecordsEncoder<'a, PosTable>
332where
333    PosTable: Table,
334{
    /// Proof of space table generators, one encoding task is spawned for each of them
335    table_generators: &'a [PosTable::Generator],
    /// Erasure coding instance used to extend records with parity chunks
336    erasure_coding: &'a ErasureCoding,
    /// Mutex that is taken briefly before each record is encoded, which allows the holder of
    /// the mutex to pause encoding
337    global_mutex: &'a AsyncMutex<()>,
338}
339
340impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
341where
342    PosTable: Table,
343{
344    fn encode_records(
345        &mut self,
346        sector_id: &SectorId,
347        records: &mut [Record],
348        abort_early: &AtomicBool,
349    ) -> anyhow::Result<SectorContentsMap> {
350        if self.table_generators.is_empty() {
351            return Err(anyhow::anyhow!("No table generators"));
352        }
353
354        let pieces_in_sector = records
355            .len()
356            .try_into()
357            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
358        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
359
360        {
361            let global_mutex = self.global_mutex;
362            let erasure_coding = self.erasure_coding;
363
364            let iter = Mutex::new(
365                (PieceOffset::ZERO..)
366                    .zip(records.iter_mut())
367                    .zip(sector_contents_map.iter_record_chunks_used_mut()),
368            );
369
370            rayon::scope(|scope| {
371                for table_generator in self.table_generators {
372                    scope.spawn(|_scope| {
373                        loop {
374                            // Take mutex briefly to make sure encoding is allowed right now
375                            global_mutex.lock_blocking();
376
377                            // This instead of `while` above because otherwise mutex will be held
378                            // for the duration of the loop and will limit concurrency to 1 record
379                            let Some(((piece_offset, record), record_chunks_used)) =
380                                iter.lock().next()
381                            else {
382                                return;
383                            };
384                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
385
386                            record_encoding::<PosTable>(
387                                &pos_seed,
388                                record,
389                                record_chunks_used,
390                                table_generator,
391                                erasure_coding,
392                            );
393
394                            if abort_early.load(Ordering::Relaxed) {
395                                return;
396                            }
397                        }
398                    });
399                }
400            });
401        }
402
403        Ok(sector_contents_map)
404    }
405}
406
407impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
408where
409    PosTable: Table,
410{
411    /// Create a new instance.
    ///
    /// All arguments are stored as is; an empty `table_generators` slice is accepted here, but
    /// results in an error when records are encoded.
412    pub fn new(
413        table_generators: &'a [PosTable::Generator],
414        erasure_coding: &'a ErasureCoding,
415        global_mutex: &'a AsyncMutex<()>,
416    ) -> Self {
417        Self {
418            table_generators,
419            erasure_coding,
420            global_mutex,
421        }
422    }
423}
424
425/// Options for encoding a sector.
426///
427/// Contains the sector index that ends up in sector metadata, the records encoder that does
428/// the actual encoding and the flag for aborting encoding early; downloaded sector itself is
429/// passed to [`encode_sector`] as a separate argument.
430#[derive(Debug)]
431pub struct EncodeSectorOptions<'a, RE>
432where
433    RE: RecordsEncoder,
434{
435    /// Sector index
436    pub sector_index: SectorIndex,
437    /// Records encoding instance
438    pub records_encoder: &'a mut RE,
439    /// Whether encoding should be aborted early
440    pub abort_early: &'a AtomicBool,
441}
442
443/// Mostly opaque sector encoding result ready for writing with [`write_sector`]
444#[derive(Debug)]
445pub struct EncodedSector {
446    /// Information about sector that was plotted
447    pub plotted_sector: PlottedSector,
    /// Encoded records and record metadata of the sector
448    raw_sector: RawSector,
    /// Sector contents map returned by the records encoder
449    sector_contents_map: SectorContentsMap,
450}
451
452/// Encode downloaded sector.
453///
454/// This function encodes downloaded sector records and returns sector encoding result that can be
455/// written using [`write_sector`].
456pub fn encode_sector<RE>(
457    downloaded_sector: DownloadedSector,
458    encoding_options: EncodeSectorOptions<'_, RE>,
459) -> Result<EncodedSector, PlottingError>
460where
461    RE: RecordsEncoder,
462{
463    let DownloadedSector {
464        sector_id,
465        piece_indices,
466        mut raw_sector,
467        history_size,
468    } = downloaded_sector;
469    let EncodeSectorOptions {
470        sector_index,
471        records_encoder,
472        abort_early,
473    } = encoding_options;
474
475    let pieces_in_sector = raw_sector.records.len().try_into().expect(
476        "Raw sector can only be created in this crate and it is always done correctly; qed",
477    );
478
479    let sector_contents_map = records_encoder
480        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
481        .map_err(|error| PlottingError::RecordsEncoderError { error })?;
482
483    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
484        sector_index,
485        pieces_in_sector,
486        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
487        history_size,
488    });
489
490    Ok(EncodedSector {
491        plotted_sector: PlottedSector {
492            sector_id,
493            sector_index,
494            sector_metadata,
495            piece_indexes: piece_indices,
496        },
497        raw_sector,
498        sector_contents_map,
499    })
500}
501
502/// Write encoded sector into sector output.
///
/// `sector_output` must either be empty, in which case it is resized to the sector size, or
/// already have exactly the sector size, otherwise [`PlottingError::BadSectorOutputSize`] is
/// returned and the output is left untouched.
///
/// The last [`Blake3Hash::SIZE`] bytes of the output are filled with the BLAKE3 hash of all the
/// bytes that precede them.
503pub fn write_sector(
504    encoded_sector: &EncodedSector,
505    sector_output: &mut Vec<u8>,
506) -> Result<(), PlottingError> {
507    let EncodedSector {
508        plotted_sector: _,
509        raw_sector,
510        sector_contents_map,
511    } = encoded_sector;
512
513    let pieces_in_sector = raw_sector.records.len().try_into().expect(
514        "Raw sector can only be created in this crate and it is always done correctly; qed",
515    );
516
517    let sector_size = sector_size(pieces_in_sector);
518
519    if !sector_output.is_empty() && sector_output.len() != sector_size {
520        return Err(PlottingError::BadSectorOutputSize {
521            provided: sector_output.len(),
522            expected: sector_size,
523        });
524    }
525
    // No-op when the output already has the correct size
526    sector_output.resize(sector_size, 0);
527
528    // Write sector into the output buffer as the following regions:
529    // * sector contents map
530    // * record chunks as s-buckets
531    // * record metadata
532    // * checksum
533    {
534        let (sector_contents_map_region, remaining_bytes) =
535            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
536        // Slice remaining memory into belonging to s-buckets and metadata
537        let (s_buckets_region, metadata_region) =
538            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
539
540        // Write sector contents map so we can decode it later
541        sector_contents_map
542            .encode_into(sector_contents_map_region)
543            .expect("Chunked into correct size above; qed");
544
        // For every record, position of its next chunk that was not written to the output yet
545        let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
546        // Write record chunks, one s-bucket at a time
547        for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
548            .flat_map(|s_bucket| {
549                sector_contents_map
550                    .iter_s_bucket_piece_offsets(s_bucket)
551                    .expect("S-bucket guaranteed to be in range; qed")
552            })
553            .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
554        {
555            let next_record_chunks_offset =
556                &mut next_record_chunks_offset[usize::from(piece_offset)];
557
            // Chunks of each record are consumed sequentially in the order s-buckets are visited
558            let chunk_position = *next_record_chunks_offset;
559            *next_record_chunks_offset += 1;
560            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
561        }
562
        // Record metadata is written in the same order as records are stored in the raw sector
563        let metadata_chunks = metadata_region
564            .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
565            .0;
566        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
567            record_metadata.encode_to(&mut output.as_mut_slice());
568        }
569
570        // It would be more efficient to not re-read the whole sector again, but it makes above code
571        // significantly more convoluted and most likely not worth it
572        let (sector_contents, sector_checksum) =
573            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
574        sector_checksum.copy_from_slice(
575            {
576                let mut hasher = blake3::Hasher::new();
577                hasher.update_rayon(sector_contents);
578                hasher.finalize()
579            }
580            .as_bytes(),
581        );
582    }
583
584    Ok(())
585}
586
/// Encode a single record in place.
///
/// Generates proofs for `pos_seed` using `table_generator`, extends source chunks of `record`
/// with parity chunks using `erasure_coding` and stores the bitfield of found proofs in
/// `record_chunks_used`. After that, for every s-bucket whose bit is set in the bitfield, the
/// corresponding source or parity chunk is XORed with the hash of the next proof and the result
/// is stored at the next free position at the beginning of `record`.
587fn record_encoding<PosTable>(
588    pos_seed: &PosSeed,
589    record: &mut Record,
590    record_chunks_used: &mut FoundProofs,
591    table_generator: &PosTable::Generator,
592    erasure_coding: &ErasureCoding,
593) where
594    PosTable: Table,
595{
596    let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
597
598    let mut parity_record_chunks = Record::new_boxed();
599
600    // Erasure code source record chunks
601    erasure_coding
602        .extend(record.iter(), parity_record_chunks.iter_mut())
603        .expect("Statically guaranteed valid inputs; qed");
604
605    *record_chunks_used = pos_proofs.found_proofs;
606
607    // TODO: This can probably be optimized by using SIMD
    // Number of encoded chunks written so far, doubles as the index of the next proof to use and
    // as the position in `record` where the next encoded chunk is stored
608    let mut num_found_proofs = 0_usize;
    // Every byte of the bitfield covers a group of `u8::BITS` consecutive s-buckets
609    for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
610        .array_chunks::<{ u8::BITS as usize }>()
611        .zip(pos_proofs.found_proofs)
612    {
613        for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
            // Bit `proof_offset` of the byte tells whether there is a proof for this s-bucket
614            if (found_proofs & (1 << proof_offset)) != 0 {
                // S-buckets below `Record::NUM_CHUNKS` map to source chunks, the rest to parity
                // chunks. The chunk is copied out before the write below and the write position
                // never exceeds `s_bucket` (at most one chunk is written per visited s-bucket),
                // hence source chunks that are still needed are not overwritten.
615                let record_chunk = if s_bucket < Record::NUM_CHUNKS {
616                    record[s_bucket]
617                } else {
618                    parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
619                };
620                // TODO: SIMD hashing
621                record[num_found_proofs] = (Simd::from(record_chunk)
622                    ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
623                .to_array();
624                num_found_proofs += 1;
625            }
626        }
627    }
628}
629
/// Make a single attempt to obtain all pieces that are still present in `pieces_to_download`.
///
/// All remaining piece indices are requested from `piece_getter`; pieces that were not found or
/// failed to download are scheduled for reconstruction instead. Every piece that was obtained is
/// handed to [`process_piece`], which removes it from `pieces_to_download`, so the map only
/// contains missing pieces when this function returns.
///
/// Returns the first reconstruction error (the remaining pieces are still processed before
/// returning), or an error if nothing failed and yet some pieces were left in the map.
630async fn download_sector_internal<PG>(
631    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
632    piece_getter: &PG,
633    erasure_coding: &ErasureCoding,
634) -> Result<(), PlottingError>
635where
636    PG: PieceGetter + Send + Sync,
637{
638    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring
639    //  concurrency from there
640    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
641
642    // Allocate to decouple lifetime from `pieces_to_download` that will be modified below
643    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
644    let mut downloaded_pieces = piece_getter
645        .get_pieces(piece_indices)
646        .await
647        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
648        .fuse();
    // Reconstructions of pieces that could not be downloaded directly
649    let mut reconstructed_pieces = FuturesUnordered::new();
650
    // First error encountered, returned only after all other pieces were processed
651    let mut final_result = Ok(());
652
653    loop {
654        let (piece_index, result) = select! {
655            (piece_index, result) = downloaded_pieces.select_next_some() => {
656                match result {
657                    Ok(Some(piece)) => (piece_index, Ok(piece)),
658                    Ok(None) => {
659                        trace!(%piece_index, "Piece was not found, trying reconstruction");
660
661                        reconstructed_pieces.push(reconstruct_piece(
662                            piece_index,
663                            recovery_semaphore,
664                            piece_getter,
665                            erasure_coding,
666                        ));
667                        continue;
668                    }
669                    Err(error) => {
670                        trace!(
671                            %error,
672                            %piece_index,
673                            "Failed to download piece, trying reconstruction"
674                        );
675
676                        reconstructed_pieces.push(reconstruct_piece(
677                            piece_index,
678                            recovery_semaphore,
679                            piece_getter,
680                            erasure_coding,
681                        ));
682                        continue;
683                    }
684                }
685            },
686            (piece_index, result) = reconstructed_pieces.select_next_some() => {
687                (piece_index, result)
688            },
            // Both the download stream and all pending reconstructions have finished
689            complete => {
690                break;
691            }
692        };
693
694        match result {
695            Ok(piece) => {
696                process_piece(piece_index, piece, pieces_to_download);
697            }
            // Direct download failures are turned into reconstructions above, so an error here
            // means that reconstruction of the piece has failed
698            Err(error) => {
699                trace!(%error, %piece_index, "Failed to download piece");
700
701                if final_result.is_ok() {
702                    final_result = Err(error);
703                }
704            }
705        }
706    }
707
708    if final_result.is_ok() && !pieces_to_download.is_empty() {
709        return Err(PlottingError::FailedToRetrievePieces {
710            error: anyhow::anyhow!(
711                "Successful result, but not all pieces were downloaded, this is likely a piece \
712                getter implementation bug"
713            ),
714        });
715    }
716
717    final_result
718}
719
720async fn reconstruct_piece<PG>(
721    piece_index: PieceIndex,
722    recovery_semaphore: &Semaphore,
723    piece_getter: &PG,
724    erasure_coding: &ErasureCoding,
725) -> (PieceIndex, Result<Piece, PlottingError>)
726where
727    PG: PieceGetter + Send + Sync,
728{
729    let _permit = recovery_semaphore.acquire().await;
730    let recovered_piece_fut =
731        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
732
733    (
734        piece_index,
735        recovered_piece_fut
736            .await
737            .map_err(|error| PlottingError::PieceRecoveryFailed {
738                piece_index,
739                error: error.into(),
740            }),
741    )
742}
743
744fn process_piece(
745    piece_index: PieceIndex,
746    piece: Piece,
747    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
748) {
749    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
750        *metadata = RecordMetadata {
751            piece_header: piece.header,
752            piece_checksum: blake3::hash(piece.as_ref()).into(),
753        };
754        // Fancy way to insert value to avoid going through stack (if naive dereferencing is used)
755        // and potentially causing stack overflow as the result
756        record.copy_from_slice(&*piece.record);
757    }
758}