ab_archiving/
piece_reconstructor.rs1use ab_core_primitives::pieces::{Piece, Record};
2use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment};
3use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
4use ab_merkle_tree::balanced_hashed::BalancedHashedMerkleTree;
5use alloc::vec::Vec;
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8
9#[derive(Debug, Clone, PartialEq, thiserror::Error)]
11pub enum ReconstructorError {
12 #[error("Error during data shards reconstruction: {0}")]
14 DataShardsReconstruction(#[from] ErasureCodingError),
15 #[error("Not enough shards: {num_shards}")]
17 NotEnoughShards { num_shards: usize },
18 #[error("Incorrect piece position provided.")]
20 IncorrectPiecePosition,
21}
22
23#[derive(Debug, Clone)]
25pub struct PiecesReconstructor {
26 erasure_coding: ErasureCoding,
28}
29
30impl PiecesReconstructor {
31 pub fn new(erasure_coding: ErasureCoding) -> Self {
33 Self { erasure_coding }
34 }
35
36 fn reconstruct_shards(
37 &self,
38 input_pieces: &[Option<Piece>],
39 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
40 if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
41 return Err(ReconstructorError::NotEnoughShards {
42 num_shards: input_pieces.len(),
43 });
44 }
45 let mut reconstructed_pieces = ArchivedHistorySegment::default();
46
47 {
48 let (source_input_pieces, parity_input_pieces) =
49 input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
50 let (source_reconstructed_pieces, parity_reconstructed_pieces) =
51 reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
52
53 let source = source_input_pieces
54 .iter()
55 .zip(source_reconstructed_pieces)
56 .map(
57 |(maybe_input_piece, output_piece)| match maybe_input_piece {
58 Some(input_piece) => {
59 output_piece
60 .record_mut()
61 .copy_from_slice(input_piece.record().as_slice());
62 RecoveryShardState::Present(input_piece.record().as_flattened())
63 }
64 None => RecoveryShardState::MissingRecover(
65 output_piece.record_mut().as_flattened_mut(),
66 ),
67 },
68 );
69 let parity = parity_input_pieces
70 .iter()
71 .zip(parity_reconstructed_pieces.iter_mut())
72 .map(
73 |(maybe_input_piece, output_piece)| match maybe_input_piece {
74 Some(input_piece) => {
75 output_piece
76 .record_mut()
77 .copy_from_slice(input_piece.record().as_slice());
78 RecoveryShardState::Present(input_piece.record().as_flattened())
79 }
80 None => RecoveryShardState::MissingRecover(
81 output_piece.record_mut().as_flattened_mut(),
82 ),
83 },
84 );
85 self.erasure_coding.recover(source, parity)?;
86 }
87
88 let record_roots = {
89 #[cfg(not(feature = "parallel"))]
90 let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
91 #[cfg(feature = "parallel")]
92 let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);
93
94 iter.map(|(piece, maybe_input_piece)| {
95 let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
96 {
97 (**input_piece.root(), **input_piece.parity_chunks_root())
98 } else {
99 let [source_chunks_root, parity_chunks_root] = {
101 let mut parity_chunks = Record::new_boxed();
102
103 self.erasure_coding
104 .extend(piece.record().iter(), parity_chunks.iter_mut())?;
105
106 let source_chunks_root =
107 BalancedHashedMerkleTree::compute_root_only(piece.record());
108
109 let parity_chunks_root =
110 BalancedHashedMerkleTree::compute_root_only(&parity_chunks);
111
112 [source_chunks_root, parity_chunks_root]
113 };
114
115 let record_root =
116 BalancedHashedMerkleTree::new(&[source_chunks_root, parity_chunks_root])
117 .root();
118
119 (record_root, parity_chunks_root)
120 };
121
122 piece.root_mut().copy_from_slice(&record_root);
123 piece
124 .parity_chunks_root_mut()
125 .copy_from_slice(&parity_chunks_root);
126
127 Ok::<_, ReconstructorError>(record_root)
128 })
129 .collect::<Result<Vec<_>, _>>()?
130 };
131
132 let segment_merkle_tree =
133 BalancedHashedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
134 record_roots
135 .as_slice()
136 .try_into()
137 .expect("Statically guaranteed to have correct length; qed"),
138 );
139
140 reconstructed_pieces
141 .iter_mut()
142 .zip(segment_merkle_tree.all_proofs())
143 .for_each(|(piece, record_proof)| {
144 piece.proof_mut().copy_from_slice(&record_proof);
145 });
146
147 Ok(reconstructed_pieces)
148 }
149
150 pub fn reconstruct_segment(
154 &self,
155 segment_pieces: &[Option<Piece>],
156 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
157 let pieces = self.reconstruct_shards(segment_pieces)?;
158
159 Ok(pieces.to_shared())
160 }
161
162 pub fn reconstruct_piece(
165 &self,
166 segment_pieces: &[Option<Piece>],
167 piece_position: usize,
168 ) -> Result<Piece, ReconstructorError> {
169 if piece_position >= ArchivedHistorySegment::NUM_PIECES {
170 return Err(ReconstructorError::IncorrectPiecePosition);
171 }
172
173 let pieces = self.reconstruct_shards(segment_pieces)?;
177
178 let piece = Piece::from(&pieces[piece_position]);
179
180 Ok(piece.to_shared())
181 }
182}