ab_data_retrieval/
segment_downloading.rs

1//! Fetching segments of the archived history of Subspace Network.
2
3use crate::piece_getter::PieceGetter;
4use ab_archiving::archiver::Segment;
5use ab_archiving::reconstructor::{Reconstructor, ReconstructorError};
6use ab_core_primitives::pieces::Piece;
7use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex};
8use ab_erasure_coding::ErasureCoding;
9use futures::StreamExt;
10use std::time::Duration;
11use tokio::task::spawn_blocking;
12use tokio::time::sleep;
13use tracing::debug;
14
/// The number of times we try to download a segment before giving up.
/// Note: these are *re*-tries, performed in addition to the initial attempt.
/// This is a suggested default, callers can supply their own value if needed.
pub const SEGMENT_DOWNLOAD_RETRIES: usize = 3;

/// The amount of time we wait between segment download retries.
/// This is a suggested default, callers can supply their own value if needed.
pub const SEGMENT_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(10);
22
23/// Segment getter errors.
/// Segment getter errors.
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
    /// Not enough pieces were downloaded to reconstruct the segment
    /// (fewer than `RecordedHistorySegment::NUM_RAW_RECORDS`).
    #[error(
        "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
        RecordedHistorySegment::NUM_RAW_RECORDS
    )]
    NotEnoughPieces {
        /// The segment we were trying to download
        segment_index: SegmentIndex,
        /// Number of pieces that were downloaded
        downloaded_pieces: usize,
    },

    /// Piece getter error, converted from the `anyhow::Error` returned by
    /// `PieceGetter::get_pieces`
    #[error("Piece getter error: {source}")]
    PieceGetterError {
        #[from]
        source: anyhow::Error,
    },

    /// Segment reconstruction error, raised by the `Reconstructor` when the
    /// downloaded pieces cannot be assembled back into a segment
    #[error("Segment reconstruction error: {source}")]
    SegmentReconstruction {
        #[from]
        source: ReconstructorError,
    },

    /// Segment decoding error, raised when decoding the reconstructed segment data
    #[error("Segment data decoding error: {source}")]
    SegmentDecoding {
        #[from]
        source: parity_scale_codec::Error,
    },
}
59
60/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment.
61pub async fn download_segment<PG>(
62    segment_index: SegmentIndex,
63    piece_getter: &PG,
64    erasure_coding: ErasureCoding,
65    retries: usize,
66    retry_delay: Option<Duration>,
67) -> Result<Segment, SegmentDownloadingError>
68where
69    PG: PieceGetter,
70{
71    let reconstructor = Reconstructor::new(erasure_coding);
72
73    let segment_pieces =
74        download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;
75
76    let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
77        .await
78        .expect("Panic if blocking task panicked")?;
79
80    Ok(segment)
81}
82
83/// Downloads pieces of a segment so that segment can be reconstructed afterward.
84/// Repeatedly attempts to download pieces until the required number of pieces is reached.
85///
86/// Prefers source pieces if available, on error returns the number of available pieces.
87pub async fn download_segment_pieces<PG>(
88    segment_index: SegmentIndex,
89    piece_getter: &PG,
90    retries: usize,
91    retry_delay: Option<Duration>,
92) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
93where
94    PG: PieceGetter,
95{
96    let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];
97
98    for retry in 0..=retries {
99        match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
100            Ok(segment_pieces) => return Ok(segment_pieces),
101            Err((error, incomplete_segment_pieces)) => {
102                existing_pieces = incomplete_segment_pieces;
103
104                if retry < retries {
105                    debug!(
106                        %segment_index,
107                        %retry,
108                        ?retry_delay,
109                        ?error,
110                        "Failed to download segment pieces once, retrying"
111                    );
112                    if let Some(retry_delay) = retry_delay {
113                        // Wait before retrying to give the node a chance to find other peers
114                        sleep(retry_delay).await;
115                    }
116                }
117            }
118        }
119    }
120
121    debug!(
122        %segment_index,
123        %retries,
124        "Failed to download segment pieces"
125    );
126
127    Err(SegmentDownloadingError::NotEnoughPieces {
128        segment_index,
129        downloaded_pieces: existing_pieces
130            .iter()
131            .filter(|piece| piece.is_some())
132            .count(),
133    })
134}
135
136/// Tries to download pieces of a segment once, so that segment can be reconstructed afterward.
137/// Pass existing pieces in `existing_pieces`, or use
138/// `[const { None }; ArchivedHistorySegment::NUM_PIECES]` if no pieces are available.
139///
140/// Prefers source pieces if available, on error returns the incomplete piece download (including
141/// existing pieces).
/// Tries to download pieces of a segment once, so that segment can be reconstructed afterward.
/// Pass existing pieces in `existing_pieces`, or use
/// `[const { None }; ArchivedHistorySegment::NUM_PIECES]` if no pieces are available.
///
/// Prefers source pieces if available, on error returns the incomplete piece download (including
/// existing pieces).
async fn download_missing_segment_pieces<PG>(
    segment_index: SegmentIndex,
    piece_getter: &PG,
    existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
) -> Result<
    Vec<Option<Piece>>,
    (
        SegmentDownloadingError,
        [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
    ),
>
where
    PG: PieceGetter,
{
    // Only this many pieces are needed for reconstruction; the rest are redundancy.
    let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
    // Count pieces we already have so that only the shortfall is requested below.
    let mut downloaded_pieces = existing_pieces
        .iter()
        .filter(|piece| piece.is_some())
        .count();

    // Debugging failure patterns in piece downloads: first/last piece positions that
    // succeeded or failed in this attempt, reported in the failure log at the end.
    let mut first_success = None;
    let mut last_success = None;
    let mut first_failure = None;
    let mut last_failure = None;

    // Working copy, indexed by piece position within the segment; returned to the caller
    // (complete on success, partial on error).
    let mut segment_pieces = existing_pieces;

    let mut pieces_iter = segment_index.segment_piece_indexes().into_iter();

    // Download in batches until we get enough or exhaust available pieces
    while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
        // Request only positions still missing, capped at the remaining shortfall.
        // `by_ref` keeps the iterator's progress across batches.
        let piece_indices = pieces_iter
            .by_ref()
            .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
            .take(required_pieces_number - downloaded_pieces)
            .collect();

        // A getter error aborts this attempt; hand partial progress back to the caller
        // so a retry can resume from it.
        let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
            Ok(pieces) => pieces,
            Err(error) => return Err((error.into(), segment_pieces)),
        };

        while let Some((piece_index, result)) = received_segment_pieces.next().await {
            match result {
                Ok(Some(piece)) => {
                    downloaded_pieces += 1;
                    segment_pieces
                        .get_mut(piece_index.position() as usize)
                        .expect("Piece position is by definition within segment; qed")
                        .replace(piece);

                    if first_success.is_none() {
                        first_success = Some(piece_index.position());
                    }
                    last_success = Some(piece_index.position());
                }
                // We often see an error where 127 pieces are downloaded successfully, but the
                // other 129 fail. It seems like 1 request in a 128 piece batch fails, then 128
                // single piece requests are made, and also fail.
                // Delaying requests after a failure gives the node a chance to find other peers.
                Ok(None) => {
                    debug!(%piece_index, "Piece was not found");
                    if first_failure.is_none() {
                        first_failure = Some(piece_index.position());
                    }
                    last_failure = Some(piece_index.position());
                }
                Err(error) => {
                    debug!(%error, %piece_index, "Failed to get piece");
                    if first_failure.is_none() {
                        first_failure = Some(piece_index.position());
                    }
                    last_failure = Some(piece_index.position());
                }
            }
        }
    }

    // Ran out of piece indexes before reaching the required count: report what we have.
    if downloaded_pieces < required_pieces_number {
        debug!(
            %segment_index,
            %downloaded_pieces,
            %required_pieces_number,
            // Piece positions that succeeded/failed
            ?first_success,
            ?last_success,
            ?first_failure,
            ?last_failure,
            "Failed to retrieve pieces for segment"
        );

        return Err((
            SegmentDownloadingError::NotEnoughPieces {
                segment_index,
                downloaded_pieces: segment_pieces
                    .iter()
                    .filter(|piece| piece.is_some())
                    .count(),
            },
            segment_pieces,
        ));
    }

    Ok(segment_pieces.to_vec())
}