
// ab_archiving/piece_reconstructor.rs
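//! Reconstruction of pieces of an archived history segment using erasure coding, with record
//! roots and Merkle proofs recomputed for recovered pieces.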

use ab_core_primitives::pieces::{Piece, PiecePosition, Record};
use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_merkle_tree::balanced::BalancedMerkleTree;
use alloc::vec::Vec;
#[cfg(feature = "parallel")]
use rayon::prelude::*;

/// Reconstruction-related error
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum ReconstructorError {
    /// Error during data shards reconstruction
    #[error("Error during data shards reconstruction: {0}")]
    DataShardsReconstruction(#[from] ErasureCodingError),
    /// Not enough shards to reconstruct the segment
    #[error("Not enough shards: {num_shards}")]
    NotEnoughShards {
        /// Number of shards that were provided
        num_shards: usize,
    },
}

/// Piece reconstructor helps to reconstruct missing pieces.
#[derive(Debug, Clone)]
pub struct PiecesReconstructor {
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
}

impl PiecesReconstructor {
    /// Create a new instance
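    ///
    /// # Example
    ///
    /// A minimal sketch; the `ErasureCoding` constructor shown here is an assumption and may
    /// differ in the actual `ab_erasure_coding` API:
    ///
    /// ```ignore
    /// use ab_erasure_coding::ErasureCoding;
    ///
    /// let erasure_coding = ErasureCoding::new();
    /// let reconstructor = PiecesReconstructor::new(erasure_coding);
    /// ```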
    pub fn new(erasure_coding: ErasureCoding) -> Self {
        Self { erasure_coding }
    }

    fn reconstruct_shards(
        &self,
        input_pieces: &[Option<Piece>],
    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
        // The input must contain a slot for every piece of the segment, even if the piece itself
        // is missing
        if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
            return Err(ReconstructorError::NotEnoughShards {
                num_shards: input_pieces.len(),
            });
        }
        let mut reconstructed_pieces = ArchivedHistorySegment::default();

        {
            // The first half of the input corresponds to source records, the second half to
            // parity records produced by erasure coding
            let (source_input_pieces, parity_input_pieces) =
                input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
            let (source_reconstructed_pieces, parity_reconstructed_pieces) =
                reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            // Pieces that are present are copied into the output as-is, missing ones are marked
            // for in-place recovery
            let source = source_input_pieces
                .iter()
                .zip(source_reconstructed_pieces)
                .map(
                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
                        Some(input_piece) => {
                            output_piece
                                .record_mut()
                                .copy_from_slice(input_piece.record().as_slice());
                            RecoveryShardState::Present(input_piece.record().as_flattened())
                        }
                        None => RecoveryShardState::MissingRecover(
                            output_piece.record_mut().as_flattened_mut(),
                        ),
                    },
                );
            let parity = parity_input_pieces
                .iter()
                .zip(parity_reconstructed_pieces.iter_mut())
                .map(
                    |(maybe_input_piece, output_piece)| match maybe_input_piece {
                        Some(input_piece) => {
                            output_piece
                                .record_mut()
                                .copy_from_slice(input_piece.record().as_slice());
                            RecoveryShardState::Present(input_piece.record().as_flattened())
                        }
                        None => RecoveryShardState::MissingRecover(
                            output_piece.record_mut().as_flattened_mut(),
                        ),
                    },
                );
            // Recover all missing records in place
            self.erasure_coding.recover(source, parity)?;
        }

        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
            #[cfg(feature = "parallel")]
            let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);

            iter.map(|(piece, maybe_input_piece)| {
                // For pieces that were present in the input, reuse their existing commitments;
                // for recovered pieces, recompute chunk roots from the record contents
                let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
                {
                    (**input_piece.root(), **input_piece.parity_chunks_root())
                } else {
                    // TODO: Reuse allocations between iterations
                    let [source_chunks_root, parity_chunks_root] = {
                        let mut parity_chunks = Record::new_boxed();

                        self.erasure_coding
                            .extend(piece.record().iter(), parity_chunks.iter_mut())?;

                        let source_chunks_root =
                            BalancedMerkleTree::compute_root_only(piece.record());

                        let parity_chunks_root =
                            BalancedMerkleTree::compute_root_only(&parity_chunks);

                        [source_chunks_root, parity_chunks_root]
                    };

                    // The record root commits to both the source and parity chunk roots
                    let record_root =
                        BalancedMerkleTree::new(&[source_chunks_root, parity_chunks_root]).root();

                    (record_root, parity_chunks_root)
                };

                piece.root_mut().copy_from_slice(&record_root);
                piece
                    .parity_chunks_root_mut()
                    .copy_from_slice(&parity_chunks_root);

                Ok::<_, ReconstructorError>(record_root)
            })
            .collect::<Result<Vec<_>, _>>()?
        };

        // Build the segment-level Merkle tree over all record roots
        let segment_merkle_tree =
            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        // Attach a Merkle proof to every reconstructed piece
        reconstructed_pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.proof_mut().copy_from_slice(&record_proof);
            });

        Ok(reconstructed_pieces)
    }

    /// Returns all pieces of a segment, given a set of pieces of that segment of the archived
    /// history (any half of all pieces must be present; the rest will be recovered automatically
    /// thanks to erasure coding, if needed).
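    ///
    /// # Example
    ///
    /// A minimal sketch of intended usage; creating the reconstructor and collecting the input
    /// pieces is assumed to have happened elsewhere:
    ///
    /// ```ignore
    /// use ab_core_primitives::segments::ArchivedHistorySegment;
    ///
    /// // `segment_pieces` holds `ArchivedHistorySegment::NUM_PIECES` entries, with `None` in
    /// // positions where a piece is missing
    /// let segment = reconstructor.reconstruct_segment(&segment_pieces)?;
    /// ```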
    pub fn reconstruct_segment(
        &self,
        segment_pieces: &[Option<Piece>],
    ) -> Result<ArchivedHistorySegment, ReconstructorError> {
        let pieces = self.reconstruct_shards(segment_pieces)?;

        Ok(pieces.to_shared())
    }

    /// Returns the missing piece at the given position, given a set of pieces of that segment of
    /// the archived history (any half of all pieces must be present).
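    ///
    /// # Example
    ///
    /// A minimal sketch of intended usage; the reconstructor, input pieces and piece position are
    /// assumed to come from elsewhere:
    ///
    /// ```ignore
    /// // `piece_position` identifies the piece to recover within the segment
    /// let piece = reconstructor.reconstruct_piece(&segment_pieces, piece_position)?;
    /// ```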
    pub fn reconstruct_piece(
        &self,
        segment_pieces: &[Option<Piece>],
        piece_position: PiecePosition,
    ) -> Result<Piece, ReconstructorError> {
        // TODO: Early exit if position already exists and doesn't need reconstruction
        // TODO: It is now inefficient to recover all shards if only one piece is needed, especially
        //  source piece
        let pieces = self.reconstruct_shards(segment_pieces)?;

        let piece = Piece::from(&pieces[piece_position]);

        Ok(piece.to_shared())
    }
}