Skip to main content

ab_archiving/
reconstructor.rs

1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5    ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex, RecordedHistorySegment,
6    SegmentHeader,
7};
8use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
9use alloc::vec::Vec;
10use core::mem;
11use parity_scale_codec::Decode;
12
13/// Reconstructor-related instantiation error
14#[derive(Debug, Clone, PartialEq, thiserror::Error)]
15pub enum ReconstructorError {
16    /// Error during data shards reconstruction
17    #[error("Error during data shards reconstruction: {0}")]
18    DataShardsReconstruction(#[from] ErasureCodingError),
19    /// Not enough shards
20    #[error("Not enough shards: {num_shards}")]
21    NotEnoughShards { num_shards: usize },
22    /// Segment size is not bigger than record size
23    #[error("Error during segment decoding: {0}")]
24    SegmentDecoding(parity_scale_codec::Error),
25    /// Incorrect segment order, each next segment must have monotonically increasing segment index
26    #[error(
27        "Incorrect segment order, expected index {expected_segment_index}, actual \
28        {actual_segment_index}"
29    )]
30    IncorrectSegmentOrder {
31        expected_segment_index: LocalSegmentIndex,
32        actual_segment_index: LocalSegmentIndex,
33    },
34}
35
36/// Data structure that contains information reconstructed from given segment (potentially using
37/// information from segments that were added previously)
38#[derive(Debug, Default, Clone, Eq, PartialEq)]
39pub struct ReconstructedContents {
40    /// Segment header stored in a segment
41    pub segment_header: Option<SegmentHeader>,
42    /// Reconstructed encoded blocks with their block numbers
43    pub blocks: Vec<(BlockNumber, Vec<u8>)>,
44}
45
46/// Reconstructor helps to retrieve blocks from archived pieces.
47#[derive(Debug, Clone)]
48pub struct Reconstructor {
49    /// Erasure coding data structure
50    erasure_coding: ErasureCoding,
51    /// Index of the last segment added to reconstructor
52    last_segment_index: Option<LocalSegmentIndex>,
53    /// Partially reconstructed block waiting for more data
54    partial_block: Option<Vec<u8>>,
55}
56
57impl Reconstructor {
58    /// Create a new instance
59    pub fn new(erasure_coding: ErasureCoding) -> Self {
60        Self {
61            erasure_coding,
62            last_segment_index: None,
63            partial_block: None,
64        }
65    }
66
67    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
68    /// required to be present, the rest will be recovered automatically due to use of erasure
69    /// coding if needed), reconstructs and returns the segment itself.
70    ///
71    /// Does not modify the internal state of the reconstructor.
72    pub fn reconstruct_segment(
73        &self,
74        segment_pieces: &[Option<Piece>],
75    ) -> Result<Segment, ReconstructorError> {
76        if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
77            return Err(ReconstructorError::NotEnoughShards {
78                num_shards: segment_pieces.len(),
79            });
80        }
81        let mut segment_data = RecordedHistorySegment::new_boxed();
82
83        if !segment_pieces
84            .iter()
85            .zip(segment_data.iter_mut())
86            .all(|(maybe_piece, record)| {
87                if let Some(piece) = maybe_piece {
88                    record.copy_from_slice(piece.record().as_slice());
89                    true
90                } else {
91                    false
92                }
93            })
94        {
95            // If not all data pieces are available, need to reconstruct data shards using erasure
96            // coding.
97
98            let (source_segment_pieces, parity_segment_pieces) =
99                segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
100            let source = segment_data.iter_mut().zip(source_segment_pieces).map(
101                |(output_record, maybe_source_piece)| match maybe_source_piece {
102                    Some(input_piece) => {
103                        output_record.copy_from_slice(input_piece.record().as_slice());
104                        RecoveryShardState::Present(input_piece.record().as_flattened())
105                    }
106                    None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
107                },
108            );
109            let parity =
110                parity_segment_pieces
111                    .iter()
112                    .map(|maybe_source_piece| match maybe_source_piece {
113                        Some(input_piece) => {
114                            RecoveryShardState::Present(input_piece.record().as_flattened())
115                        }
116                        None => RecoveryShardState::MissingIgnore,
117                    });
118            self.erasure_coding.recover(source, parity)?;
119        }
120
121        let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
122            .map_err(ReconstructorError::SegmentDecoding)?;
123
124        Ok(segment)
125    }
126
127    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
128    /// required to be present, the rest will be recovered automatically due to use of erasure
129    /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with
130    /// corresponding block numbers.
131    ///
132    /// It is possible to start with any segment, but when next segment is pushed, it needs to
133    /// follow the previous one or else error will be returned.
134    pub fn add_segment(
135        &mut self,
136        segment_pieces: &[Option<Piece>],
137    ) -> Result<ReconstructedContents, ReconstructorError> {
138        let segment = self.reconstruct_segment(segment_pieces)?;
139
140        let mut reconstructed_contents = ReconstructedContents::default();
141        let mut next_block_number = BlockNumber::ZERO;
142        let mut partial_block = self.partial_block.take().unwrap_or_default();
143
144        for segment_item in segment.items {
145            match segment_item {
146                SegmentItem::Padding => {
147                    // Doesn't contain anything
148                }
149                SegmentItem::Block { bytes, .. } => {
150                    if !partial_block.is_empty() {
151                        reconstructed_contents
152                            .blocks
153                            .push((next_block_number, mem::take(&mut partial_block)));
154
155                        next_block_number += BlockNumber::ONE;
156                    }
157
158                    reconstructed_contents
159                        .blocks
160                        .push((next_block_number, Vec::from(bytes)));
161
162                    next_block_number += BlockNumber::ONE;
163                }
164                SegmentItem::BlockStart { bytes, .. } => {
165                    if !partial_block.is_empty() {
166                        reconstructed_contents
167                            .blocks
168                            .push((next_block_number, mem::take(&mut partial_block)));
169
170                        next_block_number += BlockNumber::ONE;
171                    }
172
173                    partial_block = Vec::from(bytes);
174                }
175                SegmentItem::BlockContinuation { bytes, .. } => {
176                    if partial_block.is_empty() {
177                        // This is continuation from previous segment, we don't have the beginning
178                        // of the block to continue.
179                        continue;
180                    }
181
182                    partial_block.extend_from_slice(&bytes);
183                }
184                SegmentItem::ParentSegmentHeader(segment_header) => {
185                    let segment_index = segment_header.local_segment_index();
186
187                    if let Some(last_segment_index) = self.last_segment_index
188                        && last_segment_index != segment_index
189                    {
190                        return Err(ReconstructorError::IncorrectSegmentOrder {
191                            expected_segment_index: last_segment_index + LocalSegmentIndex::ONE,
192                            actual_segment_index: segment_index + LocalSegmentIndex::ONE,
193                        });
194                    }
195
196                    self.last_segment_index
197                        .replace(segment_index + LocalSegmentIndex::ONE);
198
199                    let LastArchivedBlock {
200                        number,
201                        archived_progress,
202                    } = segment_header.last_archived_block;
203
204                    reconstructed_contents
205                        .segment_header
206                        .replace(segment_header);
207
208                    match archived_progress.partial() {
209                        None => {
210                            reconstructed_contents
211                                .blocks
212                                .push((next_block_number, mem::take(&mut partial_block)));
213
214                            next_block_number = number.as_inner() + BlockNumber::ONE;
215                        }
216                        Some(_bytes) => {
217                            next_block_number = number.as_inner();
218
219                            if partial_block.is_empty() {
220                                // Will not be able to recover full block, bump right away.
221                                next_block_number += BlockNumber::ONE;
222                            }
223                        }
224                    }
225                }
226            }
227        }
228
229        if !partial_block.is_empty() {
230            self.partial_block.replace(partial_block);
231        }
232
233        if self.last_segment_index.is_none() {
234            self.last_segment_index.replace(LocalSegmentIndex::ZERO);
235        }
236
237        Ok(reconstructed_contents)
238    }
239}