Skip to main content

ab_archiving/
reconstructor.rs

1use crate::archiver::{Segment, SegmentItem};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::pieces::Piece;
4use ab_core_primitives::segments::{
5    ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex, RecordedHistorySegment,
6    SegmentHeader,
7};
8use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
9use alloc::vec::Vec;
10use core::mem;
11use parity_scale_codec::Decode;
12
13/// Reconstructor-related instantiation error
14#[derive(Debug, Clone, PartialEq, thiserror::Error)]
15pub enum ReconstructorError {
16    /// Error during data shards reconstruction
17    #[error("Error during data shards reconstruction: {0}")]
18    DataShardsReconstruction(#[from] ErasureCodingError),
19    /// Not enough shards
20    #[error("Not enough shards: {num_shards}")]
21    NotEnoughShards { num_shards: usize },
22    /// Segment size is not bigger than record size
23    #[error("Error during segment decoding: {0}")]
24    SegmentDecoding(parity_scale_codec::Error),
25    /// Incorrect segment order, each next segment must have monotonically increasing segment index
26    #[error(
27        "Incorrect segment order, expected index {expected_segment_index}, actual \
28        {actual_segment_index}"
29    )]
30    IncorrectSegmentOrder {
31        expected_segment_index: LocalSegmentIndex,
32        actual_segment_index: LocalSegmentIndex,
33    },
34}
35
36/// Data structure that contains information reconstructed from given segment (potentially using
37/// information from segments that were added previously)
38#[derive(Debug, Default, Clone, Eq, PartialEq)]
39pub struct ReconstructedContents {
40    /// Segment header stored in a segment
41    pub segment_header: Option<SegmentHeader>,
42    /// Reconstructed encoded blocks with their block numbers
43    pub blocks: Vec<(BlockNumber, Vec<u8>)>,
44}
45
46/// Reconstructor helps to retrieve blocks from archived pieces.
47#[derive(Debug, Clone)]
48pub struct Reconstructor {
49    /// Erasure coding data structure
50    erasure_coding: ErasureCoding,
51    /// Index of the last segment added to reconstructor
52    last_segment_index: Option<LocalSegmentIndex>,
53    /// Partially reconstructed block waiting for more data
54    partial_block: Option<Vec<u8>>,
55}
56
57impl Reconstructor {
58    /// Create a new instance
59    pub fn new(erasure_coding: ErasureCoding) -> Self {
60        Self {
61            erasure_coding,
62            last_segment_index: None,
63            partial_block: None,
64        }
65    }
66
67    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
68    /// required to be present, the rest will be recovered automatically due to use of erasure
69    /// coding if needed), reconstructs and returns the segment itself.
70    ///
71    /// Does not modify the internal state of the reconstructor.
72    pub fn reconstruct_segment(
73        &self,
74        segment_pieces: &[Option<Piece>],
75    ) -> Result<Segment, ReconstructorError> {
76        if segment_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
77            return Err(ReconstructorError::NotEnoughShards {
78                num_shards: segment_pieces.len(),
79            });
80        }
81        let mut segment_data = RecordedHistorySegment::new_boxed();
82
83        if !segment_pieces
84            .iter()
85            .zip(segment_data.iter_mut())
86            .all(|(maybe_piece, record)| {
87                if let Some(piece) = maybe_piece {
88                    // Fancy way to insert value to avoid going through stack (if naive
89                    // dereferencing is used) and potentially causing stack overflow as the result
90                    record.copy_from_slice(&*piece.record);
91                    true
92                } else {
93                    false
94                }
95            })
96        {
97            // If not all data pieces are available, need to reconstruct data shards using erasure
98            // coding
99            let (source_segment_pieces, parity_segment_pieces) =
100                segment_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
101            let source = segment_data.iter_mut().zip(source_segment_pieces).map(
102                |(output_record, maybe_source_piece)| match maybe_source_piece {
103                    Some(input_piece) => {
104                        // Fancy way to insert value to avoid going through stack (if naive
105                        // dereferencing is used) and potentially causing stack overflow as the
106                        // result
107                        output_record.copy_from_slice(&*input_piece.record);
108                        RecoveryShardState::Present(input_piece.record.as_flattened())
109                    }
110                    None => RecoveryShardState::MissingRecover(output_record.as_flattened_mut()),
111                },
112            );
113            let parity =
114                parity_segment_pieces
115                    .iter()
116                    .map(|maybe_source_piece| match maybe_source_piece {
117                        Some(input_piece) => {
118                            RecoveryShardState::Present(input_piece.record.as_flattened())
119                        }
120                        None => RecoveryShardState::MissingIgnore,
121                    });
122            self.erasure_coding.recover(source, parity)?;
123        }
124
125        let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref()))
126            .map_err(ReconstructorError::SegmentDecoding)?;
127
128        Ok(segment)
129    }
130
131    /// Given a set of pieces of a segment of the archived history (any half of all pieces are
132    /// required to be present, the rest will be recovered automatically due to use of erasure
133    /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with
134    /// corresponding block numbers.
135    ///
136    /// It is possible to start with any segment, but when next segment is pushed, it needs to
137    /// follow the previous one or else error will be returned.
138    pub fn add_segment(
139        &mut self,
140        segment_pieces: &[Option<Piece>],
141    ) -> Result<ReconstructedContents, ReconstructorError> {
142        let segment = self.reconstruct_segment(segment_pieces)?;
143
144        let mut reconstructed_contents = ReconstructedContents::default();
145        let mut next_block_number = BlockNumber::ZERO;
146        let mut partial_block = self.partial_block.take().unwrap_or_default();
147
148        for segment_item in segment.items {
149            match segment_item {
150                SegmentItem::Padding => {
151                    // Doesn't contain anything
152                }
153                SegmentItem::Block { bytes, .. } => {
154                    if !partial_block.is_empty() {
155                        reconstructed_contents
156                            .blocks
157                            .push((next_block_number, mem::take(&mut partial_block)));
158
159                        next_block_number += BlockNumber::ONE;
160                    }
161
162                    reconstructed_contents
163                        .blocks
164                        .push((next_block_number, Vec::from(bytes)));
165
166                    next_block_number += BlockNumber::ONE;
167                }
168                SegmentItem::BlockStart { bytes, .. } => {
169                    if !partial_block.is_empty() {
170                        reconstructed_contents
171                            .blocks
172                            .push((next_block_number, mem::take(&mut partial_block)));
173
174                        next_block_number += BlockNumber::ONE;
175                    }
176
177                    partial_block = Vec::from(bytes);
178                }
179                SegmentItem::BlockContinuation { bytes, .. } => {
180                    if partial_block.is_empty() {
181                        // This is continuation from previous segment, we don't have the beginning
182                        // of the block to continue.
183                        continue;
184                    }
185
186                    partial_block.extend_from_slice(&bytes);
187                }
188                SegmentItem::ParentSegmentHeader(segment_header) => {
189                    let segment_index = segment_header.local_segment_index();
190
191                    if let Some(last_segment_index) = self.last_segment_index
192                        && last_segment_index != segment_index
193                    {
194                        return Err(ReconstructorError::IncorrectSegmentOrder {
195                            expected_segment_index: last_segment_index + LocalSegmentIndex::ONE,
196                            actual_segment_index: segment_index + LocalSegmentIndex::ONE,
197                        });
198                    }
199
200                    self.last_segment_index
201                        .replace(segment_index + LocalSegmentIndex::ONE);
202
203                    let LastArchivedBlock {
204                        number,
205                        archived_progress,
206                    } = segment_header.last_archived_block;
207
208                    reconstructed_contents
209                        .segment_header
210                        .replace(segment_header);
211
212                    match archived_progress.partial() {
213                        None => {
214                            reconstructed_contents
215                                .blocks
216                                .push((next_block_number, mem::take(&mut partial_block)));
217
218                            next_block_number = number.as_inner() + BlockNumber::ONE;
219                        }
220                        Some(_bytes) => {
221                            next_block_number = number.as_inner();
222
223                            if partial_block.is_empty() {
224                                // Will not be able to recover full block, bump right away.
225                                next_block_number += BlockNumber::ONE;
226                            }
227                        }
228                    }
229                }
230            }
231        }
232
233        if !partial_block.is_empty() {
234            self.partial_block.replace(partial_block);
235        }
236
237        if self.last_segment_index.is_none() {
238            self.last_segment_index.replace(LocalSegmentIndex::ZERO);
239        }
240
241        Ok(reconstructed_contents)
242    }
243}