ab_farmer_components/
plotting.rs

1//! Plotting utilities
2//!
3//! This module contains functions and data structures that can be used for plotting purposes
4//! (primarily with CPU).
5//!
6//! Plotted sectors can be written to plot and later [`read`](crate::reading) and/or
7//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
8
9use crate::FarmerProtocolInfo;
10use crate::sector::{
11    RawSector, RecordMetadata, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
12    SingleRecordBitArray, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_erasure_coding::ErasureCoding;
22use ab_proof_of_space::{Table, TableGenerator};
23use async_lock::{Mutex as AsyncMutex, Semaphore};
24use backoff::future::retry;
25use backoff::{Error as BackoffError, ExponentialBackoff};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use std::collections::HashMap;
31use std::simd::Simd;
32use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::time::Duration;
35use thiserror::Error;
36use tracing::{debug, trace, warn};
37
38const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
39
40fn default_backoff() -> ExponentialBackoff {
41    ExponentialBackoff {
42        initial_interval: Duration::from_secs(15),
43        max_interval: Duration::from_secs(10 * 60),
44        // Try until we get a valid piece
45        max_elapsed_time: None,
46        ..ExponentialBackoff::default()
47    }
48}
49
50/// Information about sector that was plotted
51#[derive(Debug, Clone, Encode, Decode)]
52pub struct PlottedSector {
53    /// Sector ID
54    pub sector_id: SectorId,
55    /// Sector index
56    pub sector_index: SectorIndex,
57    /// Sector metadata
58    pub sector_metadata: SectorMetadataChecksummed,
59    /// Indexes of pieces that were plotted
60    pub piece_indexes: Vec<PieceIndex>,
61}
62
63/// Plotting status
64#[derive(Debug, Error)]
65pub enum PlottingError {
66    /// Records encoder error
67    #[error("Records encoder error: {error}")]
68    RecordsEncoderError {
69        /// Lower-level error
70        error: anyhow::Error,
71    },
72    /// Bad sector output size
73    #[error("Bad sector output size: provided {provided}, expected {expected}")]
74    BadSectorOutputSize {
75        /// Actual size
76        provided: usize,
77        /// Expected size
78        expected: usize,
79    },
80    /// Can't recover missing piece
81    #[error("Can't recover missing piece {piece_index}: {error}")]
82    PieceRecoveryFailed {
83        /// Piece index
84        piece_index: PieceIndex,
85        /// Lower-level error
86        error: anyhow::Error,
87    },
88    /// Failed to retrieve piece
89    #[error("Failed to retrieve pieces: {error}")]
90    FailedToRetrievePieces {
91        /// Lower-level error
92        error: anyhow::Error,
93    },
94    /// Abort early
95    #[error("Abort early")]
96    AbortEarly,
97}
98
99/// Options for plotting a sector.
100///
101/// Sector output and sector metadata output should be either empty (in which case they'll be
102/// resized to correct size automatically) or correctly sized from the beginning or else error will
103/// be returned.
104#[derive(Debug)]
105pub struct PlotSectorOptions<'a, RE, PG> {
106    /// Public key corresponding to sector
107    pub public_key_hash: &'a Blake3Hash,
108    /// Sector index
109    pub sector_index: SectorIndex,
110    /// Getter for pieces of archival history
111    pub piece_getter: &'a PG,
112    /// Farmer protocol info
113    pub farmer_protocol_info: FarmerProtocolInfo,
114    /// Erasure coding instance
115    pub erasure_coding: &'a ErasureCoding,
116    /// How many pieces should sector contain
117    pub pieces_in_sector: u16,
118    /// Where plotted sector should be written, vector must either be empty (in which case it'll be
119    /// resized to correct size automatically) or correctly sized from the beginning
120    pub sector_output: &'a mut Vec<u8>,
121    /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
122    /// usage of the plotting process, permit will be held until the end of the plotting process
123    pub downloading_semaphore: Option<Arc<Semaphore>>,
124    /// Semaphore for part of the plotting when farmer encodes downloaded sector, should typically
125    /// allow one permit at a time for efficient CPU utilization
126    pub encoding_semaphore: Option<&'a Semaphore>,
127    /// Proof of space table generators
128    pub records_encoder: &'a mut RE,
129    /// Whether encoding should be aborted early
130    pub abort_early: &'a AtomicBool,
131}
132
133/// Plot a single sector.
134///
135/// This is a convenient wrapper around [`download_sector`] and [`encode_sector`] functions.
136///
137/// NOTE: Even though this function is async, it has blocking code inside and must be running in a
138/// separate thread in order to prevent blocking an executor.
139pub async fn plot_sector<RE, PG>(
140    options: PlotSectorOptions<'_, RE, PG>,
141) -> Result<PlottedSector, PlottingError>
142where
143    RE: RecordsEncoder,
144    PG: PieceGetter + Send + Sync,
145{
146    let PlotSectorOptions {
147        public_key_hash,
148        sector_index,
149        piece_getter,
150        farmer_protocol_info,
151        erasure_coding,
152        pieces_in_sector,
153        sector_output,
154        downloading_semaphore,
155        encoding_semaphore,
156        records_encoder,
157        abort_early,
158    } = options;
159
160    let _downloading_permit = match downloading_semaphore {
161        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
162        None => None,
163    };
164
165    let download_sector_fut = download_sector(DownloadSectorOptions {
166        public_key_hash,
167        sector_index,
168        piece_getter,
169        farmer_protocol_info,
170        erasure_coding,
171        pieces_in_sector,
172    });
173
174    let _encoding_permit = match encoding_semaphore {
175        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
176        None => None,
177    };
178
179    let encoded_sector = encode_sector(
180        download_sector_fut.await?,
181        EncodeSectorOptions::<RE> {
182            sector_index,
183            records_encoder,
184            abort_early,
185        },
186    )?;
187
188    if abort_early.load(Ordering::Acquire) {
189        return Err(PlottingError::AbortEarly);
190    }
191
192    write_sector(&encoded_sector, sector_output)?;
193
194    Ok(encoded_sector.plotted_sector)
195}
196
197/// Opaque sector downloading result and ready for writing
198#[derive(Debug)]
199pub struct DownloadedSector {
200    sector_id: SectorId,
201    piece_indices: Vec<PieceIndex>,
202    raw_sector: RawSector,
203    history_size: HistorySize,
204}
205
206/// Options for sector downloading
207#[derive(Debug)]
208pub struct DownloadSectorOptions<'a, PG> {
209    /// Public key corresponding to sector
210    pub public_key_hash: &'a Blake3Hash,
211    /// Sector index
212    pub sector_index: SectorIndex,
213    /// Getter for pieces of archival history
214    pub piece_getter: &'a PG,
215    /// Farmer protocol info
216    pub farmer_protocol_info: FarmerProtocolInfo,
217    /// Erasure coding instance
218    pub erasure_coding: &'a ErasureCoding,
219    /// How many pieces should sector contain
220    pub pieces_in_sector: u16,
221}
222
223/// Download sector for plotting.
224///
225/// This will identify necessary pieces and download them using provided piece getter, after which
226/// they can be encoded using [`encode_sector`] and written to the plot.
227pub async fn download_sector<PG>(
228    options: DownloadSectorOptions<'_, PG>,
229) -> Result<DownloadedSector, PlottingError>
230where
231    PG: PieceGetter + Send + Sync,
232{
233    let DownloadSectorOptions {
234        public_key_hash,
235        sector_index,
236        piece_getter,
237        farmer_protocol_info,
238        erasure_coding,
239        pieces_in_sector,
240    } = options;
241
242    let sector_id = SectorId::new(
243        public_key_hash,
244        sector_index,
245        farmer_protocol_info.history_size,
246    );
247
248    let piece_indices = (PieceOffset::ZERO..)
249        .take(pieces_in_sector.into())
250        .map(|piece_offset| {
251            sector_id.derive_piece_index(
252                piece_offset,
253                farmer_protocol_info.history_size,
254                farmer_protocol_info.max_pieces_in_sector,
255                farmer_protocol_info.recent_segments,
256                farmer_protocol_info.recent_history_fraction,
257            )
258        })
259        .collect::<Vec<_>>();
260
261    let raw_sector = {
262        let mut raw_sector = RawSector::new(pieces_in_sector);
263        let mut pieces_to_download =
264            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
265        for (piece_index, (record, metadata)) in piece_indices
266            .iter()
267            .copied()
268            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
269        {
270            pieces_to_download
271                .entry(piece_index)
272                .or_default()
273                .push((record, metadata));
274        }
275        // This map will be mutated, removing piece indices we have already processed
276        let pieces_to_download = AsyncMutex::new(pieces_to_download);
277
278        retry(default_backoff(), || async {
279            let mut pieces_to_download = pieces_to_download.lock().await;
280
281            if let Err(error) =
282                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
283                    .await
284            {
285                warn!(
286                    %sector_index,
287                    %error,
288                    %pieces_in_sector,
289                    remaining_pieces = %pieces_to_download.len(),
290                    "Sector downloading attempt failed, will retry later"
291                );
292
293                return Err(BackoffError::transient(error));
294            }
295
296            debug!(%sector_index, "Sector downloaded successfully");
297
298            Ok(())
299        })
300        .await?;
301
302        raw_sector
303    };
304
305    Ok(DownloadedSector {
306        sector_id,
307        piece_indices,
308        raw_sector,
309        history_size: farmer_protocol_info.history_size,
310    })
311}
312
313/// Records encoder for plotting purposes
314pub trait RecordsEncoder {
315    /// Encode provided sector records
316    fn encode_records(
317        &mut self,
318        sector_id: &SectorId,
319        records: &mut [Record],
320        abort_early: &AtomicBool,
321    ) -> anyhow::Result<SectorContentsMap>;
322}
323
324/// CPU implementation of [`RecordsEncoder`]
325#[derive(Debug)]
326pub struct CpuRecordsEncoder<'a, PosTable>
327where
328    PosTable: Table,
329{
330    table_generators: &'a [PosTable::Generator],
331    erasure_coding: &'a ErasureCoding,
332    global_mutex: &'a AsyncMutex<()>,
333}
334
335impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
336where
337    PosTable: Table,
338{
339    fn encode_records(
340        &mut self,
341        sector_id: &SectorId,
342        records: &mut [Record],
343        abort_early: &AtomicBool,
344    ) -> anyhow::Result<SectorContentsMap> {
345        if self.table_generators.is_empty() {
346            return Err(anyhow::anyhow!("No table generators"));
347        }
348
349        let pieces_in_sector = records
350            .len()
351            .try_into()
352            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
353        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
354
355        {
356            let global_mutex = self.global_mutex;
357            let erasure_coding = self.erasure_coding;
358
359            let iter = Mutex::new(
360                (PieceOffset::ZERO..)
361                    .zip(records.iter_mut())
362                    .zip(sector_contents_map.iter_record_chunks_used_mut()),
363            );
364
365            rayon::scope(|scope| {
366                for table_generator in self.table_generators {
367                    scope.spawn(|_scope| {
368                        loop {
369                            // Take mutex briefly to make sure encoding is allowed right now
370                            global_mutex.lock_blocking();
371
372                            // This instead of `while` above because otherwise mutex will be held
373                            // for the duration of the loop and will limit concurrency to 1 record
374                            let Some(((piece_offset, record), record_chunks_used)) =
375                                iter.lock().next()
376                            else {
377                                return;
378                            };
379                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
380
381                            record_encoding::<PosTable>(
382                                &pos_seed,
383                                record,
384                                record_chunks_used,
385                                table_generator,
386                                erasure_coding,
387                            );
388
389                            if abort_early.load(Ordering::Relaxed) {
390                                return;
391                            }
392                        }
393                    });
394                }
395            });
396        }
397
398        Ok(sector_contents_map)
399    }
400}
401
402impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
403where
404    PosTable: Table,
405{
406    /// Create a new instance
407    pub fn new(
408        table_generators: &'a [PosTable::Generator],
409        erasure_coding: &'a ErasureCoding,
410        global_mutex: &'a AsyncMutex<()>,
411    ) -> Self {
412        Self {
413            table_generators,
414            erasure_coding,
415            global_mutex,
416        }
417    }
418}
419
420/// Options for encoding a sector.
421///
422/// Sector output and sector metadata output should be either empty (in which case they'll be
423/// resized to correct size automatically) or correctly sized from the beginning or else error will
424/// be returned.
425#[derive(Debug)]
426pub struct EncodeSectorOptions<'a, RE>
427where
428    RE: RecordsEncoder,
429{
430    /// Sector index
431    pub sector_index: SectorIndex,
432    /// Records encoding instance
433    pub records_encoder: &'a mut RE,
434    /// Whether encoding should be aborted early
435    pub abort_early: &'a AtomicBool,
436}
437
438/// Mostly opaque sector encoding result ready for writing
439#[derive(Debug)]
440pub struct EncodedSector {
441    /// Information about sector that was plotted
442    pub plotted_sector: PlottedSector,
443    raw_sector: RawSector,
444    sector_contents_map: SectorContentsMap,
445}
446
447/// Encode downloaded sector.
448///
449/// This function encodes downloaded sector records and returns sector encoding result that can be
450/// written using [`write_sector`].
451pub fn encode_sector<RE>(
452    downloaded_sector: DownloadedSector,
453    encoding_options: EncodeSectorOptions<'_, RE>,
454) -> Result<EncodedSector, PlottingError>
455where
456    RE: RecordsEncoder,
457{
458    let DownloadedSector {
459        sector_id,
460        piece_indices,
461        mut raw_sector,
462        history_size,
463    } = downloaded_sector;
464    let EncodeSectorOptions {
465        sector_index,
466        records_encoder,
467        abort_early,
468    } = encoding_options;
469
470    let pieces_in_sector = raw_sector.records.len().try_into().expect(
471        "Raw sector can only be created in this crate and it is always done correctly; qed",
472    );
473
474    let sector_contents_map = records_encoder
475        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
476        .map_err(|error| PlottingError::RecordsEncoderError { error })?;
477
478    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
479        sector_index,
480        pieces_in_sector,
481        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
482        history_size,
483    });
484
485    Ok(EncodedSector {
486        plotted_sector: PlottedSector {
487            sector_id,
488            sector_index,
489            sector_metadata,
490            piece_indexes: piece_indices,
491        },
492        raw_sector,
493        sector_contents_map,
494    })
495}
496
497/// Write encoded sector into sector output
498pub fn write_sector(
499    encoded_sector: &EncodedSector,
500    sector_output: &mut Vec<u8>,
501) -> Result<(), PlottingError> {
502    let EncodedSector {
503        plotted_sector: _,
504        raw_sector,
505        sector_contents_map,
506    } = encoded_sector;
507
508    let pieces_in_sector = raw_sector.records.len().try_into().expect(
509        "Raw sector can only be created in this crate and it is always done correctly; qed",
510    );
511
512    let sector_size = sector_size(pieces_in_sector);
513
514    if !sector_output.is_empty() && sector_output.len() != sector_size {
515        return Err(PlottingError::BadSectorOutputSize {
516            provided: sector_output.len(),
517            expected: sector_size,
518        });
519    }
520
521    sector_output.resize(sector_size, 0);
522
523    // Write sector to disk in as the following regions:
524    // * sector contents map
525    // * record chunks as s-buckets
526    // * record metadata
527    // * checksum
528    {
529        let (sector_contents_map_region, remaining_bytes) =
530            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
531        // Slice remaining memory into belonging to s-buckets and metadata
532        let (s_buckets_region, metadata_region) =
533            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
534
535        // Write sector contents map so we can decode it later
536        sector_contents_map
537            .encode_into(sector_contents_map_region)
538            .expect("Chunked into correct size above; qed");
539
540        let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
541        // Write record chunks, one s-bucket at a time
542        for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
543            .flat_map(|s_bucket| {
544                sector_contents_map
545                    .iter_s_bucket_piece_offsets(s_bucket)
546                    .expect("S-bucket guaranteed to be in range; qed")
547            })
548            .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
549        {
550            let next_record_chunks_offset =
551                &mut next_record_chunks_offset[usize::from(piece_offset)];
552
553            let chunk_position = *next_record_chunks_offset;
554            *next_record_chunks_offset += 1;
555            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
556        }
557
558        let metadata_chunks = metadata_region
559            .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
560            .0;
561        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
562            record_metadata.encode_to(&mut output.as_mut_slice());
563        }
564
565        // It would be more efficient to not re-read the whole sector again, but it makes above code
566        // significantly more convoluted and most likely not worth it
567        let (sector_contents, sector_checksum) =
568            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
569        sector_checksum.copy_from_slice(
570            {
571                let mut hasher = blake3::Hasher::new();
572                hasher.update_rayon(sector_contents);
573                hasher.finalize()
574            }
575            .as_bytes(),
576        );
577    }
578
579    Ok(())
580}
581
582fn record_encoding<PosTable>(
583    pos_seed: &PosSeed,
584    record: &mut Record,
585    record_chunks_used: &mut SingleRecordBitArray,
586    table_generator: &PosTable::Generator,
587    erasure_coding: &ErasureCoding,
588) where
589    PosTable: Table,
590{
591    let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
592
593    let mut parity_record_chunks = Record::new_boxed();
594
595    // Erasure code source record chunks
596    erasure_coding
597        .extend(record.iter(), parity_record_chunks.iter_mut())
598        .expect("Statically guaranteed valid inputs; qed");
599
600    record_chunks_used.data = pos_proofs.found_proofs;
601
602    // TODO: This can probably be optimized by using SIMD
603    let mut num_found_proofs = 0_usize;
604    for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
605        .array_chunks::<{ u8::BITS as usize }>()
606        .zip(pos_proofs.found_proofs)
607    {
608        for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
609            if (found_proofs & (1 << proof_offset)) != 0 {
610                let record_chunk = if s_bucket < Record::NUM_CHUNKS {
611                    record[s_bucket]
612                } else {
613                    parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
614                };
615                // TODO: SIMD hashing
616                record[num_found_proofs] = (Simd::from(record_chunk)
617                    ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
618                .to_array();
619                num_found_proofs += 1;
620            }
621        }
622    }
623}
624
625async fn download_sector_internal<PG>(
626    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
627    piece_getter: &PG,
628    erasure_coding: &ErasureCoding,
629) -> Result<(), PlottingError>
630where
631    PG: PieceGetter + Send + Sync,
632{
633    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring
634    //  concurrency from there
635    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
636
637    // Allocate to decouple lifetime from `pieces_to_download` that will be modified below
638    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
639    let mut downloaded_pieces = piece_getter
640        .get_pieces(piece_indices)
641        .await
642        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
643        .fuse();
644    let mut reconstructed_pieces = FuturesUnordered::new();
645
646    let mut final_result = Ok(());
647
648    loop {
649        let (piece_index, result) = select! {
650            (piece_index, result) = downloaded_pieces.select_next_some() => {
651                match result {
652                    Ok(Some(piece)) => (piece_index, Ok(piece)),
653                    Ok(None) => {
654                        trace!(%piece_index, "Piece was not found, trying reconstruction");
655
656                        reconstructed_pieces.push(reconstruct_piece(
657                            piece_index,
658                            recovery_semaphore,
659                            piece_getter,
660                            erasure_coding,
661                        ));
662                        continue;
663                    }
664                    Err(error) => {
665                        trace!(
666                            %error,
667                            %piece_index,
668                            "Failed to download piece, trying reconstruction"
669                        );
670
671                        reconstructed_pieces.push(reconstruct_piece(
672                            piece_index,
673                            recovery_semaphore,
674                            piece_getter,
675                            erasure_coding,
676                        ));
677                        continue;
678                    }
679                }
680            },
681            (piece_index, result) = reconstructed_pieces.select_next_some() => {
682                (piece_index, result)
683            },
684            complete => {
685                break;
686            }
687        };
688
689        match result {
690            Ok(piece) => {
691                process_piece(piece_index, piece, pieces_to_download);
692            }
693            Err(error) => {
694                trace!(%error, %piece_index, "Failed to download piece");
695
696                if final_result.is_ok() {
697                    final_result = Err(error);
698                }
699            }
700        }
701    }
702
703    if final_result.is_ok() && !pieces_to_download.is_empty() {
704        return Err(PlottingError::FailedToRetrievePieces {
705            error: anyhow::anyhow!(
706                "Successful result, but not all pieces were downloaded, this is likely a piece \
707                getter implementation bug"
708            ),
709        });
710    }
711
712    final_result
713}
714
715async fn reconstruct_piece<PG>(
716    piece_index: PieceIndex,
717    recovery_semaphore: &Semaphore,
718    piece_getter: &PG,
719    erasure_coding: &ErasureCoding,
720) -> (PieceIndex, Result<Piece, PlottingError>)
721where
722    PG: PieceGetter + Send + Sync,
723{
724    let _permit = recovery_semaphore.acquire().await;
725    let recovered_piece_fut =
726        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
727
728    (
729        piece_index,
730        recovered_piece_fut
731            .await
732            .map_err(|error| PlottingError::PieceRecoveryFailed {
733                piece_index,
734                error: error.into(),
735            }),
736    )
737}
738
739fn process_piece(
740    piece_index: PieceIndex,
741    piece: Piece,
742    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
743) {
744    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
745        // Fancy way to insert value in order to avoid going through stack (if naive
746        // de-referencing is used) and potentially causing stack overflow as the
747        // result
748        record
749            .as_flattened_mut()
750            .copy_from_slice(piece.record().as_flattened());
751        *metadata = RecordMetadata {
752            root: *piece.root(),
753            parity_chunks_root: *piece.parity_chunks_root(),
754            proof: *piece.proof(),
755            piece_checksum: blake3::hash(piece.as_ref()).into(),
756        };
757    }
758}