ab_farmer_components/plotting.rs

//! Plotting utilities
//!
//! This module contains functions and data structures that can be used for plotting purposes
//! (primarily with CPU).
//!
//! Plotted sectors can be written to the plot and later [`read`](crate::reading) and/or
//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
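//!
//! A minimal sketch of the full pipeline when the steps are driven manually rather than through
//! [`plot_sector`] (not compiled here; the option structs and `sector_output` are assumed to
//! exist in the surrounding code):
//!
//! ```ignore
//! let downloaded_sector = download_sector(download_options).await?;
//! let encoded_sector = encode_sector(downloaded_sector, encoding_options)?;
//! write_sector(&encoded_sector, &mut sector_output)?;
//! ```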

use crate::FarmerProtocolInfo;
use crate::sector::{
    FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
    SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
};
use crate::segment_reconstruction::recover_missing_piece;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
use ab_core_primitives::pos::PosSeed;
use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
use ab_core_primitives::segments::HistorySize;
use ab_core_primitives::solutions::ShardCommitmentHash;
use ab_data_retrieval::piece_getter::PieceGetter;
use ab_erasure_coding::ErasureCoding;
use ab_proof_of_space::{Table, TableGenerator};
use async_lock::{Mutex as AsyncMutex, Semaphore};
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, select};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::simd::Simd;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

fn default_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_secs(15),
        max_interval: Duration::from_secs(10 * 60),
        // Try until we get a valid piece
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    }
}

/// Information about sector that was plotted
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// Sector ID
    pub sector_id: SectorId,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Sector metadata
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indexes of pieces that were plotted
    pub piece_indexes: Vec<PieceIndex>,
}

/// Plotting error
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Records encoder error
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Bad sector output size
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Actual size
        provided: usize,
        /// Expected size
        expected: usize,
    },
    /// Can't recover missing piece
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Piece index
        piece_index: PieceIndex,
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve pieces
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Abort early
    #[error("Abort early")]
    AbortEarly,
}

/// Options for plotting a sector.
///
/// Sector output should be either empty (in which case it'll be resized to the correct size
/// automatically) or correctly sized from the beginning, otherwise an error will be returned.
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Public key corresponding to sector
    pub public_key_hash: &'a Blake3Hash,
    /// Root of the Merkle Tree of shard commitments
    pub shard_commitments_root: &'a ShardCommitmentHash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
    /// Where the plotted sector should be written; the vector must either be empty (in which case
    /// it'll be resized to the correct size automatically) or correctly sized from the beginning
    pub sector_output: &'a mut Vec<u8>,
    /// Semaphore for the part of plotting when the farmer downloads a new sector; allows limiting
    /// memory usage of the plotting process, the permit will be held until the end of the
    /// plotting process
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the part of plotting when the farmer encodes a downloaded sector; should
    /// typically allow one permit at a time for efficient CPU utilization
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Records encoder instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Plot a single sector.
///
/// This is a convenient wrapper around the [`download_sector`] and [`encode_sector`] functions.
///
/// NOTE: Even though this function is async, it has blocking code inside and must be run in a
/// separate thread in order to prevent blocking the executor.
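///
/// # Example
///
/// A minimal usage sketch (not compiled here; `public_key_hash`, `shard_commitments_root`,
/// `sector_index`, `piece_getter`, `farmer_protocol_info`, `erasure_coding`, `pieces_in_sector`
/// and `records_encoder` are assumed to exist in the surrounding code):
///
/// ```ignore
/// let mut sector_output = Vec::new();
/// let abort_early = AtomicBool::new(false);
///
/// let plotted_sector = plot_sector(PlotSectorOptions {
///     public_key_hash: &public_key_hash,
///     shard_commitments_root: &shard_commitments_root,
///     sector_index,
///     piece_getter: &piece_getter,
///     farmer_protocol_info,
///     erasure_coding: &erasure_coding,
///     pieces_in_sector,
///     sector_output: &mut sector_output,
///     downloading_semaphore: None,
///     encoding_semaphore: None,
///     records_encoder: &mut records_encoder,
///     abort_early: &abort_early,
/// })
/// .await?;
/// ```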
pub async fn plot_sector<RE, PG>(
    options: PlotSectorOptions<'_, RE, PG>,
) -> Result<PlottedSector, PlottingError>
where
    RE: RecordsEncoder,
    PG: PieceGetter + Send + Sync,
{
    let PlotSectorOptions {
        public_key_hash,
        shard_commitments_root,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
        sector_output,
        downloading_semaphore,
        encoding_semaphore,
        records_encoder,
        abort_early,
    } = options;

    let _downloading_permit = match downloading_semaphore {
        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
        None => None,
    };

    let download_sector_fut = download_sector(DownloadSectorOptions {
        public_key_hash,
        shard_commitments_root,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    });

    let _encoding_permit = match encoding_semaphore {
        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
        None => None,
    };

    let encoded_sector = encode_sector(
        download_sector_fut.await?,
        EncodeSectorOptions::<RE> {
            sector_index,
            records_encoder,
            abort_early,
        },
    )?;

    if abort_early.load(Ordering::Acquire) {
        return Err(PlottingError::AbortEarly);
    }

    write_sector(&encoded_sector, sector_output)?;

    Ok(encoded_sector.plotted_sector)
}

/// Opaque sector downloading result, ready for encoding
#[derive(Debug)]
pub struct DownloadedSector {
    sector_id: SectorId,
    piece_indices: Vec<PieceIndex>,
    raw_sector: RawSector,
    history_size: HistorySize,
}

/// Options for sector downloading
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Public key corresponding to sector
    pub public_key_hash: &'a Blake3Hash,
    /// Root of the Merkle Tree of shard commitments
    pub shard_commitments_root: &'a ShardCommitmentHash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
}

/// Download a sector for plotting.
///
/// This will identify the necessary pieces and download them using the provided piece getter,
/// after which they can be encoded using [`encode_sector`] and written to the plot.
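///
/// # Example
///
/// A minimal usage sketch (not compiled here; the inputs are assumed to exist in the surrounding
/// code):
///
/// ```ignore
/// let downloaded_sector = download_sector(DownloadSectorOptions {
///     public_key_hash: &public_key_hash,
///     shard_commitments_root: &shard_commitments_root,
///     sector_index,
///     piece_getter: &piece_getter,
///     farmer_protocol_info,
///     erasure_coding: &erasure_coding,
///     pieces_in_sector,
/// })
/// .await?;
/// ```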
pub async fn download_sector<PG>(
    options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    let DownloadSectorOptions {
        public_key_hash,
        shard_commitments_root,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    } = options;

    let sector_id = SectorId::new(
        public_key_hash,
        shard_commitments_root,
        sector_index,
        farmer_protocol_info.history_size,
    );

    let piece_indices = (PieceOffset::ZERO..)
        .take(pieces_in_sector.into())
        .map(|piece_offset| {
            sector_id.derive_piece_index(
                piece_offset,
                farmer_protocol_info.history_size,
                farmer_protocol_info.max_pieces_in_sector,
                farmer_protocol_info.recent_segments,
                farmer_protocol_info.recent_history_fraction,
            )
        })
        .collect::<Vec<_>>();

    let raw_sector = {
        let mut raw_sector = RawSector::new(pieces_in_sector);
        let mut pieces_to_download =
            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
        for (piece_index, (record, metadata)) in piece_indices
            .iter()
            .copied()
            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
        {
            pieces_to_download
                .entry(piece_index)
                .or_default()
                .push((record, metadata));
        }
        // This map will be mutated, removing piece indices we have already processed
        let pieces_to_download = AsyncMutex::new(pieces_to_download);

        retry(default_backoff(), || async {
            let mut pieces_to_download = pieces_to_download.lock().await;

            if let Err(error) =
                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
                    .await
            {
                warn!(
                    %sector_index,
                    %error,
                    %pieces_in_sector,
                    remaining_pieces = %pieces_to_download.len(),
                    "Sector downloading attempt failed, will retry later"
                );

                return Err(BackoffError::transient(error));
            }

            debug!(%sector_index, "Sector downloaded successfully");

            Ok(())
        })
        .await?;

        raw_sector
    };

    Ok(DownloadedSector {
        sector_id,
        piece_indices,
        raw_sector,
        history_size: farmer_protocol_info.history_size,
    })
}

/// Records encoder for plotting purposes
pub trait RecordsEncoder {
    /// Encode provided sector records
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}

/// CPU implementation of [`RecordsEncoder`]
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    table_generators: &'a [PosTable::Generator],
    erasure_coding: &'a ErasureCoding,
    global_mutex: &'a AsyncMutex<()>,
}

impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
where
    PosTable: Table,
{
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap> {
        if self.table_generators.is_empty() {
            return Err(anyhow::anyhow!("No table generators"));
        }

        let pieces_in_sector = records
            .len()
            .try_into()
            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

        {
            let global_mutex = self.global_mutex;
            let erasure_coding = self.erasure_coding;

            let iter = Mutex::new(
                (PieceOffset::ZERO..)
                    .zip(records.iter_mut())
                    .zip(sector_contents_map.iter_record_chunks_used_mut()),
            );

            rayon::scope(|scope| {
                for table_generator in self.table_generators {
                    scope.spawn(|_scope| {
                        loop {
                            // Take the mutex briefly to make sure encoding is allowed right now
                            global_mutex.lock_blocking();

                            // `loop` with `let ... else` is used instead of `while let` because
                            // otherwise the mutex would be held for the duration of the loop body
                            // and limit concurrency to one record
                            let Some(((piece_offset, record), record_chunks_used)) =
                                iter.lock().next()
                            else {
                                return;
                            };
                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);

                            record_encoding::<PosTable>(
                                &pos_seed,
                                record,
                                record_chunks_used,
                                table_generator,
                                erasure_coding,
                            );

                            if abort_early.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                    });
                }
            });
        }

        Ok(sector_contents_map)
    }
}

impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    /// Create a new instance
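    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled here; `PosTableImpl` stands for a hypothetical [`Table`]
    /// implementation with a way to obtain a generator, and `erasure_coding` is assumed to exist
    /// in the surrounding code):
    ///
    /// ```ignore
    /// let table_generators = [PosTableImpl::generator()];
    /// let global_mutex = AsyncMutex::new(());
    /// let mut records_encoder =
    ///     CpuRecordsEncoder::<PosTableImpl>::new(&table_generators, &erasure_coding, &global_mutex);
    /// ```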
    pub fn new(
        table_generators: &'a [PosTable::Generator],
        erasure_coding: &'a ErasureCoding,
        global_mutex: &'a AsyncMutex<()>,
    ) -> Self {
        Self {
            table_generators,
            erasure_coding,
            global_mutex,
        }
    }
}

/// Options for encoding a sector.
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Sector index
    pub sector_index: SectorIndex,
    /// Records encoding instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Mostly opaque sector encoding result, ready for writing
#[derive(Debug)]
pub struct EncodedSector {
    /// Information about sector that was plotted
    pub plotted_sector: PlottedSector,
    raw_sector: RawSector,
    sector_contents_map: SectorContentsMap,
}

/// Encode a downloaded sector.
///
/// This function encodes downloaded sector records and returns a sector encoding result that can
/// be written using [`write_sector`].
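///
/// # Example
///
/// A minimal usage sketch (not compiled here; `downloaded_sector` comes from [`download_sector`]
/// and the remaining inputs are assumed to exist in the surrounding code):
///
/// ```ignore
/// let encoded_sector = encode_sector(
///     downloaded_sector,
///     EncodeSectorOptions {
///         sector_index,
///         records_encoder: &mut records_encoder,
///         abort_early: &abort_early,
///     },
/// )?;
/// ```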
pub fn encode_sector<RE>(
    downloaded_sector: DownloadedSector,
    encoding_options: EncodeSectorOptions<'_, RE>,
) -> Result<EncodedSector, PlottingError>
where
    RE: RecordsEncoder,
{
    let DownloadedSector {
        sector_id,
        piece_indices,
        mut raw_sector,
        history_size,
    } = downloaded_sector;
    let EncodeSectorOptions {
        sector_index,
        records_encoder,
        abort_early,
    } = encoding_options;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_contents_map = records_encoder
        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
        .map_err(|error| PlottingError::RecordsEncoderError { error })?;

    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
        sector_index,
        pieces_in_sector,
        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
        history_size,
    });

    Ok(EncodedSector {
        plotted_sector: PlottedSector {
            sector_id,
            sector_index,
            sector_metadata,
            piece_indexes: piece_indices,
        },
        raw_sector,
        sector_contents_map,
    })
}

/// Write encoded sector into sector output
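///
/// # Example
///
/// A minimal usage sketch (not compiled here; `encoded_sector` comes from [`encode_sector`] and
/// `pieces_in_sector` is assumed to exist in the surrounding code):
///
/// ```ignore
/// let mut sector_output = Vec::new();
/// write_sector(&encoded_sector, &mut sector_output)?;
/// assert_eq!(sector_output.len(), sector_size(pieces_in_sector));
/// ```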
pub fn write_sector(
    encoded_sector: &EncodedSector,
    sector_output: &mut Vec<u8>,
) -> Result<(), PlottingError> {
    let EncodedSector {
        plotted_sector: _,
        raw_sector,
        sector_contents_map,
    } = encoded_sector;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_size = sector_size(pieces_in_sector);

    if !sector_output.is_empty() && sector_output.len() != sector_size {
        return Err(PlottingError::BadSectorOutputSize {
            provided: sector_output.len(),
            expected: sector_size,
        });
    }

    sector_output.resize(sector_size, 0);

    // Write sector to disk as the following regions:
    // * sector contents map
    // * record chunks as s-buckets
    // * record metadata
    // * checksum
    {
        let (sector_contents_map_region, remaining_bytes) =
            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
        // Slice the remaining memory into regions belonging to s-buckets and metadata
        let (s_buckets_region, metadata_region) =
            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));

        // Write sector contents map so we can decode it later
        sector_contents_map
            .encode_into(sector_contents_map_region)
            .expect("Chunked into correct size above; qed");

        let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        // Write record chunks, one s-bucket at a time
        for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
            .flat_map(|s_bucket| {
                sector_contents_map
                    .iter_s_bucket_piece_offsets(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
            })
            .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
        {
            let next_record_chunks_offset =
                &mut next_record_chunks_offset[usize::from(piece_offset)];

            let chunk_position = *next_record_chunks_offset;
            *next_record_chunks_offset += 1;
            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
        }

        let metadata_chunks = metadata_region
            .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
            .0;
        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
            record_metadata.encode_to(&mut output.as_mut_slice());
        }

        // It would be more efficient not to re-read the whole sector again, but that would make
        // the above code significantly more convoluted and is most likely not worth it
        let (sector_contents, sector_checksum) =
            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
        sector_checksum.copy_from_slice(
            {
                let mut hasher = blake3::Hasher::new();
                hasher.update_rayon(sector_contents);
                hasher.finalize()
            }
            .as_bytes(),
        );
    }

    Ok(())
}

fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    record_chunks_used: &mut FoundProofs,
    table_generator: &PosTable::Generator,
    erasure_coding: &ErasureCoding,
) where
    PosTable: Table,
{
    let pos_proofs = table_generator.create_proofs_parallel(pos_seed);

    let mut parity_record_chunks = Record::new_boxed();

    // Erasure code source record chunks
    erasure_coding
        .extend(record.iter(), parity_record_chunks.iter_mut())
        .expect("Statically guaranteed valid inputs; qed");

    *record_chunks_used = pos_proofs.found_proofs;

    // TODO: This can probably be optimized by using SIMD
    let mut num_found_proofs = 0_usize;
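    // For every s-bucket with a found proof, pick the corresponding record chunk (a source chunk
    // for s-buckets below `Record::NUM_CHUNKS`, a parity chunk otherwise), XOR it with the hash
    // of that proof and write the result back into the record, packed contiguously from the
    // start. `found_proofs` is a bitmap with one bit per s-bucket, iterated one byte (8
    // s-buckets) at a time.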
    for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
        .array_chunks::<{ u8::BITS as usize }>()
        .zip(pos_proofs.found_proofs)
    {
        for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
            if (found_proofs & (1 << proof_offset)) != 0 {
                let record_chunk = if s_bucket < Record::NUM_CHUNKS {
                    record[s_bucket]
                } else {
                    parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
                };
                // TODO: SIMD hashing
                record[num_found_proofs] = (Simd::from(record_chunk)
                    ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
                .to_array();
                num_found_proofs += 1;
            }
        }
    }
}

async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring
    //  concurrency from there
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    // Allocate to decouple the lifetime from `pieces_to_download`, which will be modified below
    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    let mut final_result = Ok(());

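    // Drive downloads and reconstructions concurrently: pieces that fail to download (or are not
    // found) fall back to reconstruction, successfully retrieved pieces are written into the
    // sector, and only the first error is retained for reporting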
    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}

async fn reconstruct_piece<PG>(
    piece_index: PieceIndex,
    recovery_semaphore: &Semaphore,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> (PieceIndex, Result<Piece, PlottingError>)
where
    PG: PieceGetter + Send + Sync,
{
    let _permit = recovery_semaphore.acquire().await;
    let recovered_piece_fut =
        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);

    (
        piece_index,
        recovered_piece_fut
            .await
            .map_err(|error| PlottingError::PieceRecoveryFailed {
                piece_index,
                error: error.into(),
            }),
    )
}

fn process_piece(
    piece_index: PieceIndex,
    piece: Piece,
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
) {
    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
        // Fancy way to insert the value that avoids copying through the stack (as naive
        // dereferencing would) and potentially causing a stack overflow as a result
        record
            .as_flattened_mut()
            .copy_from_slice(piece.record().as_flattened());
        *metadata = RecordMetadata {
            root: *piece.root(),
            parity_chunks_root: *piece.parity_chunks_root(),
            proof: *piece.proof(),
            piece_checksum: blake3::hash(piece.as_ref()).into(),
        };
    }
}