ab_archiving/
reconstructor.rs

1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5    ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, SegmentHeader, SegmentIndex,
6};
7use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
8use alloc::vec::Vec;
9use core::mem;
10use parity_scale_codec::Decode;
11
12/// Reconstructor-related instantiation error
13#[derive(Debug, Clone, PartialEq, thiserror::Error)]
14pub enum ReconstructorError {
15    /// Error during data shards reconstruction
16    #[error("Error during data shards reconstruction: {0}")]
17    DataShardsReconstruction(#[from] ErasureCodingError),
18    /// Not enough shards
19    #[error("Not enough shards: {num_shards}")]
20    NotEnoughShards { num_shards: usize },
21    /// Segment size is not bigger than record size
22    #[error("Error during segment decoding: {0}")]
23    SegmentDecoding(parity_scale_codec::Error),
24    /// Incorrect segment order, each next segment must have monotonically increasing segment index
25    #[error(
26        "Incorrect segment order, expected index {expected_segment_index}, actual \
27        {actual_segment_index}"
28    )]
29    IncorrectSegmentOrder {
30        expected_segment_index: SegmentIndex,
31        actual_segment_index: SegmentIndex,
32    },
33}
34
35/// Data structure that contains information reconstructed from given segment (potentially using
36/// information from segments that were added previously)
37#[derive(Debug, Default, Clone, Eq, PartialEq)]
38pub struct ReconstructedContents {
39    /// Segment header stored in a segment
40    pub segment_header: Option<SegmentHeader>,
41    /// Reconstructed encoded blocks with their block numbers
42    pub blocks: Vec<(BlockNumber, Vec<u8>)>,
43}
44
45/// Reconstructor helps to retrieve blocks from archived pieces.
46#[derive(Debug, Clone)]
47pub struct Reconstructor {
48    /// Erasure coding data structure
49    erasure_coding: ErasureCoding,
50    /// Index of last segment added to reconstructor
51    last_segment_index: Option<SegmentIndex>,
52    /// Partially reconstructed block waiting for more data
53    partial_block: Option<Vec<u8>>,
54}
55
56impl Reconstructor {
57    /// Create a new instance
58    pub fn new(erasure_coding: ErasureCoding) -> Self {
59        Self {
60            erasure_coding,
61            last_segment_index: None,
62            partial_block: None,
63        }
64    }
65
66    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
67    /// required to be present, the rest will be recovered automatically due to use of erasure
68    /// coding if needed), reconstructs and returns the segment itself.
69    ///
70    /// Does not modify the internal state of the reconstructor.
71    pub fn reconstruct_segment(
72        &self,
73        segment_pieces: &[Option<Piece>],
74    ) -> Result<Segment, ReconstructorError> {
75        if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
76            return Err(ReconstructorError::NotEnoughShards {
77                num_shards: segment_pieces.len(),
78            });
79        }
80        let mut segment_data = RecordedHistorySegment::new_boxed();
81
82        if !segment_pieces
83            .iter()
84            .zip(segment_data.iter_mut())
85            .all(|(maybe_piece, record)| {
86                if let Some(piece) = maybe_piece {
87                    record.copy_from_slice(piece.record().as_slice());
88                    true
89                } else {
90                    false
91                }
92            })
93        {
94            // If not all data pieces are available, need to reconstruct data shards using erasure
95            // coding.
96
97            let (source_segment_pieces, parity_segment_pieces) =
98                segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
99            let source = segment_data.iter_mut().zip(source_segment_pieces).map(
100                |(output_record, maybe_source_piece)| match maybe_source_piece {
101                    Some(input_piece) => {
102                        output_record.copy_from_slice(input_piece.record().as_slice());
103                        RecoveryShardState::Present(input_piece.record().as_flattened())
104                    }
105                    None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
106                },
107            );
108            let parity =
109                parity_segment_pieces
110                    .iter()
111                    .map(|maybe_source_piece| match maybe_source_piece {
112                        Some(input_piece) => {
113                            RecoveryShardState::Present(input_piece.record().as_flattened())
114                        }
115                        None => RecoveryShardState::MissingIgnore,
116                    });
117            self.erasure_coding.recover(source, parity)?;
118        }
119
120        let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
121            .map_err(ReconstructorError::SegmentDecoding)?;
122
123        Ok(segment)
124    }
125
126    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
127    /// required to be present, the rest will be recovered automatically due to use of erasure
128    /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with
129    /// corresponding block numbers.
130    ///
131    /// It is possible to start with any segment, but when next segment is pushed, it needs to
132    /// follow the previous one or else error will be returned.
133    pub fn add_segment(
134        &mut self,
135        segment_pieces: &[Option<Piece>],
136    ) -> Result<ReconstructedContents, ReconstructorError> {
137        let segment = self.reconstruct_segment(segment_pieces)?;
138
139        let mut reconstructed_contents = ReconstructedContents::default();
140        let mut next_block_number = BlockNumber::ZERO;
141        let mut partial_block = self.partial_block.take().unwrap_or_default();
142
143        for segment_item in segment.items {
144            match segment_item {
145                SegmentItem::Padding => {
146                    // Doesn't contain anything
147                }
148                SegmentItem::Block { bytes, .. } => {
149                    if !partial_block.is_empty() {
150                        reconstructed_contents
151                            .blocks
152                            .push((next_block_number, mem::take(&mut partial_block)));
153
154                        next_block_number += BlockNumber::ONE;
155                    }
156
157                    reconstructed_contents
158                        .blocks
159                        .push((next_block_number, Vec::from(bytes)));
160
161                    next_block_number += BlockNumber::ONE;
162                }
163                SegmentItem::BlockStart { bytes, .. } => {
164                    if !partial_block.is_empty() {
165                        reconstructed_contents
166                            .blocks
167                            .push((next_block_number, mem::take(&mut partial_block)));
168
169                        next_block_number += BlockNumber::ONE;
170                    }
171
172                    partial_block = Vec::from(bytes);
173                }
174                SegmentItem::BlockContinuation { bytes, .. } => {
175                    if partial_block.is_empty() {
176                        // This is continuation from previous segment, we don't have the beginning
177                        // of the block to continue.
178                        continue;
179                    }
180
181                    partial_block.extend_from_slice(&bytes);
182                }
183                SegmentItem::ParentSegmentHeader(segment_header) => {
184                    let segment_index = segment_header.segment_index();
185
186                    if let Some(last_segment_index) = self.last_segment_index {
187                        if last_segment_index != segment_index {
188                            return Err(ReconstructorError::IncorrectSegmentOrder {
189                                expected_segment_index: last_segment_index + SegmentIndex::ONE,
190                                actual_segment_index: segment_index + SegmentIndex::ONE,
191                            });
192                        }
193                    }
194
195                    self.last_segment_index
196                        .replace(segment_index + SegmentIndex::ONE);
197
198                    let LastArchivedBlock {
199                        number,
200                        archived_progress,
201                    } = segment_header.last_archived_block;
202
203                    reconstructed_contents
204                        .segment_header
205                        .replace(segment_header);
206
207                    match archived_progress.partial() {
208                        None => {
209                            reconstructed_contents
210                                .blocks
211                                .push((next_block_number, mem::take(&mut partial_block)));
212
213                            next_block_number = number.as_inner() + BlockNumber::ONE;
214                        }
215                        Some(_bytes) => {
216                            next_block_number = number.as_inner();
217
218                            if partial_block.is_empty() {
219                                // Will not be able to recover full block, bump right away.
220                                next_block_number += BlockNumber::ONE;
221                            }
222                        }
223                    }
224                }
225            }
226        }
227
228        if !partial_block.is_empty() {
229            self.partial_block.replace(partial_block);
230        }
231
232        if self.last_segment_index.is_none() {
233            self.last_segment_index.replace(SegmentIndex::ZERO);
234        }
235
236        Ok(reconstructed_contents)
237    }
238}