ab_archiving/
reconstructor.rs1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5 ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex, RecordedHistorySegment,
6 SegmentHeader,
7};
8use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
9use alloc::vec::Vec;
10use core::mem;
11use parity_scale_codec::Decode;
12
13#[derive(Debug, Clone, PartialEq, thiserror::Error)]
15pub enum ReconstructorError {
16 #[error("Error during data shards reconstruction: {0}")]
18 DataShardsReconstruction(#[from] ErasureCodingError),
19 #[error("Not enough shards: {num_shards}")]
21 NotEnoughShards { num_shards: usize },
22 #[error("Error during segment decoding: {0}")]
24 SegmentDecoding(parity_scale_codec::Error),
25 #[error(
27 "Incorrect segment order, expected index {expected_segment_index}, actual \
28 {actual_segment_index}"
29 )]
30 IncorrectSegmentOrder {
31 expected_segment_index: LocalSegmentIndex,
32 actual_segment_index: LocalSegmentIndex,
33 },
34}
35
36#[derive(Debug, Default, Clone, Eq, PartialEq)]
39pub struct ReconstructedContents {
40 pub segment_header: Option<SegmentHeader>,
42 pub blocks: Vec<(BlockNumber, Vec<u8>)>,
44}
45
46#[derive(Debug, Clone)]
48pub struct Reconstructor {
49 erasure_coding: ErasureCoding,
51 last_segment_index: Option<LocalSegmentIndex>,
53 partial_block: Option<Vec<u8>>,
55}
56
57impl Reconstructor {
58 pub fn new(erasure_coding: ErasureCoding) -> Self {
60 Self {
61 erasure_coding,
62 last_segment_index: None,
63 partial_block: None,
64 }
65 }
66
67 pub fn reconstruct_segment(
73 &self,
74 segment_pieces: &[Option<Piece>],
75 ) -> Result<Segment, ReconstructorError> {
76 if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
77 return Err(ReconstructorError::NotEnoughShards {
78 num_shards: segment_pieces.len(),
79 });
80 }
81 let mut segment_data = RecordedHistorySegment::new_boxed();
82
83 if !segment_pieces
84 .iter()
85 .zip(segment_data.iter_mut())
86 .all(|(maybe_piece, record)| {
87 if let Some(piece) = maybe_piece {
88 record.copy_from_slice(&*piece.record);
91 true
92 } else {
93 false
94 }
95 })
96 {
97 let (source_segment_pieces, parity_segment_pieces) =
100 segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
101 let source = segment_data.iter_mut().zip(source_segment_pieces).map(
102 |(output_record, maybe_source_piece)| match maybe_source_piece {
103 Some(input_piece) => {
104 output_record.copy_from_slice(&*input_piece.record);
108 RecoveryShardState::Present(input_piece.record.as_flattened())
109 }
110 None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
111 },
112 );
113 let parity =
114 parity_segment_pieces
115 .iter()
116 .map(|maybe_source_piece| match maybe_source_piece {
117 Some(input_piece) => {
118 RecoveryShardState::Present(input_piece.record.as_flattened())
119 }
120 None => RecoveryShardState::MissingIgnore,
121 });
122 self.erasure_coding.recover(source, parity)?;
123 }
124
125 let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
126 .map_err(ReconstructorError::SegmentDecoding)?;
127
128 Ok(segment)
129 }
130
131 pub fn add_segment(
139 &mut self,
140 segment_pieces: &[Option<Piece>],
141 ) -> Result<ReconstructedContents, ReconstructorError> {
142 let segment = self.reconstruct_segment(segment_pieces)?;
143
144 let mut reconstructed_contents = ReconstructedContents::default();
145 let mut next_block_number = BlockNumber::ZERO;
146 let mut partial_block = self.partial_block.take().unwrap_or_default();
147
148 for segment_item in segment.items {
149 match segment_item {
150 SegmentItem::Padding => {
151 }
153 SegmentItem::Block { bytes, .. } => {
154 if !partial_block.is_empty() {
155 reconstructed_contents
156 .blocks
157 .push((next_block_number, mem::take(&mut partial_block)));
158
159 next_block_number += BlockNumber::ONE;
160 }
161
162 reconstructed_contents
163 .blocks
164 .push((next_block_number, Vec::from(bytes)));
165
166 next_block_number += BlockNumber::ONE;
167 }
168 SegmentItem::BlockStart { bytes, .. } => {
169 if !partial_block.is_empty() {
170 reconstructed_contents
171 .blocks
172 .push((next_block_number, mem::take(&mut partial_block)));
173
174 next_block_number += BlockNumber::ONE;
175 }
176
177 partial_block = Vec::from(bytes);
178 }
179 SegmentItem::BlockContinuation { bytes, .. } => {
180 if partial_block.is_empty() {
181 continue;
184 }
185
186 partial_block.extend_from_slice(&bytes);
187 }
188 SegmentItem::ParentSegmentHeader(segment_header) => {
189 let segment_index = segment_header.local_segment_index();
190
191 if let Some(last_segment_index) = self.last_segment_index
192 && last_segment_index != segment_index
193 {
194 return Err(ReconstructorError::IncorrectSegmentOrder {
195 expected_segment_index: last_segment_index + LocalSegmentIndex::ONE,
196 actual_segment_index: segment_index + LocalSegmentIndex::ONE,
197 });
198 }
199
200 self.last_segment_index
201 .replace(segment_index + LocalSegmentIndex::ONE);
202
203 let LastArchivedBlock {
204 number,
205 archived_progress,
206 } = segment_header.last_archived_block;
207
208 reconstructed_contents
209 .segment_header
210 .replace(segment_header);
211
212 match archived_progress.partial() {
213 None => {
214 reconstructed_contents
215 .blocks
216 .push((next_block_number, mem::take(&mut partial_block)));
217
218 next_block_number = number.as_inner() + BlockNumber::ONE;
219 }
220 Some(_bytes) => {
221 next_block_number = number.as_inner();
222
223 if partial_block.is_empty() {
224 next_block_number += BlockNumber::ONE;
226 }
227 }
228 }
229 }
230 }
231 }
232
233 if !partial_block.is_empty() {
234 self.partial_block.replace(partial_block);
235 }
236
237 if self.last_segment_index.is_none() {
238 self.last_segment_index.replace(LocalSegmentIndex::ZERO);
239 }
240
241 Ok(reconstructed_contents)
242 }
243}