ab_data_retrieval/
segment_downloading.rs

1//! Fetching segments of the archived history
2
3use crate::piece_getter::PieceGetter;
4use ab_archiving::archiver::Segment;
5use ab_archiving::reconstructor::{Reconstructor, ReconstructorError};
6use ab_core_primitives::pieces::Piece;
7use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex};
8use ab_erasure_coding::ErasureCoding;
9use futures::StreamExt;
10use tokio::task::spawn_blocking;
11use tracing::debug;
12
13/// Segment getter errors.
14#[derive(Debug, thiserror::Error)]
15pub enum SegmentDownloadingError {
16    /// Not enough pieces
17    #[error("Not enough ({downloaded_pieces}) pieces")]
18    NotEnoughPieces {
19        /// Number of pieces that were downloaded
20        downloaded_pieces: usize,
21    },
22
23    /// Piece getter error
24    #[error("Piece getter error: {source}")]
25    PieceGetterError {
26        #[from]
27        source: anyhow::Error,
28    },
29
30    /// Segment reconstruction error
31    #[error("Segment reconstruction error: {source}")]
32    SegmentReconstruction {
33        #[from]
34        source: ReconstructorError,
35    },
36
37    /// Segment decoding error
38    #[error("Segment data decoding error: {source}")]
39    SegmentDecoding {
40        #[from]
41        source: parity_scale_codec::Error,
42    },
43}
44
45/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
46pub async fn download_segment<PG>(
47    segment_index: SegmentIndex,
48    piece_getter: &PG,
49    erasure_coding: ErasureCoding,
50) -> Result<Segment, SegmentDownloadingError>
51where
52    PG: PieceGetter,
53{
54    let reconstructor = Reconstructor::new(erasure_coding);
55
56    let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;
57
58    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
59        .await
60        .expect("Panic if blocking task panicked")?;
61
62    Ok(segment)
63}
64
65/// Downloads pieces of the segment such that segment can be reconstructed afterward.
66///
67/// Prefers source pieces if available, on error returns number of downloaded pieces
68pub async fn download_segment_pieces<PG>(
69    segment_index: SegmentIndex,
70    piece_getter: &PG,
71) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
72where
73    PG: PieceGetter,
74{
75    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
76    let mut downloaded_pieces = 0_usize;
77
78    let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
79
80    let mut pieces_iter = segment_index.segment_piece_indexes().into_iter();
81
82    // Download in batches until we get enough or exhaust available pieces
83    while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
84        let piece_indices = pieces_iter
85            .by_ref()
86            .take(required_pieces_number - downloaded_pieces)
87            .collect();
88
89        let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;
90
91        while let Some((piece_index, result)) = received_segment_pieces.next().await {
92            match result {
93                Ok(Some(piece)) => {
94                    downloaded_pieces += 1;
95                    segment_pieces
96                        .get_mut(piece_index.position() as usize)
97                        .expect("Piece position is by definition within segment; qed")
98                        .replace(piece);
99                }
100                Ok(None) => {
101                    debug!(%piece_index, "Piece was not found");
102                }
103                Err(error) => {
104                    debug!(%error, %piece_index, "Failed to get piece");
105                }
106            }
107        }
108    }
109
110    if downloaded_pieces < required_pieces_number {
111        debug!(
112            %segment_index,
113            %downloaded_pieces,
114            %required_pieces_number,
115            "Failed to retrieve pieces for segment"
116        );
117
118        return Err(SegmentDownloadingError::NotEnoughPieces { downloaded_pieces });
119    }
120
121    Ok(segment_pieces)
122}