ab_archiving/
reconstructor.rs1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5 ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex, RecordedHistorySegment,
6 SegmentHeader,
7};
8use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
9use alloc::vec::Vec;
10use core::mem;
11use parity_scale_codec::Decode;
12
13#[derive(Debug, Clone, PartialEq, thiserror::Error)]
15pub enum ReconstructorError {
16 #[error("Error during data shards reconstruction: {0}")]
18 DataShardsReconstruction(#[from] ErasureCodingError),
19 #[error("Not enough shards: {num_shards}")]
21 NotEnoughShards { num_shards: usize },
22 #[error("Error during segment decoding: {0}")]
24 SegmentDecoding(parity_scale_codec::Error),
25 #[error(
27 "Incorrect segment order, expected index {expected_segment_index}, actual \
28 {actual_segment_index}"
29 )]
30 IncorrectSegmentOrder {
31 expected_segment_index: LocalSegmentIndex,
32 actual_segment_index: LocalSegmentIndex,
33 },
34}
35
36#[derive(Debug, Default, Clone, Eq, PartialEq)]
39pub struct ReconstructedContents {
40 pub segment_header: Option<SegmentHeader>,
42 pub blocks: Vec<(BlockNumber, Vec<u8>)>,
44}
45
46#[derive(Debug, Clone)]
48pub struct Reconstructor {
49 erasure_coding: ErasureCoding,
51 last_segment_index: Option<LocalSegmentIndex>,
53 partial_block: Option<Vec<u8>>,
55}
56
57impl Reconstructor {
58 pub fn new(erasure_coding: ErasureCoding) -> Self {
60 Self {
61 erasure_coding,
62 last_segment_index: None,
63 partial_block: None,
64 }
65 }
66
67 pub fn reconstruct_segment(
73 &self,
74 segment_pieces: &[Option<Piece>],
75 ) -> Result<Segment, ReconstructorError> {
76 if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
77 return Err(ReconstructorError::NotEnoughShards {
78 num_shards: segment_pieces.len(),
79 });
80 }
81 let mut segment_data = RecordedHistorySegment::new_boxed();
82
83 if !segment_pieces
84 .iter()
85 .zip(segment_data.iter_mut())
86 .all(|(maybe_piece, record)| {
87 if let Some(piece) = maybe_piece {
88 record.copy_from_slice(piece.record().as_slice());
89 true
90 } else {
91 false
92 }
93 })
94 {
95 let (source_segment_pieces, parity_segment_pieces) =
99 segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
100 let source = segment_data.iter_mut().zip(source_segment_pieces).map(
101 |(output_record, maybe_source_piece)| match maybe_source_piece {
102 Some(input_piece) => {
103 output_record.copy_from_slice(input_piece.record().as_slice());
104 RecoveryShardState::Present(input_piece.record().as_flattened())
105 }
106 None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
107 },
108 );
109 let parity =
110 parity_segment_pieces
111 .iter()
112 .map(|maybe_source_piece| match maybe_source_piece {
113 Some(input_piece) => {
114 RecoveryShardState::Present(input_piece.record().as_flattened())
115 }
116 None => RecoveryShardState::MissingIgnore,
117 });
118 self.erasure_coding.recover(source, parity)?;
119 }
120
121 let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
122 .map_err(ReconstructorError::SegmentDecoding)?;
123
124 Ok(segment)
125 }
126
127 pub fn add_segment(
135 &mut self,
136 segment_pieces: &[Option<Piece>],
137 ) -> Result<ReconstructedContents, ReconstructorError> {
138 let segment = self.reconstruct_segment(segment_pieces)?;
139
140 let mut reconstructed_contents = ReconstructedContents::default();
141 let mut next_block_number = BlockNumber::ZERO;
142 let mut partial_block = self.partial_block.take().unwrap_or_default();
143
144 for segment_item in segment.items {
145 match segment_item {
146 SegmentItem::Padding => {
147 }
149 SegmentItem::Block { bytes, .. } => {
150 if !partial_block.is_empty() {
151 reconstructed_contents
152 .blocks
153 .push((next_block_number, mem::take(&mut partial_block)));
154
155 next_block_number += BlockNumber::ONE;
156 }
157
158 reconstructed_contents
159 .blocks
160 .push((next_block_number, Vec::from(bytes)));
161
162 next_block_number += BlockNumber::ONE;
163 }
164 SegmentItem::BlockStart { bytes, .. } => {
165 if !partial_block.is_empty() {
166 reconstructed_contents
167 .blocks
168 .push((next_block_number, mem::take(&mut partial_block)));
169
170 next_block_number += BlockNumber::ONE;
171 }
172
173 partial_block = Vec::from(bytes);
174 }
175 SegmentItem::BlockContinuation { bytes, .. } => {
176 if partial_block.is_empty() {
177 continue;
180 }
181
182 partial_block.extend_from_slice(&bytes);
183 }
184 SegmentItem::ParentSegmentHeader(segment_header) => {
185 let segment_index = segment_header.local_segment_index();
186
187 if let Some(last_segment_index) = self.last_segment_index
188 && last_segment_index != segment_index
189 {
190 return Err(ReconstructorError::IncorrectSegmentOrder {
191 expected_segment_index: last_segment_index + LocalSegmentIndex::ONE,
192 actual_segment_index: segment_index + LocalSegmentIndex::ONE,
193 });
194 }
195
196 self.last_segment_index
197 .replace(segment_index + LocalSegmentIndex::ONE);
198
199 let LastArchivedBlock {
200 number,
201 archived_progress,
202 } = segment_header.last_archived_block;
203
204 reconstructed_contents
205 .segment_header
206 .replace(segment_header);
207
208 match archived_progress.partial() {
209 None => {
210 reconstructed_contents
211 .blocks
212 .push((next_block_number, mem::take(&mut partial_block)));
213
214 next_block_number = number.as_inner() + BlockNumber::ONE;
215 }
216 Some(_bytes) => {
217 next_block_number = number.as_inner();
218
219 if partial_block.is_empty() {
220 next_block_number += BlockNumber::ONE;
222 }
223 }
224 }
225 }
226 }
227 }
228
229 if !partial_block.is_empty() {
230 self.partial_block.replace(partial_block);
231 }
232
233 if self.last_segment_index.is_none() {
234 self.last_segment_index.replace(LocalSegmentIndex::ZERO);
235 }
236
237 Ok(reconstructed_contents)
238 }
239}