ab_archiving/
piece_reconstructor.rs1use ab_core_primitives::pieces::{Piece, PiecePosition, Record};
2use ab_core_primitives::segments::{ArchivedHistorySegment, RecordedHistorySegment};
3use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
4use ab_merkle_tree::balanced::BalancedMerkleTree;
5use alloc::vec::Vec;
6#[cfg(feature = "parallel")]
7use rayon::prelude::*;
8
9#[derive(Debug, Clone, PartialEq, thiserror::Error)]
11pub enum ReconstructorError {
12 #[error("Error during data shards reconstruction: {0}")]
14 DataShardsReconstruction(#[from] ErasureCodingError),
15 #[error("Not enough shards: {num_shards}")]
17 NotEnoughShards { num_shards: usize },
18}
19
20#[derive(Debug, Clone)]
22pub struct PiecesReconstructor {
23 erasure_coding: ErasureCoding,
25}
26
27impl PiecesReconstructor {
28 pub fn new(erasure_coding: ErasureCoding) -> Self {
30 Self { erasure_coding }
31 }
32
33 fn reconstruct_shards(
34 &self,
35 input_pieces: &[Option<Piece>],
36 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
37 if input_pieces.len() < ArchivedHistorySegment::NUM_PIECES {
38 return Err(ReconstructorError::NotEnoughShards {
39 num_shards: input_pieces.len(),
40 });
41 }
42 let mut reconstructed_pieces = ArchivedHistorySegment::default();
43
44 {
45 let (source_input_pieces, parity_input_pieces) =
46 input_pieces.split_at(RecordedHistorySegment::NUM_RAW_RECORDS);
47 let (source_reconstructed_pieces, parity_reconstructed_pieces) =
48 reconstructed_pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
49
50 let source = source_input_pieces
51 .iter()
52 .zip(source_reconstructed_pieces)
53 .map(
54 |(maybe_input_piece, output_piece)| match maybe_input_piece {
55 Some(input_piece) => {
56 output_piece
57 .record_mut()
58 .copy_from_slice(input_piece.record().as_slice());
59 RecoveryShardState::Present(input_piece.record().as_flattened())
60 }
61 None => RecoveryShardState::MissingRecover(
62 output_piece.record_mut().as_flattened_mut(),
63 ),
64 },
65 );
66 let parity = parity_input_pieces
67 .iter()
68 .zip(parity_reconstructed_pieces.iter_mut())
69 .map(
70 |(maybe_input_piece, output_piece)| match maybe_input_piece {
71 Some(input_piece) => {
72 output_piece
73 .record_mut()
74 .copy_from_slice(input_piece.record().as_slice());
75 RecoveryShardState::Present(input_piece.record().as_flattened())
76 }
77 None => RecoveryShardState::MissingRecover(
78 output_piece.record_mut().as_flattened_mut(),
79 ),
80 },
81 );
82 self.erasure_coding.recover(source, parity)?;
83 }
84
85 let record_roots = {
86 #[cfg(not(feature = "parallel"))]
87 let iter = reconstructed_pieces.iter_mut().zip(input_pieces);
88 #[cfg(feature = "parallel")]
89 let iter = reconstructed_pieces.par_iter_mut().zip_eq(input_pieces);
90
91 iter.map(|(piece, maybe_input_piece)| {
92 let (record_root, parity_chunks_root) = if let Some(input_piece) = maybe_input_piece
93 {
94 (**input_piece.root(), **input_piece.parity_chunks_root())
95 } else {
96 let [source_chunks_root, parity_chunks_root] = {
98 let mut parity_chunks = Record::new_boxed();
99
100 self.erasure_coding
101 .extend(piece.record().iter(), parity_chunks.iter_mut())?;
102
103 let source_chunks_root =
104 BalancedMerkleTree::compute_root_only(piece.record());
105
106 let parity_chunks_root =
107 BalancedMerkleTree::compute_root_only(&parity_chunks);
108
109 [source_chunks_root, parity_chunks_root]
110 };
111
112 let record_root =
113 BalancedMerkleTree::new(&[source_chunks_root, parity_chunks_root]).root();
114
115 (record_root, parity_chunks_root)
116 };
117
118 piece.root_mut().copy_from_slice(&record_root);
119 piece
120 .parity_chunks_root_mut()
121 .copy_from_slice(&parity_chunks_root);
122
123 Ok::<_, ReconstructorError>(record_root)
124 })
125 .collect::<Result<Vec<_>, _>>()?
126 };
127
128 let segment_merkle_tree =
129 BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
130 record_roots
131 .as_slice()
132 .try_into()
133 .expect("Statically guaranteed to have correct length; qed"),
134 );
135
136 reconstructed_pieces
137 .iter_mut()
138 .zip(segment_merkle_tree.all_proofs())
139 .for_each(|(piece, record_proof)| {
140 piece.proof_mut().copy_from_slice(&record_proof);
141 });
142
143 Ok(reconstructed_pieces)
144 }
145
146 pub fn reconstruct_segment(
150 &self,
151 segment_pieces: &[Option<Piece>],
152 ) -> Result<ArchivedHistorySegment, ReconstructorError> {
153 let pieces = self.reconstruct_shards(segment_pieces)?;
154
155 Ok(pieces.to_shared())
156 }
157
158 pub fn reconstruct_piece(
161 &self,
162 segment_pieces: &[Option<Piece>],
163 piece_position: PiecePosition,
164 ) -> Result<Piece, ReconstructorError> {
165 let pieces = self.reconstruct_shards(segment_pieces)?;
169
170 let piece = Piece::from(&pieces[piece_position]);
171
172 Ok(piece.to_shared())
173 }
174}