ab_archiving/
piece_reconstructor.rs1use ab_core_primitives::pieces::{
2 Piece, PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof, SegmentProof,
3};
4use ab_core_primitives::segments::{
5 ArchivedHistorySegment, LocalSegmentIndex, RecordedHistorySegment, SegmentPosition,
6 SegmentRoot, SuperSegmentIndex,
7};
8use ab_core_primitives::shard::ShardIndex;
9use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
10use ab_merkle_tree::balanced::BalancedMerkleTree;
11use alloc::vec::Vec;
12#[cfg(feature = "parallel")]
13use rayon::prelude::*;
14
15#[derive(Debug, Clone, PartialEq, thiserror::Error)]
17pub enum ReconstructorError {
18 #[error("Error during data shards reconstruction: {0}")]
20 DataShardsReconstruction(#[from] ErasureCodingError),
21 #[error("Not enough shards: {num_shards}")]
23 NotEnoughShards { num_shards: usize },
24}
25
26struct SharedPieceDetails {
27 shard_index: ShardIndex,
28 local_segment_index: LocalSegmentIndex,
29 super_segment_index: SuperSegmentIndex,
30 segment_position: SegmentPosition,
31 segment_root: SegmentRoot,
32 segment_proof: SegmentProof,
33}
34
35#[derive(Debug, Clone)]
37pub struct PiecesReconstructor {
38 erasure_coding: ErasureCoding,
40}
41
42impl PiecesReconstructor {
43 pub fn new(erasure_coding: ErasureCoding) -> Self {
45 Self { erasure_coding }
46 }
47
48 fn reconstruct_shards(
49 &self,
50 input_pieces: &[Option<Piece>],
51 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
52 if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
53 return Err(ReconstructorError::NotEnoughShards {
54 num_shards: input_pieces.len(),
55 });
56 }
57 let mut reconstructed_pieces = ArchivedHistorySegment::default();
58
59 let mut shared_piece_details = None;
61 {
62 let (source_input_pieces, parity_input_pieces) =
63 input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
64 let (source_reconstructed_pieces, parity_reconstructed_pieces) =
65 reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
66
67 let source = source_input_pieces
68 .iter()
69 .zip(source_reconstructed_pieces)
70 .map(
71 |(maybe_input_piece, output_piece)| match maybe_input_piece {
72 Some(input_piece) => {
73 if shared_piece_details.is_none() {
74 shared_piece_details.replace(SharedPieceDetails {
75 shard_index: input_piece.header.shard_index.as_inner(),
76 local_segment_index: input_piece
77 .header
78 .local_segment_index
79 .as_inner(),
80 super_segment_index: input_piece
81 .header
82 .super_segment_index
83 .as_inner(),
84 segment_position: input_piece
85 .header
86 .segment_position
87 .as_inner(),
88 segment_root: input_piece.header.segment_root,
89 segment_proof: input_piece.header.segment_proof,
90 });
91 }
92 output_piece.record.copy_from_slice(&*input_piece.record);
96 RecoveryShardState::Present(input_piece.record.as_flattened())
97 }
98 None => RecoveryShardState::MissingRecover(
99 output_piece.record.as_flattened_mut(),
100 ),
101 },
102 );
103 let parity = parity_input_pieces
104 .iter()
105 .zip(parity_reconstructed_pieces.iter_mut())
106 .map(
107 |(maybe_input_piece, output_piece)| match maybe_input_piece {
108 Some(input_piece) => {
109 output_piece.record.copy_from_slice(&*input_piece.record);
113 RecoveryShardState::Present(input_piece.record.as_flattened())
114 }
115 None => RecoveryShardState::MissingRecover(
116 output_piece.record.as_flattened_mut(),
117 ),
118 },
119 );
120 self.erasure_coding.recover(source, parity)?;
121 }
122 let SharedPieceDetails {
123 shard_index,
124 local_segment_index,
125 super_segment_index,
126 segment_position,
127 segment_root,
128 segment_proof,
129 } = shared_piece_details.expect(
130 "Sucessful recovery means there was at least one piece to fill this Option; qed",
131 );
132
133 let record_roots = {
134 #[cfg(not(feature = "parallel"))]
135 let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
136 #[cfg(feature = "parallel")]
137 let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);
138
139 iter.map(|(piece, maybe_input_piece)| {
140 let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
141 {
142 (
143 *input_piece.record_root(),
144 *input_piece.header.parity_chunks_root,
145 )
146 } else {
147 let [source_chunks_root, parity_chunks_root] = {
149 let mut parity_chunks = Record::new_boxed();
150
151 self.erasure_coding
152 .extend(piece.record.iter(), parity_chunks.iter_mut())?;
153
154 let source_chunks_root = *piece.record.source_chunks_root();
155 let parity_chunks_root =
156 BalancedMerkleTree::compute_root_only(&parity_chunks);
157
158 [source_chunks_root, parity_chunks_root]
159 };
160
161 let record_root =
162 BalancedMerkleTree::new(&[source_chunks_root, parity_chunks_root]).root();
163
164 (record_root, parity_chunks_root)
165 };
166
167 piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);
168
169 Ok::<_, ReconstructorError>(record_root)
170 })
171 .collect::<Result<Vec<_>, _>>()?
172 };
173
174 let segment_merkle_tree =
175 BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
176 record_roots
177 .as_slice()
178 .try_into()
179 .expect("Statically guaranteed to have correct length; qed"),
180 );
181
182 reconstructed_pieces
183 .iter_mut()
184 .zip(segment_merkle_tree.all_proofs())
185 .for_each(|(piece, record_proof)| {
186 piece.header = PieceHeader {
187 shard_index: shard_index.into(),
188 local_segment_index: local_segment_index.into(),
189 super_segment_index: super_segment_index.into(),
190 segment_position: segment_position.into(),
191 segment_root,
192 segment_proof,
193 parity_chunks_root: piece.header.parity_chunks_root,
194 record_proof: RecordProof::from(record_proof),
195 };
196 });
197
198 Ok(reconstructed_pieces)
199 }
200
201 pub fn reconstruct_segment(
207 &self,
208 segment_pieces: &[Option<Piece>],
209 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
210 let pieces = self.reconstruct_shards(segment_pieces)?;
211
212 Ok(pieces.to_shared())
213 }
214
215 pub fn reconstruct_piece(
218 &self,
219 segment_pieces: &[Option<Piece>],
220 piece_position: PiecePosition,
221 ) -> Result<Piece, ReconstructorError> {
222 let pieces = self.reconstruct_shards(segment_pieces)?;
226
227 let piece = Piece::from(&pieces[piece_position]);
228
229 Ok(piece.to_shared())
230 }
231}