ab_data_retrieval/
segment_downloading.rs1use crate::piece_getter::PieceGetter;
4use ab_archiving::archiver::Segment;
5use ab_archiving::reconstructor::{Reconstructor, ReconstructorError};
6use ab_core_primitives::pieces::Piece;
7use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex};
8use ab_erasure_coding::ErasureCoding;
9use futures::StreamExt;
10use std::time::Duration;
11use tokio::task::spawn_blocking;
12use tokio::time::sleep;
13use tracing::debug;
14
15pub const SEGMENT_DOWNLOAD_RETRIES: usize = 3;
18
19pub const SEGMENT_DOWNLOAD_RETRY_DELAY: Duration = Duration::from_secs(10);
22
23#[derive(Debug, thiserror::Error)]
25pub enum SegmentDownloadingError {
26 #[error(
28 "Not enough ({downloaded_pieces}/{}) pieces for segment {segment_index}",
29 RecordedHistorySegment::NUM_RAW_RECORDS
30 )]
31 NotEnoughPieces {
32 segment_index: SegmentIndex,
34 downloaded_pieces: usize,
36 },
37
38 #[error("Piece getter error: {source}")]
40 PieceGetterError {
41 #[from]
42 source: anyhow::Error,
43 },
44
45 #[error("Segment reconstruction error: {source}")]
47 SegmentReconstruction {
48 #[from]
49 source: ReconstructorError,
50 },
51
52 #[error("Segment data decoding error: {source}")]
54 SegmentDecoding {
55 #[from]
56 source: parity_scale_codec::Error,
57 },
58}
59
60pub async fn download_segment<PG>(
62 segment_index: SegmentIndex,
63 piece_getter: &PG,
64 erasure_coding: ErasureCoding,
65 retries: usize,
66 retry_delay: Option<Duration>,
67) -> Result<Segment, SegmentDownloadingError>
68where
69 PG: PieceGetter,
70{
71 let reconstructor = Reconstructor::new(erasure_coding);
72
73 let segment_pieces =
74 download_segment_pieces(segment_index, piece_getter, retries, retry_delay).await?;
75
76 let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
77 .await
78 .expect("Panic if blocking task panicked")?;
79
80 Ok(segment)
81}
82
83pub async fn download_segment_pieces<PG>(
88 segment_index: SegmentIndex,
89 piece_getter: &PG,
90 retries: usize,
91 retry_delay: Option<Duration>,
92) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
93where
94 PG: PieceGetter,
95{
96 let mut existing_pieces = [const { None }; ArchivedHistorySegment::NUM_PIECES];
97
98 for retry in 0..=retries {
99 match download_missing_segment_pieces(segment_index, piece_getter, existing_pieces).await {
100 Ok(segment_pieces) => return Ok(segment_pieces),
101 Err((error, incomplete_segment_pieces)) => {
102 existing_pieces = incomplete_segment_pieces;
103
104 if retry < retries {
105 debug!(
106 %segment_index,
107 %retry,
108 ?retry_delay,
109 ?error,
110 "Failed to download segment pieces once, retrying"
111 );
112 if let Some(retry_delay) = retry_delay {
113 sleep(retry_delay).await;
115 }
116 }
117 }
118 }
119 }
120
121 debug!(
122 %segment_index,
123 %retries,
124 "Failed to download segment pieces"
125 );
126
127 Err(SegmentDownloadingError::NotEnoughPieces {
128 segment_index,
129 downloaded_pieces: existing_pieces
130 .iter()
131 .filter(|piece| piece.is_some())
132 .count(),
133 })
134}
135
136async fn download_missing_segment_pieces<PG>(
143 segment_index: SegmentIndex,
144 piece_getter: &PG,
145 existing_pieces: [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
146) -> Result<
147 Vec<Option<Piece>>,
148 (
149 SegmentDownloadingError,
150 [Option<Piece>; ArchivedHistorySegment::NUM_PIECES],
151 ),
152>
153where
154 PG: PieceGetter,
155{
156 let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
157 let mut downloaded_pieces = existing_pieces
158 .iter()
159 .filter(|piece| piece.is_some())
160 .count();
161
162 let mut first_success = None;
164 let mut last_success = None;
165 let mut first_failure = None;
166 let mut last_failure = None;
167
168 let mut segment_pieces = existing_pieces;
169
170 let mut pieces_iter = segment_index.segment_piece_indexes().into_iter();
171
172 while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
174 let piece_indices = pieces_iter
175 .by_ref()
176 .filter(|piece_index| segment_pieces[piece_index.position() as usize].is_none())
177 .take(required_pieces_number - downloaded_pieces)
178 .collect();
179
180 let mut received_segment_pieces = match piece_getter.get_pieces(piece_indices).await {
181 Ok(pieces) => pieces,
182 Err(error) => return Err((error.into(), segment_pieces)),
183 };
184
185 while let Some((piece_index, result)) = received_segment_pieces.next().await {
186 match result {
187 Ok(Some(piece)) => {
188 downloaded_pieces += 1;
189 segment_pieces
190 .get_mut(piece_index.position() as usize)
191 .expect("Piece position is by definition within segment; qed")
192 .replace(piece);
193
194 if first_success.is_none() {
195 first_success = Some(piece_index.position());
196 }
197 last_success = Some(piece_index.position());
198 }
199 Ok(None) => {
204 debug!(%piece_index, "Piece was not found");
205 if first_failure.is_none() {
206 first_failure = Some(piece_index.position());
207 }
208 last_failure = Some(piece_index.position());
209 }
210 Err(error) => {
211 debug!(%error, %piece_index, "Failed to get piece");
212 if first_failure.is_none() {
213 first_failure = Some(piece_index.position());
214 }
215 last_failure = Some(piece_index.position());
216 }
217 }
218 }
219 }
220
221 if downloaded_pieces < required_pieces_number {
222 debug!(
223 %segment_index,
224 %downloaded_pieces,
225 %required_pieces_number,
226 ?first_success,
228 ?last_success,
229 ?first_failure,
230 ?last_failure,
231 "Failed to retrieve pieces for segment"
232 );
233
234 return Err((
235 SegmentDownloadingError::NotEnoughPieces {
236 segment_index,
237 downloaded_pieces: segment_pieces
238 .iter()
239 .filter(|piece| piece.is_some())
240 .count(),
241 },
242 segment_pieces,
243 ));
244 }
245
246 Ok(segment_pieces.to_vec())
247}