// ab_farmer_components/plotting.rs

//! Plotting utilities
//!
//! This module contains functions and data structures that can be used for plotting purposes
//! (primarily on the CPU).
//!
//! Plotted sectors can be written to a plot and later [`read`](crate::reading) and/or
//! [`audited`](crate::auditing)/[`proven`](crate::proving) using other modules of this crate.
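//!
//! At a high level, plotting a sector is a three-step pipeline: [`download_sector`] fetches the
//! pieces, [`encode_sector`] encodes them, and [`write_sector`] serializes the result. A minimal
//! sketch of that pipeline (not compiled; `download_options`, `encoding_options` and
//! `sector_output` are assumed to be built by the caller):
//!
//! ```ignore
//! // Download the pieces for this sector (network-bound, can run concurrently)
//! let downloaded = download_sector(download_options).await?;
//! // Encode the downloaded records (CPU-bound, typically serialized via a semaphore)
//! let encoded = encode_sector(downloaded, encoding_options)?;
//! // Serialize the encoded sector into the output buffer
//! write_sector(&encoded, &mut sector_output)?;
//! ```
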
use crate::FarmerProtocolInfo;
use crate::sector::{
    EncodedChunksUsed, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
    SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
};
use crate::segment_reconstruction::recover_missing_piece;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
use ab_core_primitives::pos::PosSeed;
use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
use ab_core_primitives::segments::HistorySize;
use ab_data_retrieval::piece_getter::PieceGetter;
use ab_erasure_coding::ErasureCoding;
use ab_proof_of_space::{Table, TableGenerator};
use async_lock::{Mutex as AsyncMutex, Semaphore};
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, select};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rayon::prelude::*;
use std::collections::HashMap;
use std::simd::Simd;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, trace, warn};

const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

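/// Retry policy for piece downloads: wait 15 s before the first retry, back off exponentially
/// (using the `backoff` crate defaults for multiplier and jitter) up to 10 min between attempts,
/// and retry indefinitely until a valid piece is obtained.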
fn default_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_secs(15),
        max_interval: Duration::from_secs(10 * 60),
        // Try until we get a valid piece
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    }
}

/// Information about a sector that was plotted
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// Sector ID
    pub sector_id: SectorId,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Sector metadata
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indexes of pieces that were plotted
    pub piece_indexes: Vec<PieceIndex>,
}

/// Plotting error
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Records encoder error
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Bad sector output size
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Actual size
        provided: usize,
        /// Expected size
        expected: usize,
    },
    /// Can't recover missing piece
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Piece index
        piece_index: PieceIndex,
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve pieces
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Abort early
    #[error("Abort early")]
    AbortEarly,
}

/// Options for plotting a sector.
///
/// The sector output should either be empty (in which case it'll be resized to the correct size
/// automatically) or correctly sized from the beginning, otherwise an error will be returned.
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Hash of the public key corresponding to the sector
    pub public_key_hash: &'a Blake3Hash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
    /// Where the plotted sector should be written; the vector must either be empty (in which case
    /// it'll be resized to the correct size automatically) or correctly sized from the beginning
    pub sector_output: &'a mut Vec<u8>,
    /// Semaphore for the part of plotting when the farmer downloads a new sector; allows limiting
    /// memory usage of the plotting process. The permit will be held until the end of the
    /// plotting process.
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore for the part of plotting when the farmer encodes a downloaded sector; should
    /// typically allow one permit at a time for efficient CPU utilization
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Records encoder instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Plot a single sector.
///
/// This is a convenient wrapper around the [`download_sector`] and [`encode_sector`] functions.
///
/// NOTE: Even though this function is async, it has blocking code inside and must run in a
/// separate thread in order to prevent blocking the executor.
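///
/// A minimal usage sketch, assuming the caller has already built `options` and uses Tokio
/// (`spawn_blocking` and `block_on` here are illustrative, not prescribed by this crate):
///
/// ```ignore
/// // Run the blocking plotting future on a dedicated blocking thread
/// let plotted_sector = tokio::task::spawn_blocking(move || {
///     tokio::runtime::Handle::current().block_on(plot_sector(options))
/// })
/// .await??;
/// ```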
pub async fn plot_sector<RE, PG>(
    options: PlotSectorOptions<'_, RE, PG>,
) -> Result<PlottedSector, PlottingError>
where
    RE: RecordsEncoder,
    PG: PieceGetter + Send + Sync,
{
    let PlotSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
        sector_output,
        downloading_semaphore,
        encoding_semaphore,
        records_encoder,
        abort_early,
    } = options;

    let _downloading_permit = match downloading_semaphore {
        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
        None => None,
    };

    let download_sector_fut = download_sector(DownloadSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    });

    let _encoding_permit = match encoding_semaphore {
        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
        None => None,
    };

    let encoded_sector = encode_sector(
        download_sector_fut.await?,
        EncodeSectorOptions::<RE> {
            sector_index,
            records_encoder,
            abort_early,
        },
    )?;

    if abort_early.load(Ordering::Acquire) {
        return Err(PlottingError::AbortEarly);
    }

    write_sector(&encoded_sector, sector_output)?;

    Ok(encoded_sector.plotted_sector)
}

/// Opaque sector download result, ready for encoding
#[derive(Debug)]
pub struct DownloadedSector {
    sector_id: SectorId,
    piece_indices: Vec<PieceIndex>,
    raw_sector: RawSector,
    history_size: HistorySize,
}

/// Options for sector downloading
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Hash of the public key corresponding to the sector
    pub public_key_hash: &'a Blake3Hash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
}

/// Download a sector for plotting.
///
/// This will identify the necessary pieces and download them using the provided piece getter,
/// after which they can be encoded using [`encode_sector`] and written to the plot.
pub async fn download_sector<PG>(
    options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    let DownloadSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    } = options;

    let sector_id = SectorId::new(
        public_key_hash,
        sector_index,
        farmer_protocol_info.history_size,
    );

    let piece_indices = (PieceOffset::ZERO..)
        .take(pieces_in_sector.into())
        .map(|piece_offset| {
            sector_id.derive_piece_index(
                piece_offset,
                farmer_protocol_info.history_size,
                farmer_protocol_info.max_pieces_in_sector,
                farmer_protocol_info.recent_segments,
                farmer_protocol_info.recent_history_fraction,
            )
        })
        .collect::<Vec<_>>();

    let raw_sector = {
        let mut raw_sector = RawSector::new(pieces_in_sector);
        let mut pieces_to_download =
            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
        for (piece_index, (record, metadata)) in piece_indices
            .iter()
            .copied()
            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
        {
            pieces_to_download
                .entry(piece_index)
                .or_default()
                .push((record, metadata));
        }
        // This map will be mutated, removing piece indices we have already processed
        let pieces_to_download = AsyncMutex::new(pieces_to_download);

        retry(default_backoff(), || async {
            let mut pieces_to_download = pieces_to_download.lock().await;

            if let Err(error) =
                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
                    .await
            {
                warn!(
                    %sector_index,
                    %error,
                    %pieces_in_sector,
                    remaining_pieces = %pieces_to_download.len(),
                    "Sector downloading attempt failed, will retry later"
                );

                return Err(BackoffError::transient(error));
            }

            debug!(%sector_index, "Sector downloaded successfully");

            Ok(())
        })
        .await?;

        raw_sector
    };

    Ok(DownloadedSector {
        sector_id,
        piece_indices,
        raw_sector,
        history_size: farmer_protocol_info.history_size,
    })
}

/// Records encoder for plotting purposes
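///
/// Besides [`CpuRecordsEncoder`] below, alternative implementations can be plugged in. A minimal
/// sketch of a custom implementation that does no real encoding (for illustration only; a real
/// encoder must encode records and mark used chunks):
///
/// ```ignore
/// struct NoopRecordsEncoder;
///
/// impl RecordsEncoder for NoopRecordsEncoder {
///     fn encode_records(
///         &mut self,
///         _sector_id: &SectorId,
///         records: &mut [Record],
///         _abort_early: &AtomicBool,
///     ) -> anyhow::Result<SectorContentsMap> {
///         let pieces_in_sector = records.len().try_into()?;
///         // An empty contents map means no chunks are marked as encoded
///         Ok(SectorContentsMap::new(pieces_in_sector))
///     }
/// }
/// ```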
pub trait RecordsEncoder {
    /// Encode provided sector records
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}

/// CPU implementation of [`RecordsEncoder`]
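///
/// Encoding parallelism is driven by the number of table generators: each generator gets its own
/// worker in a Rayon scope. A construction sketch (`PosTable`, `thread_count` and
/// `erasure_coding` are assumed to exist in the caller's context):
///
/// ```ignore
/// let mut table_generators = vec![PosTable::generator(); thread_count];
/// let global_mutex = AsyncMutex::new(());
/// let mut records_encoder =
///     CpuRecordsEncoder::<PosTable>::new(&mut table_generators, &erasure_coding, &global_mutex);
/// ```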
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    table_generators: &'a mut [PosTable::Generator],
    erasure_coding: &'a ErasureCoding,
    global_mutex: &'a AsyncMutex<()>,
}

impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
where
    PosTable: Table,
{
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap> {
        if self.table_generators.is_empty() {
            return Err(anyhow::anyhow!("No table generators"));
        }

        let pieces_in_sector = records
            .len()
            .try_into()
            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

        {
            let table_generators = &mut *self.table_generators;
            let global_mutex = self.global_mutex;
            let erasure_coding = self.erasure_coding;

            let iter = Mutex::new(
                (PieceOffset::ZERO..)
                    .zip(records.iter_mut())
                    .zip(sector_contents_map.iter_record_bitfields_mut()),
            );

            rayon::scope(|scope| {
                for table_generator in table_generators {
                    scope.spawn(|_scope| {
                        let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS);

                        loop {
                            // Take the mutex briefly to make sure encoding is allowed right now
                            global_mutex.lock_blocking();

                            // A `loop` with an explicit `lock()` is used instead of a `while`
                            // because otherwise the mutex would be held for the duration of the
                            // loop body and limit concurrency to one record
                            let Some(((piece_offset, record), encoded_chunks_used)) =
                                iter.lock().next()
                            else {
                                return;
                            };
                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);

                            record_encoding::<PosTable>(
                                &pos_seed,
                                record,
                                encoded_chunks_used,
                                table_generator,
                                erasure_coding,
                                &mut chunks_scratch,
                            );

                            if abort_early.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                    });
                }
            });
        }

        Ok(sector_contents_map)
    }
}

impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    /// Create new instance
    pub fn new(
        table_generators: &'a mut [PosTable::Generator],
        erasure_coding: &'a ErasureCoding,
        global_mutex: &'a AsyncMutex<()>,
    ) -> Self {
        Self {
            table_generators,
            erasure_coding,
            global_mutex,
        }
    }
}

/// Options for encoding a sector.
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Sector index
    pub sector_index: SectorIndex,
    /// Records encoding instance
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

/// Mostly opaque sector encoding result, ready for writing
#[derive(Debug)]
pub struct EncodedSector {
    /// Information about the sector that was plotted
    pub plotted_sector: PlottedSector,
    raw_sector: RawSector,
    sector_contents_map: SectorContentsMap,
}

/// Encode a downloaded sector.
///
/// This function encodes downloaded sector records and returns a sector encoding result that can
/// be written using [`write_sector`].
pub fn encode_sector<RE>(
    downloaded_sector: DownloadedSector,
    encoding_options: EncodeSectorOptions<'_, RE>,
) -> Result<EncodedSector, PlottingError>
where
    RE: RecordsEncoder,
{
    let DownloadedSector {
        sector_id,
        piece_indices,
        mut raw_sector,
        history_size,
    } = downloaded_sector;
    let EncodeSectorOptions {
        sector_index,
        records_encoder,
        abort_early,
    } = encoding_options;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_contents_map = records_encoder
        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
        .map_err(|error| PlottingError::RecordsEncoderError { error })?;

    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
        sector_index,
        pieces_in_sector,
        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
        history_size,
    });

    Ok(EncodedSector {
        plotted_sector: PlottedSector {
            sector_id,
            sector_index,
            sector_metadata,
            piece_indexes: piece_indices,
        },
        raw_sector,
        sector_contents_map,
    })
}

/// Write an encoded sector into the sector output
pub fn write_sector(
    encoded_sector: &EncodedSector,
    sector_output: &mut Vec<u8>,
) -> Result<(), PlottingError> {
    let EncodedSector {
        plotted_sector: _,
        raw_sector,
        sector_contents_map,
    } = encoded_sector;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_size = sector_size(pieces_in_sector);

    if !sector_output.is_empty() && sector_output.len() != sector_size {
        return Err(PlottingError::BadSectorOutputSize {
            provided: sector_output.len(),
            expected: sector_size,
        });
    }

    sector_output.resize(sector_size, 0);

    // Write the sector to disk in the form of the following regions:
    // * sector contents map
    // * record chunks as s-buckets
    // * record metadata
    // * checksum
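    //
    // Approximate byte layout for `n = pieces_in_sector`, derived from the splits below:
    //
    //   [ contents map: SectorContentsMap::encoded_size(n) ]
    //   [ s-buckets:    sector_record_chunks_size(n)       ]
    //   [ metadata:     n * RecordMetadata::encoded_size() ]
    //   [ checksum:     Blake3Hash::SIZE (the final bytes) ]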
    {
        let (sector_contents_map_region, remaining_bytes) =
            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
        // Split the remaining memory into the s-buckets region and the metadata region
        let (s_buckets_region, metadata_region) =
            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));

        // Write the sector contents map so we can decode it later
        sector_contents_map
            .encode_into(sector_contents_map_region)
            .expect("Chunked into correct size above; qed");

        let num_encoded_record_chunks = sector_contents_map.num_encoded_record_chunks();
        let mut next_encoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        let mut next_unencoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        // Write record chunks, one s-bucket at a time
        for ((piece_offset, encoded_chunk_used), output) in (SBucket::ZERO..=SBucket::MAX)
            .flat_map(|s_bucket| {
                sector_contents_map
                    .iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
            })
            .zip(s_buckets_region.array_chunks_mut::<{ RecordChunk::SIZE }>())
        {
            let num_encoded_record_chunks =
                usize::from(num_encoded_record_chunks[usize::from(piece_offset)]);
            let next_encoded_record_chunks_offset =
                &mut next_encoded_record_chunks_offset[usize::from(piece_offset)];
            let next_unencoded_record_chunks_offset =
                &mut next_unencoded_record_chunks_offset[usize::from(piece_offset)];

            // We know that s-buckets in `raw_sector.records` are stored in order (encoded first,
            // then unencoded), hence we don't need to calculate the position: we can just keep a
            // few cursors and derive the position from them
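            // (for example, if a record has 3 encoded chunks, they occupy positions 0..3 in the
            // order they are encountered, and its unencoded chunks start at position 3)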
            let chunk_position;
            if encoded_chunk_used {
                chunk_position = *next_encoded_record_chunks_offset;
                *next_encoded_record_chunks_offset += 1;
            } else {
                chunk_position = num_encoded_record_chunks + *next_unencoded_record_chunks_offset;
                *next_unencoded_record_chunks_offset += 1;
            }
            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
        }

        let metadata_chunks =
            metadata_region.array_chunks_mut::<{ RecordMetadata::encoded_size() }>();
        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
            record_metadata.encode_to(&mut output.as_mut_slice());
        }

        // It would be more efficient not to re-read the whole sector again, but that would make
        // the above code significantly more convoluted and is most likely not worth it
        let (sector_contents, sector_checksum) =
            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
        sector_checksum.copy_from_slice(
            {
                let mut hasher = blake3::Hasher::new();
                hasher.update_rayon(sector_contents);
                hasher.finalize()
            }
            .as_bytes(),
        );
    }

    Ok(())
}

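// Encodes a single record in place: derives the PoSpace table from the seed, erasure-codes the
// record into source and parity chunks, XORs every chunk that has a PoSpace proof with the hash
// of that proof (marking its bit in `encoded_chunks_used`), and finally, if there were not
// enough proofs, appends the remaining unencoded chunks at the end of the record.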
fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    mut encoded_chunks_used: EncodedChunksUsed<'_>,
    table_generator: &mut PosTable::Generator,
    erasure_coding: &ErasureCoding,
    chunks_scratch: &mut Vec<[u8; RecordChunk::SIZE]>,
) where
    PosTable: Table,
{
    // Derive the PoSpace table
    let pos_table = table_generator.generate_parallel(pos_seed);

    let mut parity_record_chunks = Record::new_boxed();

    // Erasure code source record chunks
    erasure_coding
        .extend(record.iter(), parity_record_chunks.iter_mut())
        .expect("Statically guaranteed valid inputs; qed");
    let source_record_chunks = record.to_vec();

    chunks_scratch.clear();
    // For every erasure coded chunk, check if a proof is present; if so, encode the chunk with
    // the PoSpace proof hash and set the corresponding `encoded_chunks_used` bit to `true`
    (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
        .into_par_iter()
        .map(SBucket::from)
        .zip(
            source_record_chunks
                .par_iter()
                .chain(parity_record_chunks.par_iter()),
        )
        .map(|(s_bucket, record_chunk)| {
            if let Some(proof) = pos_table.find_proof(s_bucket.into()) {
                (Simd::from(*record_chunk) ^ Simd::from(*proof.hash())).to_array()
            } else {
                // Dummy value indicating no proof
                [0; RecordChunk::SIZE]
            }
        })
        .collect_into_vec(chunks_scratch);
    let num_successfully_encoded_chunks = chunks_scratch
        .drain(..)
        .zip(encoded_chunks_used.iter_mut())
        .filter_map(|(maybe_encoded_chunk, mut encoded_chunk_used)| {
            // No proof, see above
            if maybe_encoded_chunk == [0; RecordChunk::SIZE] {
                None
            } else {
                *encoded_chunk_used = true;

                Some(maybe_encoded_chunk)
            }
        })
        // Make sure the above filter function (and the corresponding `encoded_chunk_used` update)
        // runs at most as many times as there are chunks in the record; otherwise `n+1`
        // iterations could happen and needlessly update an extra `encoded_chunk_used` bit,
        // causing issues down the line
        .take(record.len())
        .zip(record.iter_mut())
        // Write encoded chunks back so we can reuse the original allocation
        .map(|(input_chunk, output_chunk)| {
            *output_chunk = input_chunk;
        })
        .count();

    // In some cases there are not enough PoSpace proofs available, in which case the remaining
    // number of unencoded erasure coded record chunks is appended at the end
    source_record_chunks
        .iter()
        .chain(parity_record_chunks.iter())
        .zip(encoded_chunks_used.iter())
        // Skip chunks that were used previously
        .filter_map(|(record_chunk, encoded_chunk_used)| {
            if *encoded_chunk_used {
                None
            } else {
                Some(record_chunk)
            }
        })
        // The first `num_successfully_encoded_chunks` chunks are already encoded
        .zip(record.iter_mut().skip(num_successfully_encoded_chunks))
        // Write the necessary number of unencoded chunks at the end
        .for_each(|(input_chunk, output_chunk)| {
            *output_chunk = *input_chunk;
        });
}

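// Downloads all still-missing pieces for a sector in one pass: pieces are requested in bulk from
// the piece getter, and any piece that is missing or fails to download is handed to
// reconstruction (bounded by `recovery_semaphore`). The first error is remembered but the
// remaining pieces are still processed, so a retry only has to deal with what is left in
// `pieces_to_download`.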
async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // TODO: Make configurable, likely allowing user to specify RAM usage expectations and
    //  inferring concurrency from there
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    // Allocate to decouple lifetime from `pieces_to_download` that will be modified below
    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    let mut final_result = Ok(());

    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}

async fn reconstruct_piece<PG>(
    piece_index: PieceIndex,
    recovery_semaphore: &Semaphore,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> (PieceIndex, Result<Piece, PlottingError>)
where
    PG: PieceGetter + Send + Sync,
{
    let _permit = recovery_semaphore.acquire().await;
    let recovered_piece_fut =
        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);

    (
        piece_index,
        recovered_piece_fut
            .await
            .map_err(|error| PlottingError::PieceRecoveryFailed {
                piece_index,
                error: error.into(),
            }),
    )
}

fn process_piece(
    piece_index: PieceIndex,
    piece: Piece,
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
) {
    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
        // Copy through the flattened slices rather than dereferencing the whole record, which
        // would move it through the stack and could potentially cause a stack overflow
        record
            .as_flattened_mut()
            .copy_from_slice(piece.record().as_flattened());
        *metadata = RecordMetadata {
            root: *piece.root(),
            parity_chunks_root: *piece.parity_chunks_root(),
            proof: *piece.proof(),
            piece_checksum: blake3::hash(piece.as_ref()).into(),
        };
    }
}