//! Piece reconstructor (`ab_archiving/piece_reconstructor.rs`): recovers missing pieces of an
//! archived history segment from a partial set of pieces using erasure coding.
1use ab_core_primitives::pieces::{
2    Piece, PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof, SegmentProof,
3};
4use ab_core_primitives::segments::{
5    ArchivedHistorySegment, LocalSegmentIndex, RecordedHistorySegment, SegmentPosition,
6    SegmentRoot, SuperSegmentIndex,
7};
8use ab_core_primitives::shard::ShardIndex;
9use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
10use ab_merkle_tree::balanced::BalancedMerkleTree;
11use alloc::vec::Vec;
12#[cfg(feature = "parallel")]
13use rayon::prelude::*;
14
/// Errors that can occur during piece reconstruction
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ReconstructorError {
    /// Error during data shards reconstruction (propagated from erasure coding)
    #[error("Error during data shards reconstruction: {0}")]
    DataShardsReconstruction(#[from] ErasureCodingError),
    /// Not enough shards were provided to attempt reconstruction
    #[error("Not enough shards: {num_shards}")]
    NotEnoughShards {
        // Number of shard entries that were actually provided by the caller
        num_shards: usize,
    },
}
25
/// Piece header fields that are identical for every piece of a segment.
///
/// They are captured from any piece present in the input and later written into the header of
/// every reconstructed piece of the segment.
struct SharedPieceDetails {
    // Index of the shard this segment belongs to
    shard_index: ShardIndex,
    // Segment index local to the shard
    local_segment_index: LocalSegmentIndex,
    // Index of the super segment containing this segment
    super_segment_index: SuperSegmentIndex,
    // Position of the segment — presumably within the super segment; confirm against
    // `SegmentPosition` docs
    segment_position: SegmentPosition,
    // Merkle root of the segment
    segment_root: SegmentRoot,
    // Proof of the segment — presumably against the super segment root; confirm upstream
    segment_proof: SegmentProof,
}
34
/// Piece reconstructor helps to reconstruct missing pieces.
///
/// Wraps an [`ErasureCoding`] instance used both to recover missing records and to re-derive
/// parity chunks for record roots.
#[derive(Debug, Clone)]
pub struct PiecesReconstructor {
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
}
41
42impl PiecesReconstructor {
43    /// Create a new instance
44    pub fn new(erasure_coding: ErasureCoding) -> Self {
45        Self { erasure_coding }
46    }
47
48    fn reconstruct_shards(
49        &self,
50        input_pieces: &[Option<Piece>],
51    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
52        if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
53            return Err(ReconstructorError::NotEnoughShards {
54                num_shards: input_pieces.len(),
55            });
56        }
57        let mut reconstructed_pieces = ArchivedHistorySegment::default();
58
59        // TODO: Fix up piece metadata
60        let mut shared_piece_details = None;
61        {
62            let (source_input_pieces, parity_input_pieces) =
63                input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
64            let (source_reconstructed_pieces, parity_reconstructed_pieces) =
65                reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
66
67            let source = source_input_pieces
68                .iter()
69                .zip(source_reconstructed_pieces)
70                .map(
71                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
72                        Some(input_piece) => {
73                            if shared_piece_details.is_none() {
74                                shared_piece_details.replace(SharedPieceDetails {
75                                    shard_index: input_piece.header.shard_index.as_inner(),
76                                    local_segment_index: input_piece
77                                        .header
78                                        .local_segment_index
79                                        .as_inner(),
80                                    super_segment_index: input_piece
81                                        .header
82                                        .super_segment_index
83                                        .as_inner(),
84                                    segment_position: input_piece
85                                        .header
86                                        .segment_position
87                                        .as_inner(),
88                                    segment_root: input_piece.header.segment_root,
89                                    segment_proof: input_piece.header.segment_proof,
90                                });
91                            }
92                            // Fancy way to insert value to avoid going through stack (if naive
93                            // dereferencing is used) and potentially causing stack overflow as the
94                            // result
95                            output_piece.record.copy_from_slice(&*input_piece.record);
96                            RecoveryShardState::Present(input_piece.record.as_flattened())
97                        }
98                        None => RecoveryShardState::MissingRecover(
99                            output_piece.record.as_flattened_mut(),
100                        ),
101                    },
102                );
103            let parity = parity_input_pieces
104                .iter()
105                .zip(parity_reconstructed_pieces.iter_mut())
106                .map(
107                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
108                        Some(input_piece) => {
109                            // Fancy way to insert value to avoid going through stack (if naive
110                            // dereferencing is used) and potentially causing stack overflow as the
111                            // result
112                            output_piece.record.copy_from_slice(&*input_piece.record);
113                            RecoveryShardState::Present(input_piece.record.as_flattened())
114                        }
115                        None => RecoveryShardState::MissingRecover(
116                            output_piece.record.as_flattened_mut(),
117                        ),
118                    },
119                );
120            self.erasure_coding.recover(source, parity)?;
121        }
122        let SharedPieceDetails {
123            shard_index,
124            local_segment_index,
125            super_segment_index,
126            segment_position,
127            segment_root,
128            segment_proof,
129        } = shared_piece_details.expect(
130            "Sucessful recovery means there was at least one piece to fill this Option; qed",
131        );
132
133        let record_roots = {
134            #[cfg(not(feature = "parallel"))]
135            let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
136            #[cfg(feature = "parallel")]
137            let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);
138
139            iter.map(|(piece, maybe_input_piece)| {
140                let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
141                {
142                    (
143                        *input_piece.record_root(),
144                        *input_piece.header.parity_chunks_root,
145                    )
146                } else {
147                    // TODO: Reuse allocations between iterations
148                    let [source_chunks_root, parity_chunks_root] = {
149                        let mut parity_chunks = Record::new_boxed();
150
151                        self.erasure_coding
152                            .extend(piece.record.iter(), parity_chunks.iter_mut())?;
153
154                        let source_chunks_root = *piece.record.source_chunks_root();
155                        let parity_chunks_root =
156                            BalancedMerkleTree::compute_root_only(&parity_chunks);
157
158                        [source_chunks_root, parity_chunks_root]
159                    };
160
161                    let record_root =
162                        BalancedMerkleTree::new(&[source_chunks_root, parity_chunks_root]).root();
163
164                    (record_root, parity_chunks_root)
165                };
166
167                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);
168
169                Ok::<_, ReconstructorError>(record_root)
170            })
171            .collect::<Result<Vec<_>, _>>()?
172        };
173
174        let segment_merkle_tree =
175            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
176                record_roots
177                    .as_slice()
178                    .try_into()
179                    .expect("Statically guaranteed to have correct length; qed"),
180            );
181
182        reconstructed_pieces
183            .iter_mut()
184            .zip(segment_merkle_tree.all_proofs())
185            .for_each(|(piece, record_proof)| {
186                piece.header = PieceHeader {
187                    shard_index: shard_index.into(),
188                    local_segment_index: local_segment_index.into(),
189                    super_segment_index: super_segment_index.into(),
190                    segment_position: segment_position.into(),
191                    segment_root,
192                    segment_proof,
193                    parity_chunks_root: piece.header.parity_chunks_root,
194                    record_proof: RecordProof::from(record_proof),
195                };
196            });
197
198        Ok(reconstructed_pieces)
199    }
200
201    /// Returns all the pieces for a segment using a given set of pieces of a segment of the
202    /// archived history.
203    ///
204    /// Any half of all pieces are required to be present, the rest will be recovered automatically
205    /// due to use of erasure coding if needed.
206    pub fn reconstruct_segment(
207        &self,
208        segment_pieces: &[Option<Piece>],
209    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
210        let pieces = self.reconstruct_shards(segment_pieces)?;
211
212        Ok(pieces.to_shared())
213    }
214
215    /// Returns the missing piece for a segment using given set of pieces of a segment of the
216    /// archived history (any half of all pieces are required to be present).
217    pub fn reconstruct_piece(
218        &self,
219        segment_pieces: &[Option<Piece>],
220        piece_position: PiecePosition,
221    ) -> Result<Piece, ReconstructorError> {
222        // TODO: Early exit if position already exists and doesn't need reconstruction
223        // TODO: It is now inefficient to recover all shards if only one piece is needed, especially
224        //  source piece
225        let pieces = self.reconstruct_shards(segment_pieces)?;
226
227        let piece = Piece::from(&pieces[piece_position]);
228
229        Ok(piece.to_shared())
230    }
231}