ab_archiving/piece_reconstructor.rs

use ab_core_primitives::pieces::{Piece, Record};
use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_merkle_tree::balanced_hashed::BalancedHashedMerkleTree;
use alloc::vec::Vec;
#[cfg(feature = "parallel")]
use rayon::prelude::*;

/// Reconstruction-related error
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ReconstructorError {
    /// Error during data shards reconstruction
    #[error("Error during data shards reconstruction: {0}")]
    DataShardsReconstruction(#[from] ErasureCodingError),
    /// Not enough shards
    #[error("Not enough shards: {num_shards}")]
    NotEnoughShards {
        /// Number of shards provided
        num_shards: usize,
    },
    /// Incorrect piece position provided
    #[error("Incorrect piece position provided")]
    IncorrectPiecePosition,
}

/// Reconstructor that helps to recover missing pieces of an archived history segment
#[derive(Debug, Clone)]
pub struct PiecesReconstructor {
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
}

impl PiecesReconstructor {
    /// Create a new instance
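    ///
    /// A minimal usage sketch (hedged: it assumes an [`ErasureCoding`] value was already
    /// constructed by the caller, since its constructor lives in the `ab_erasure_coding`
    /// crate):
    ///
    /// ```ignore
    /// let reconstructor = PiecesReconstructor::new(erasure_coding);
    /// ```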
    pub fn new(erasure_coding: ErasureCoding) -> Self {
        Self { erasure_coding }
    }

    fn reconstruct_shards(
        &self,
        input_pieces: &[Option<Piece>],
    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
        if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
            return Err(ReconstructorError::NotEnoughShards {
                num_shards: input_pieces.len(),
            });
        }
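        // Allocate a default segment to serve as the output buffer: present records are
        // copied into it and missing ones are recovered in place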
        let mut reconstructed_pieces = ArchivedHistorySegment::default();

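        // Split both inputs and outputs into source and parity halves, mapping each present
        // shard to its record bytes and each missing one to a mutable buffer to recover into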
        {
            let (source_input_pieces, parity_input_pieces) =
                input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
            let (source_reconstructed_pieces, parity_reconstructed_pieces) =
                reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            let source = source_input_pieces
                .iter()
                .zip(source_reconstructed_pieces)
                .map(
                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
                        Some(input_piece) => {
                            output_piece
                                .record_mut()
                                .copy_from_slice(input_piece.record().as_slice());
                            RecoveryShardState::Present(input_piece.record().as_flattened())
                        }
                        None => RecoveryShardState::MissingRecover(
                            output_piece.record_mut().as_flattened_mut(),
                        ),
                    },
                );
            let parity = parity_input_pieces
                .iter()
                .zip(parity_reconstructed_pieces.iter_mut())
                .map(
                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
                        Some(input_piece) => {
                            output_piece
                                .record_mut()
                                .copy_from_slice(input_piece.record().as_slice());
                            RecoveryShardState::Present(input_piece.record().as_flattened())
                        }
                        None => RecoveryShardState::MissingRecover(
                            output_piece.record_mut().as_flattened_mut(),
                        ),
                    },
                );
            self.erasure_coding.recover(source, parity)?;
        }

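        // Recompute the record root and parity chunks root for every piece: input pieces
        // already carry them, while recovered pieces need them derived from their records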
        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
            #[cfg(feature = "parallel")]
            let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);

            iter.map(|(piece, maybe_input_piece)| {
                let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
                {
                    (**input_piece.root(), **input_piece.parity_chunks_root())
                } else {
                    // TODO: Reuse allocations between iterations
                    let [source_chunks_root, parity_chunks_root] = {
                        let mut parity_chunks = Record::new_boxed();

                        self.erasure_coding
                            .extend(piece.record().iter(), parity_chunks.iter_mut())?;

                        let source_chunks_root =
                            BalancedHashedMerkleTree::compute_root_only(piece.record());

                        let parity_chunks_root =
                            BalancedHashedMerkleTree::compute_root_only(&parity_chunks);

                        [source_chunks_root, parity_chunks_root]
                    };

                    let record_root =
                        BalancedHashedMerkleTree::new(&[source_chunks_root, parity_chunks_root])
                            .root();

                    (record_root, parity_chunks_root)
                };

                piece.root_mut().copy_from_slice(&record_root);
                piece
                    .parity_chunks_root_mut()
                    .copy_from_slice(&parity_chunks_root);

                Ok::<_, ReconstructorError>(record_root)
            })
            .collect::<Result<Vec<_>, _>>()?
        };

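        // Build the segment-level Merkle tree over all record roots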
        let segment_merkle_tree =
            BalancedHashedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

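        // Store each piece's Merkle proof, completing the pieces so they can be verified
        // against the segment root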
        reconstructed_pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.proof_mut().copy_from_slice(&record_proof);
            });

        Ok(reconstructed_pieces)
    }

    /// Returns all pieces of an archived history segment from the given set of segment pieces
    /// (any half of all pieces is enough to be present; the rest are recovered automatically
    /// via erasure coding if needed).
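    ///
    /// A minimal sketch of the intended call shape (hedged: it assumes `reconstructor` and a
    /// `segment_pieces: Vec<Option<Piece>>` with at least half of the entries present were
    /// prepared by the caller):
    ///
    /// ```ignore
    /// let segment = reconstructor.reconstruct_segment(&segment_pieces)?;
    /// assert_eq!(segment.len(), ArchivedHistorySegment::NUM_PIECES);
    /// ```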
    pub fn reconstruct_segment(
        &self,
        segment_pieces: &[Option<Piece>],
    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
        let pieces = self.reconstruct_shards(segment_pieces)?;

        Ok(pieces.to_shared())
    }

    /// Returns the piece at the given position within an archived history segment,
    /// reconstructing it from the given set of segment pieces if it is missing (any half of
    /// all pieces is required to be present).
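    ///
    /// A minimal sketch (hedged: same `segment_pieces` assumption as in
    /// [`Self::reconstruct_segment`]; `piece_position` must be below
    /// `ArchivedHistorySegment::NUM_PIECES`):
    ///
    /// ```ignore
    /// let piece = reconstructor.reconstruct_piece(&segment_pieces, piece_position)?;
    /// ```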
    pub fn reconstruct_piece(
        &self,
        segment_pieces: &[Option<Piece>],
        piece_position: usize,
    ) -> Result<Piece, ReconstructorError> {
        if piece_position >= ArchivedHistorySegment::NUM_PIECES {
            return Err(ReconstructorError::IncorrectPiecePosition);
        }

        // TODO: Early exit if position already exists and doesn't need reconstruction
        // TODO: It is now inefficient to recover all shards if only one piece is needed, especially
        //  source piece
        let pieces = self.reconstruct_shards(segment_pieces)?;

        let piece = Piece::from(&pieces[piece_position]);

        Ok(piece.to_shared())
    }
}