ab_data_retrieval/
segment_downloading.rs1use crate::piece_getter::PieceGetter;
4use ab_archiving::archiver::Segment;
5use ab_archiving::reconstructor::{Reconstructor, ReconstructorError};
6use ab_core_primitives::pieces::Piece;
7use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment, SegmentIndex};
8use ab_erasure_coding::ErasureCoding;
9use futures::StreamExt;
10use tokio::task::spawn_blocking;
11use tracing::debug;
12
/// Errors that can occur while downloading a segment or the pieces it is made of.
#[derive(Debug, thiserror::Error)]
pub enum SegmentDownloadingError {
    /// Fewer pieces than required were retrieved for the segment.
    ///
    /// Returned by `download_segment_pieces` once all piece indices of the segment were tried and
    /// the number of downloaded pieces is still below `RecordedHistorySegment::NUM_RAW_RECORDS`.
    #[error("Not enough ({downloaded_pieces}) pieces")]
    NotEnoughPieces {
        /// Number of pieces that were downloaded before giving up
        downloaded_pieces: usize,
    },

    /// The piece getter failed to process a request for pieces.
    ///
    /// Created through `From<anyhow::Error>` when the call to `PieceGetter::get_pieces` itself
    /// returns an error.
    #[error("Piece getter error: {source}")]
    PieceGetterError {
        /// Underlying error reported by the piece getter
        #[from]
        source: anyhow::Error,
    },

    /// Reconstruction of the segment from downloaded pieces failed.
    ///
    /// Created through `From<ReconstructorError>` when `Reconstructor::reconstruct_segment`
    /// returns an error in `download_segment`.
    #[error("Segment reconstruction error: {source}")]
    SegmentReconstruction {
        /// Underlying reconstruction error
        #[from]
        source: ReconstructorError,
    },

    /// Decoding of segment data failed.
    ///
    /// NOTE(review): no code visible in this part of the file produces this variant; it is
    /// presumably used by callers that decode the segment contents — confirm.
    #[error("Segment data decoding error: {source}")]
    SegmentDecoding {
        /// Underlying SCALE codec error
        #[from]
        source: parity_scale_codec::Error,
    },
}
44
45pub async fn download_segment<PG>(
47 segment_index: SegmentIndex,
48 piece_getter: &PG,
49 erasure_coding: ErasureCoding,
50) -> Result<Segment, SegmentDownloadingError>
51where
52 PG: PieceGetter,
53{
54 let reconstructor = Reconstructor::new(erasure_coding);
55
56 let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?;
57
58 let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces))
59 .await
60 .expect("Panic if blocking task panicked")?;
61
62 Ok(segment)
63}
64
65pub async fn download_segment_pieces<PG>(
69 segment_index: SegmentIndex,
70 piece_getter: &PG,
71) -> Result<Vec<Option<Piece>>, SegmentDownloadingError>
72where
73 PG: PieceGetter,
74{
75 let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
76 let mut downloaded_pieces = 0_usize;
77
78 let mut segment_pieces = vec![None::<Piece>; ArchivedHistorySegment::NUM_PIECES];
79
80 let mut pieces_iter = segment_index.segment_piece_indexes().into_iter();
81
82 while !pieces_iter.is_empty() && downloaded_pieces != required_pieces_number {
84 let piece_indices = pieces_iter
85 .by_ref()
86 .take(required_pieces_number - downloaded_pieces)
87 .collect();
88
89 let mut received_segment_pieces = piece_getter.get_pieces(piece_indices).await?;
90
91 while let Some((piece_index, result)) = received_segment_pieces.next().await {
92 match result {
93 Ok(Some(piece)) => {
94 downloaded_pieces += 1;
95 segment_pieces
96 .get_mut(piece_index.position() as usize)
97 .expect("Piece position is by definition within segment; qed")
98 .replace(piece);
99 }
100 Ok(None) => {
101 debug!(%piece_index, "Piece was not found");
102 }
103 Err(error) => {
104 debug!(%error, %piece_index, "Failed to get piece");
105 }
106 }
107 }
108 }
109
110 if downloaded_pieces < required_pieces_number {
111 debug!(
112 %segment_index,
113 %downloaded_pieces,
114 %required_pieces_number,
115 "Failed to retrieve pieces for segment"
116 );
117
118 return Err(SegmentDownloadingError::NotEnoughPieces { downloaded_pieces });
119 }
120
121 Ok(segment_pieces)
122}