ab_archiving/
reconstructor.rs1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5 ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, SegmentHeader, SegmentIndex,
6};
7use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
8use alloc::vec::Vec;
9use core::mem;
10use parity_scale_codec::Decode;
11
12#[derive(Debug, Clone, PartialEq, thiserror::Error)]
14pub enum ReconstructorError {
15 #[error("Error during data shards reconstruction: {0}")]
17 DataShardsReconstruction(#[from] ErasureCodingError),
18 #[error("Not enough shards: {num_shards}")]
20 NotEnoughShards { num_shards: usize },
21 #[error("Error during segment decoding: {0}")]
23 SegmentDecoding(parity_scale_codec::Error),
24 #[error(
26 "Incorrect segment order, expected index {expected_segment_index}, actual \
27 {actual_segment_index}"
28 )]
29 IncorrectSegmentOrder {
30 expected_segment_index: SegmentIndex,
31 actual_segment_index: SegmentIndex,
32 },
33}
34
35#[derive(Debug, Default, Clone, Eq, PartialEq)]
38pub struct ReconstructedContents {
39 pub segment_header: Option<SegmentHeader>,
41 pub blocks: Vec<(BlockNumber, Vec<u8>)>,
43}
44
45#[derive(Debug, Clone)]
47pub struct Reconstructor {
48 erasure_coding: ErasureCoding,
50 last_segment_index: Option<SegmentIndex>,
52 partial_block: Option<Vec<u8>>,
54}
55
56impl Reconstructor {
57 pub fn new(erasure_coding: ErasureCoding) -> Self {
59 Self {
60 erasure_coding,
61 last_segment_index: None,
62 partial_block: None,
63 }
64 }
65
66 pub fn reconstruct_segment(
72 &self,
73 segment_pieces: &[Option<Piece>],
74 ) -> Result<Segment, ReconstructorError> {
75 if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
76 return Err(ReconstructorError::NotEnoughShards {
77 num_shards: segment_pieces.len(),
78 });
79 }
80 let mut segment_data = RecordedHistorySegment::new_boxed();
81
82 if !segment_pieces
83 .iter()
84 .zip(segment_data.iter_mut())
85 .all(|(maybe_piece, record)| {
86 if let Some(piece) = maybe_piece {
87 record.copy_from_slice(piece.record().as_slice());
88 true
89 } else {
90 false
91 }
92 })
93 {
94 let (source_segment_pieces, parity_segment_pieces) =
98 segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
99 let source = segment_data.iter_mut().zip(source_segment_pieces).map(
100 |(output_record, maybe_source_piece)| match maybe_source_piece {
101 Some(input_piece) => {
102 output_record.copy_from_slice(input_piece.record().as_slice());
103 RecoveryShardState::Present(input_piece.record().as_flattened())
104 }
105 None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
106 },
107 );
108 let parity =
109 parity_segment_pieces
110 .iter()
111 .map(|maybe_source_piece| match maybe_source_piece {
112 Some(input_piece) => {
113 RecoveryShardState::Present(input_piece.record().as_flattened())
114 }
115 None => RecoveryShardState::MissingIgnore,
116 });
117 self.erasure_coding.recover(source, parity)?;
118 }
119
120 let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
121 .map_err(ReconstructorError::SegmentDecoding)?;
122
123 Ok(segment)
124 }
125
126 pub fn add_segment(
134 &mut self,
135 segment_pieces: &[Option<Piece>],
136 ) -> Result<ReconstructedContents, ReconstructorError> {
137 let segment = self.reconstruct_segment(segment_pieces)?;
138
139 let mut reconstructed_contents = ReconstructedContents::default();
140 let mut next_block_number = BlockNumber::ZERO;
141 let mut partial_block = self.partial_block.take().unwrap_or_default();
142
143 for segment_item in segment.items {
144 match segment_item {
145 SegmentItem::Padding => {
146 }
148 SegmentItem::Block { bytes, .. } => {
149 if !partial_block.is_empty() {
150 reconstructed_contents
151 .blocks
152 .push((next_block_number, mem::take(&mut partial_block)));
153
154 next_block_number += BlockNumber::ONE;
155 }
156
157 reconstructed_contents
158 .blocks
159 .push((next_block_number, Vec::from(bytes)));
160
161 next_block_number += BlockNumber::ONE;
162 }
163 SegmentItem::BlockStart { bytes, .. } => {
164 if !partial_block.is_empty() {
165 reconstructed_contents
166 .blocks
167 .push((next_block_number, mem::take(&mut partial_block)));
168
169 next_block_number += BlockNumber::ONE;
170 }
171
172 partial_block = Vec::from(bytes);
173 }
174 SegmentItem::BlockContinuation { bytes, .. } => {
175 if partial_block.is_empty() {
176 continue;
179 }
180
181 partial_block.extend_from_slice(&bytes);
182 }
183 SegmentItem::ParentSegmentHeader(segment_header) => {
184 let segment_index = segment_header.segment_index();
185
186 if let Some(last_segment_index) = self.last_segment_index {
187 if last_segment_index != segment_index {
188 return Err(ReconstructorError::IncorrectSegmentOrder {
189 expected_segment_index: last_segment_index + SegmentIndex::ONE,
190 actual_segment_index: segment_index + SegmentIndex::ONE,
191 });
192 }
193 }
194
195 self.last_segment_index
196 .replace(segment_index + SegmentIndex::ONE);
197
198 let LastArchivedBlock {
199 number,
200 archived_progress,
201 } = segment_header.last_archived_block;
202
203 reconstructed_contents
204 .segment_header
205 .replace(segment_header);
206
207 match archived_progress.partial() {
208 None => {
209 reconstructed_contents
210 .blocks
211 .push((next_block_number, mem::take(&mut partial_block)));
212
213 next_block_number = number.as_inner() + BlockNumber::ONE;
214 }
215 Some(_bytes) => {
216 next_block_number = number.as_inner();
217
218 if partial_block.is_empty() {
219 next_block_number += BlockNumber::ONE;
221 }
222 }
223 }
224 }
225 }
226 }
227
228 if !partial_block.is_empty() {
229 self.partial_block.replace(partial_block);
230 }
231
232 if self.last_segment_index.is_none() {
233 self.last_segment_index.replace(SegmentIndex::ZERO);
234 }
235
236 Ok(reconstructed_contents)
237 }
238}