Skip to main content

ab_farmer_components/
plotting.rs

1//! Plotting utilities
2//!
3//! This module contains functions and data structures that can be used for plotting purposes
4//! (primarily with CPU).
5//!
6//! Plotted sectors can be written to plot and later [`read`](crate::reading) and/or
7//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
8
9use crate::FarmerProtocolInfo;
10use crate::sector::{
11    FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12    SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_core_primitives::solutions::ShardCommitmentHash;
21use ab_data_retrieval::piece_getter::PieceGetter;
22use ab_erasure_coding::ErasureCoding;
23use ab_proof_of_space::{Table, TableGenerator};
24use async_lock::{Mutex as AsyncMutex, Semaphore};
25use backon::{ExponentialBuilder, Retryable};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use std::collections::HashMap;
31use std::simd::Simd;
32use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::time::Duration;
35use thiserror::Error;
36use tracing::{debug, trace, warn};
37
38const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
39
40/// Information about sector that was plotted
41#[derive(Debug, Clone, Encode, Decode)]
42pub struct PlottedSector {
43    /// Sector ID
44    pub sector_id: SectorId,
45    /// Sector index
46    pub sector_index: SectorIndex,
47    /// Sector metadata
48    pub sector_metadata: SectorMetadataChecksummed,
49    /// Indexes of pieces that were plotted
50    pub piece_indexes: Vec<PieceIndex>,
51}
52
53/// Plotting status
54#[derive(Debug, Error)]
55pub enum PlottingError {
56    /// Records encoder error
57    #[error("Records encoder error: {error}")]
58    RecordsEncoderError {
59        /// Lower-level error
60        error: anyhow::Error,
61    },
62    /// Bad sector output size
63    #[error("Bad sector output size: provided {provided}, expected {expected}")]
64    BadSectorOutputSize {
65        /// Actual size
66        provided: usize,
67        /// Expected size
68        expected: usize,
69    },
70    /// Can't recover missing piece
71    #[error("Can't recover missing piece {piece_index}: {error}")]
72    PieceRecoveryFailed {
73        /// Piece index
74        piece_index: PieceIndex,
75        /// Lower-level error
76        error: anyhow::Error,
77    },
78    /// Failed to retrieve piece
79    #[error("Failed to retrieve pieces: {error}")]
80    FailedToRetrievePieces {
81        /// Lower-level error
82        error: anyhow::Error,
83    },
84    /// Abort early
85    #[error("Abort early")]
86    AbortEarly,
87}
88
89/// Options for plotting a sector.
90///
91/// Sector output and sector metadata output should be either empty (in which case they'll be
92/// resized to correct size automatically) or correctly sized from the beginning or else error will
93/// be returned.
94#[derive(Debug)]
95pub struct PlotSectorOptions<'a, RE, PG> {
96    /// Public key corresponding to sector
97    pub public_key_hash: &'a Blake3Hash,
98    /// Root of the Merkle Tree of shard commitments
99    pub shard_commitments_root: &'a ShardCommitmentHash,
100    /// Sector index
101    pub sector_index: SectorIndex,
102    /// Getter for pieces of archival history
103    pub piece_getter: &'a PG,
104    /// Farmer protocol info
105    pub farmer_protocol_info: FarmerProtocolInfo,
106    /// Erasure coding instance
107    pub erasure_coding: &'a ErasureCoding,
108    /// How many pieces should sector contain
109    pub pieces_in_sector: u16,
110    /// Where plotted sector should be written, vector must either be empty (in which case it'll be
111    /// resized to correct size automatically) or correctly sized from the beginning
112    pub sector_output: &'a mut Vec<u8>,
113    /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
114    /// usage of the plotting process, permit will be held until the end of the plotting process
115    pub downloading_semaphore: Option<Arc<Semaphore>>,
116    /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
117    /// allow one permit at a time for efficient CPU utilization
118    pub encoding_semaphore: Option<&'a Semaphore>,
119    /// Proof of space table generators
120    pub records_encoder: &'a mut RE,
121    /// Whether encoding should be aborted early
122    pub abort_early: &'a AtomicBool,
123}
124
125/// Plot a single sector.
126///
127/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions.
128///
129/// NOTE: Even though this function is async, it has blocking code inside and must be running in a
130/// separate thread in order to prevent blocking an executor.
131pub async fn plot_sector<RE, PG>(
132    options: PlotSectorOptions<'_, RE, PG>,
133) -> Result<PlottedSector, PlottingError>
134where
135    RE: RecordsEncoder,
136    PG: PieceGetter + Send + Sync,
137{
138    let PlotSectorOptions {
139        public_key_hash,
140        shard_commitments_root,
141        sector_index,
142        piece_getter,
143        farmer_protocol_info,
144        erasure_coding,
145        pieces_in_sector,
146        sector_output,
147        downloading_semaphore,
148        encoding_semaphore,
149        records_encoder,
150        abort_early,
151    } = options;
152
153    let _downloading_permit = match downloading_semaphore {
154        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
155        None => None,
156    };
157
158    let download_sector_fut = download_sector(DownloadSectorOptions {
159        public_key_hash,
160        shard_commitments_root,
161        sector_index,
162        piece_getter,
163        farmer_protocol_info,
164        erasure_coding,
165        pieces_in_sector,
166    });
167
168    let _encoding_permit = match encoding_semaphore {
169        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
170        None => None,
171    };
172
173    let encoded_sector = encode_sector(
174        download_sector_fut.await?,
175        EncodeSectorOptions::<RE> {
176            sector_index,
177            records_encoder,
178            abort_early,
179        },
180    )?;
181
182    if abort_early.load(Ordering::Acquire) {
183        return Err(PlottingError::AbortEarly);
184    }
185
186    write_sector(&encoded_sector, sector_output)?;
187
188    Ok(encoded_sector.plotted_sector)
189}
190
191/// Opaque sector downloading result and ready for writing
192#[derive(Debug)]
193pub struct DownloadedSector {
194    sector_id: SectorId,
195    piece_indices: Vec<PieceIndex>,
196    raw_sector: RawSector,
197    history_size: HistorySize,
198}
199
200/// Options for sector downloading
201#[derive(Debug)]
202pub struct DownloadSectorOptions<'a, PG> {
203    /// Public key corresponding to sector
204    pub public_key_hash: &'a Blake3Hash,
205    /// Root of the Merkle Tree of shard commitments
206    pub shard_commitments_root: &'a ShardCommitmentHash,
207    /// Sector index
208    pub sector_index: SectorIndex,
209    /// Getter for pieces of archival history
210    pub piece_getter: &'a PG,
211    /// Farmer protocol info
212    pub farmer_protocol_info: FarmerProtocolInfo,
213    /// Erasure coding instance
214    pub erasure_coding: &'a ErasureCoding,
215    /// How many pieces should sector contain
216    pub pieces_in_sector: u16,
217}
218
219/// Download sector for plotting.
220///
221/// This will identify necessary pieces and download them using provided piece getter, after which
222/// they can be encoded using [`encode_sector`] and written to the plot.
223pub async fn download_sector<PG>(
224    options: DownloadSectorOptions<'_, PG>,
225) -> Result<DownloadedSector, PlottingError>
226where
227    PG: PieceGetter + Send + Sync,
228{
229    let DownloadSectorOptions {
230        public_key_hash,
231        shard_commitments_root,
232        sector_index,
233        piece_getter,
234        farmer_protocol_info,
235        erasure_coding,
236        pieces_in_sector,
237    } = options;
238
239    let sector_id = SectorId::new(
240        public_key_hash,
241        shard_commitments_root,
242        sector_index,
243        farmer_protocol_info.history_size,
244    );
245
246    let piece_indices = (PieceOffset::ZERO..)
247        .take(pieces_in_sector.into())
248        .map(|piece_offset| {
249            sector_id.derive_piece_index(
250                piece_offset,
251                farmer_protocol_info.history_size,
252                farmer_protocol_info.max_pieces_in_sector,
253                farmer_protocol_info.recent_segments,
254                farmer_protocol_info.recent_history_fraction,
255            )
256        })
257        .collect::<Vec<_>>();
258
259    let raw_sector = {
260        let mut raw_sector = RawSector::new(pieces_in_sector);
261        let mut pieces_to_download =
262            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
263        for (piece_index, (record, metadata)) in piece_indices
264            .iter()
265            .copied()
266            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
267        {
268            pieces_to_download
269                .entry(piece_index)
270                .or_default()
271                .push((record, metadata));
272        }
273        // This map will be mutated, removing piece indices we have already processed
274        let pieces_to_download = AsyncMutex::new(pieces_to_download);
275
276        (|| async {
277            let mut pieces_to_download = pieces_to_download.lock().await;
278
279            if let Err(error) =
280                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
281                    .await
282            {
283                warn!(
284                    %sector_index,
285                    %error,
286                    %pieces_in_sector,
287                    remaining_pieces = %pieces_to_download.len(),
288                    "Sector downloading attempt failed, will retry later"
289                );
290
291                return Err(error);
292            }
293
294            debug!(%sector_index, "Sector downloaded successfully");
295
296            Ok(())
297        })
298        .retry(
299            ExponentialBuilder::default()
300                .with_min_delay(Duration::from_secs(15))
301                .with_max_delay(Duration::from_mins(10))
302                // Try until we get a valid piece
303                .without_max_times(),
304        )
305        .await?;
306
307        raw_sector
308    };
309
310    Ok(DownloadedSector {
311        sector_id,
312        piece_indices,
313        raw_sector,
314        history_size: farmer_protocol_info.history_size,
315    })
316}
317
318/// Records encoder for plotting purposes
319pub trait RecordsEncoder {
320    /// Encode provided sector records
321    fn encode_records(
322        &mut self,
323        sector_id: &SectorId,
324        records: &mut [Record],
325        abort_early: &AtomicBool,
326    ) -> anyhow::Result<SectorContentsMap>;
327}
328
329/// CPU implementation of [`RecordsEncoder`]
330#[derive(Debug)]
331pub struct CpuRecordsEncoder<'a, PosTable>
332where
333    PosTable: Table,
334{
335    table_generators: &'a [PosTable::Generator],
336    erasure_coding: &'a ErasureCoding,
337    global_mutex: &'a AsyncMutex<()>,
338}
339
340impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
341where
342    PosTable: Table,
343{
344    fn encode_records(
345        &mut self,
346        sector_id: &SectorId,
347        records: &mut [Record],
348        abort_early: &AtomicBool,
349    ) -> anyhow::Result<SectorContentsMap> {
350        if self.table_generators.is_empty() {
351            return Err(anyhow::anyhow!("No table generators"));
352        }
353
354        let pieces_in_sector = records
355            .len()
356            .try_into()
357            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
358        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
359
360        {
361            let global_mutex = self.global_mutex;
362            let erasure_coding = self.erasure_coding;
363
364            let iter = Mutex::new(
365                (PieceOffset::ZERO..)
366                    .zip(records.iter_mut())
367                    .zip(sector_contents_map.iter_record_chunks_used_mut()),
368            );
369
370            rayon::scope(|scope| {
371                for table_generator in self.table_generators {
372                    scope.spawn(|_scope| {
373                        loop {
374                            // Take mutex briefly to make sure encoding is allowed right now
375                            global_mutex.lock_blocking();
376
377                            // This instead of `while` above because otherwise mutex will be held
378                            // for the duration of the loop and will limit concurrency to 1 record
379                            let Some(((piece_offset, record), record_chunks_used)) =
380                                iter.lock().next()
381                            else {
382                                return;
383                            };
384                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
385
386                            record_encoding::<PosTable>(
387                                &pos_seed,
388                                record,
389                                record_chunks_used,
390                                table_generator,
391                                erasure_coding,
392                            );
393
394                            if abort_early.load(Ordering::Relaxed) {
395                                return;
396                            }
397                        }
398                    });
399                }
400            });
401        }
402
403        Ok(sector_contents_map)
404    }
405}
406
407impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
408where
409    PosTable: Table,
410{
411    /// Create a new instance
412    pub fn new(
413        table_generators: &'a [PosTable::Generator],
414        erasure_coding: &'a ErasureCoding,
415        global_mutex: &'a AsyncMutex<()>,
416    ) -> Self {
417        Self {
418            table_generators,
419            erasure_coding,
420            global_mutex,
421        }
422    }
423}
424
425/// Options for encoding a sector.
426///
427/// Sector output and sector metadata output should be either empty (in which case they'll be
428/// resized to correct size automatically) or correctly sized from the beginning or else error will
429/// be returned.
430#[derive(Debug)]
431pub struct EncodeSectorOptions<'a, RE>
432where
433    RE: RecordsEncoder,
434{
435    /// Sector index
436    pub sector_index: SectorIndex,
437    /// Records encoding instance
438    pub records_encoder: &'a mut RE,
439    /// Whether encoding should be aborted early
440    pub abort_early: &'a AtomicBool,
441}
442
443/// Mostly opaque sector encoding result ready for writing
444#[derive(Debug)]
445#[expect(
446    clippy::partial_pub_fields,
447    reason = "Intentionally hiding some fields such that they can only be used by internal APIs"
448)]
449pub struct EncodedSector {
450    /// Information about sector that was plotted
451    pub plotted_sector: PlottedSector,
452    raw_sector: RawSector,
453    sector_contents_map: SectorContentsMap,
454}
455
456/// Encode downloaded sector.
457///
458/// This function encodes downloaded sector records and returns sector encoding result that can be
459/// written using [`write_sector`].
460pub fn encode_sector<RE>(
461    downloaded_sector: DownloadedSector,
462    encoding_options: EncodeSectorOptions<'_, RE>,
463) -> Result<EncodedSector, PlottingError>
464where
465    RE: RecordsEncoder,
466{
467    let DownloadedSector {
468        sector_id,
469        piece_indices,
470        mut raw_sector,
471        history_size,
472    } = downloaded_sector;
473    let EncodeSectorOptions {
474        sector_index,
475        records_encoder,
476        abort_early,
477    } = encoding_options;
478
479    let pieces_in_sector = raw_sector.records.len().try_into().expect(
480        "Raw sector can only be created in this crate and it is always done correctly; qed",
481    );
482
483    let sector_contents_map = records_encoder
484        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
485        .map_err(|error| PlottingError::RecordsEncoderError { error })?;
486
487    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
488        sector_index,
489        pieces_in_sector,
490        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
491        history_size,
492    });
493
494    Ok(EncodedSector {
495        plotted_sector: PlottedSector {
496            sector_id,
497            sector_index,
498            sector_metadata,
499            piece_indexes: piece_indices,
500        },
501        raw_sector,
502        sector_contents_map,
503    })
504}
505
506/// Write encoded sector into sector output
507pub fn write_sector(
508    encoded_sector: &EncodedSector,
509    sector_output: &mut Vec<u8>,
510) -> Result<(), PlottingError> {
511    let EncodedSector {
512        plotted_sector: _,
513        raw_sector,
514        sector_contents_map,
515    } = encoded_sector;
516
517    let pieces_in_sector = raw_sector.records.len().try_into().expect(
518        "Raw sector can only be created in this crate and it is always done correctly; qed",
519    );
520
521    let sector_size = sector_size(pieces_in_sector);
522
523    if !sector_output.is_empty() && sector_output.len() != sector_size {
524        return Err(PlottingError::BadSectorOutputSize {
525            provided: sector_output.len(),
526            expected: sector_size,
527        });
528    }
529
530    sector_output.resize(sector_size, 0);
531
532    // Write sector to disk in as the following regions:
533    // * sector contents map
534    // * record chunks as s-buckets
535    // * record metadata
536    // * checksum
537    {
538        let (sector_contents_map_region, remaining_bytes) =
539            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
540        // Slice remaining memory into belonging to s-buckets and metadata
541        let (s_buckets_region, metadata_region) =
542            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
543
544        // Write sector contents map so we can decode it later
545        sector_contents_map
546            .encode_into(sector_contents_map_region)
547            .expect("Chunked into correct size above; qed");
548
549        let mut next_record_chunks_offset = vec![0usize; pieces_in_sector.into()];
550        // Write record chunks, one s-bucket at a time
551        for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
552            .flat_map(|s_bucket| {
553                sector_contents_map
554                    .iter_s_bucket_piece_offsets(s_bucket)
555                    .expect("S-bucket guaranteed to be in range; qed")
556            })
557            .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
558        {
559            let next_record_chunks_offset =
560                &mut next_record_chunks_offset[usize::from(piece_offset)];
561
562            let chunk_position = *next_record_chunks_offset;
563            *next_record_chunks_offset += 1;
564            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
565        }
566
567        let metadata_chunks = metadata_region
568            .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
569            .0;
570        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
571            record_metadata.encode_to(&mut output.as_mut_slice());
572        }
573
574        // It would be more efficient to not re-read the whole sector again, but it makes above code
575        // significantly more convoluted and most likely not worth it
576        let (sector_contents, sector_checksum) =
577            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
578        sector_checksum.copy_from_slice(
579            {
580                let mut hasher = blake3::Hasher::new();
581                hasher.update_rayon(sector_contents);
582                hasher.finalize()
583            }
584            .as_bytes(),
585        );
586    }
587
588    Ok(())
589}
590
591fn record_encoding<PosTable>(
592    pos_seed: &PosSeed,
593    record: &mut Record,
594    record_chunks_used: &mut FoundProofs,
595    table_generator: &PosTable::Generator,
596    erasure_coding: &ErasureCoding,
597) where
598    PosTable: Table,
599{
600    let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
601
602    let mut parity_record_chunks = Record::new_boxed();
603
604    // Erasure code source record chunks
605    erasure_coding
606        .extend(record.iter(), parity_record_chunks.iter_mut())
607        .expect("Statically guaranteed valid inputs; qed");
608
609    *record_chunks_used = pos_proofs.found_proofs;
610
611    // TODO: This can probably be optimized by using SIMD
612    let mut num_found_proofs = 0_usize;
613    for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
614        .array_chunks::<{ u8::BITS as usize }>()
615        .zip(pos_proofs.found_proofs)
616    {
617        for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
618            if (found_proofs & (1 << proof_offset)) != 0 {
619                let record_chunk = if s_bucket < Record::NUM_CHUNKS {
620                    record[s_bucket]
621                } else {
622                    parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
623                };
624                // TODO: SIMD hashing
625                record[num_found_proofs] = (Simd::from(record_chunk)
626                    ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
627                .to_array();
628                num_found_proofs += 1;
629            }
630        }
631    }
632}
633
634async fn download_sector_internal<PG>(
635    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
636    piece_getter: &PG,
637    erasure_coding: &ErasureCoding,
638) -> Result<(), PlottingError>
639where
640    PG: PieceGetter + Send + Sync,
641{
642    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring
643    //  concurrency from there
644    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
645
646    // Allocate to decouple lifetime from `pieces_to_download` that will be modified below
647    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
648    let mut downloaded_pieces = piece_getter
649        .get_pieces(piece_indices)
650        .await
651        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
652        .fuse();
653    let mut reconstructed_pieces = FuturesUnordered::new();
654
655    let mut final_result = Ok(());
656
657    loop {
658        let (piece_index, result) = select! {
659            (piece_index, result) = downloaded_pieces.select_next_some() => {
660                match result {
661                    Ok(Some(piece)) => (piece_index, Ok(piece)),
662                    Ok(None) => {
663                        trace!(%piece_index, "Piece was not found, trying reconstruction");
664
665                        reconstructed_pieces.push(reconstruct_piece(
666                            piece_index,
667                            recovery_semaphore,
668                            piece_getter,
669                            erasure_coding,
670                        ));
671                        continue;
672                    }
673                    Err(error) => {
674                        trace!(
675                            %error,
676                            %piece_index,
677                            "Failed to download piece, trying reconstruction"
678                        );
679
680                        reconstructed_pieces.push(reconstruct_piece(
681                            piece_index,
682                            recovery_semaphore,
683                            piece_getter,
684                            erasure_coding,
685                        ));
686                        continue;
687                    }
688                }
689            },
690            (piece_index, result) = reconstructed_pieces.select_next_some() => {
691                (piece_index, result)
692            },
693            complete => {
694                break;
695            }
696        };
697
698        match result {
699            Ok(piece) => {
700                process_piece(piece_index, piece, pieces_to_download);
701            }
702            Err(error) => {
703                trace!(%error, %piece_index, "Failed to download piece");
704
705                if final_result.is_ok() {
706                    final_result = Err(error);
707                }
708            }
709        }
710    }
711
712    if final_result.is_ok() && !pieces_to_download.is_empty() {
713        return Err(PlottingError::FailedToRetrievePieces {
714            error: anyhow::anyhow!(
715                "Successful result, but not all pieces were downloaded, this is likely a piece \
716                getter implementation bug"
717            ),
718        });
719    }
720
721    final_result
722}
723
724async fn reconstruct_piece<PG>(
725    piece_index: PieceIndex,
726    recovery_semaphore: &Semaphore,
727    piece_getter: &PG,
728    erasure_coding: &ErasureCoding,
729) -> (PieceIndex, Result<Piece, PlottingError>)
730where
731    PG: PieceGetter + Send + Sync,
732{
733    let _permit = recovery_semaphore.acquire().await;
734    let recovered_piece_fut =
735        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
736
737    (
738        piece_index,
739        recovered_piece_fut
740            .await
741            .map_err(|error| PlottingError::PieceRecoveryFailed {
742                piece_index,
743                error: error.into(),
744            }),
745    )
746}
747
748fn process_piece(
749    piece_index: PieceIndex,
750    piece: Piece,
751    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
752) {
753    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
754        *metadata = RecordMetadata {
755            piece_header: piece.header,
756            piece_checksum: blake3::hash(piece.as_ref()).into(),
757        };
758        // Fancy way to insert value to avoid going through stack (if naive dereferencing is used)
759        // and potentially causing stack overflow as the result
760        record.copy_from_slice(&*piece.record);
761    }
762}