ab_client_archiving/
recreate.rs1use crate::task::encode_block;
4use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
5use ab_archiving::objects::BlockObject;
6use ab_client_api::{ChainInfo, ReadBlockError};
7use ab_core_primitives::block::BlockNumber;
8use ab_core_primitives::block::header::GenericBlockHeader;
9use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
10use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
11use ab_core_primitives::pieces::SegmentProof;
12use ab_core_primitives::segments::{SegmentHeader, SegmentPosition, SuperSegmentIndex};
13use ab_core_primitives::shard::ShardIndex;
14use ab_erasure_coding::ErasureCoding;
15use tokio::task::{JoinError, spawn_blocking};
16
17pub fn recreate_genesis_segment(
23 owned_genesis_block: &OwnedBeaconChainBlock,
24 erasure_coding: ErasureCoding,
25) -> NewArchivedSegment {
26 let encoded_block = encode_block(owned_genesis_block);
27
28 let block_outcome = Archiver::new(ShardIndex::BEACON_CHAIN, erasure_coding)
29 .add_block(encoded_block, Vec::new())
30 .expect("Block is never empty and doesn't exceed u32; qed");
31 let mut archived_segment = block_outcome
32 .archived_segments
33 .into_iter()
34 .next()
35 .expect("Genesis block always results in exactly one archived segment; qed");
36
37 for piece in archived_segment.pieces.iter_mut() {
38 piece.header.super_segment_index = SuperSegmentIndex::ZERO.into();
39 }
41
42 archived_segment.pieces = archived_segment.pieces.to_shared();
43
44 archived_segment
45}
46
47#[derive(Debug, thiserror::Error)]
49pub enum RecreateSegmentError {
50 #[error("Read block error: {0}")]
52 ReadBlockError(#[from] ReadBlockError),
53 #[error("Archiver instantiation error: {0}")]
55 ArchiverInstantiationError(#[from] ArchiverInstantiationError),
56 #[error("Failed to add block to the archiver")]
58 FailedToAddBlock,
59 #[error("Blocking task join error: {0}")]
61 BlockingTaskJoinError(#[from] JoinError),
62}
63
64#[derive(Debug, Copy, Clone)]
66#[repr(C)]
67pub struct RecreateSegmentSuperSegmentDetails {
68 pub super_segment_index: SuperSegmentIndex,
70 pub segment_position: SegmentPosition,
72 pub segment_proof: SegmentProof,
74}
75
76pub async fn recreate_segment<Block, CI, EBO>(
86 last_segment_header: Option<SegmentHeader>,
87 chain_info: &CI,
88 erasure_coding: ErasureCoding,
89 super_segment_details: &RecreateSegmentSuperSegmentDetails,
90 mut extract_block_objects: EBO,
91) -> Result<Option<NewArchivedSegment>, RecreateSegmentError>
92where
93 Block: GenericOwnedBlock,
94 CI: ChainInfo<Block>,
95 EBO: FnMut(&Block) -> Vec<BlockObject>,
96{
97 let best_block_root = chain_info.best_root();
98
99 let (start_block_number, mut archiver) = if let Some(last_segment_header) = last_segment_header
100 {
101 let first_block_number = last_segment_header.last_archived_block.number.as_inner();
102
103 let archiver = {
104 let Some(first_block_root) = chain_info
105 .ancestor_header(first_block_number, &best_block_root)
106 .map(|header| *header.header().root())
107 else {
108 return Ok(None);
109 };
110 let block = chain_info.block(&first_block_root).await?;
111 let shard_index = block.header().header().prefix.shard_index;
112 let encoded_block = encode_block(&block);
113
114 Archiver::with_initial_state(
115 shard_index,
116 erasure_coding,
117 last_segment_header,
118 &encoded_block,
119 extract_block_objects(&block),
120 )?
121 };
122
123 (
124 first_block_number.saturating_add(BlockNumber::ONE),
125 archiver,
126 )
127 } else {
128 let best_header = chain_info.best_header();
129 let archiver = Archiver::new(best_header.header().prefix.shard_index, erasure_coding);
130
131 (BlockNumber::ZERO, archiver)
132 };
133
134 for block_number in start_block_number.. {
135 let (encoded_block, block_objects) = {
136 let Some(block_root) = chain_info
137 .ancestor_header(block_number, &best_block_root)
138 .map(|header| *header.header().root())
139 else {
140 return Ok(None);
141 };
142 let block = chain_info.block(&block_root).await?;
143
144 (encode_block(&block), extract_block_objects(&block))
145 };
146
147 let task_fut = spawn_blocking(move || {
148 let maybe_outcome = archiver
149 .add_block(encoded_block, block_objects)
150 .ok_or(RecreateSegmentError::FailedToAddBlock);
151
152 (archiver, maybe_outcome)
153 });
154 let outcome;
155 (archiver, outcome) = task_fut.await?;
156 let outcome = outcome?;
157
158 if let Some(mut archived_segment) = outcome.archived_segments.into_iter().next() {
160 for piece in archived_segment.pieces.iter_mut() {
161 piece
162 .header
163 .super_segment_index
164 .replace(super_segment_details.super_segment_index);
165 piece
166 .header
167 .segment_position
168 .replace(super_segment_details.segment_position);
169 piece
170 .header
171 .segment_proof
172 .copy_from_slice(super_segment_details.segment_proof.as_slice());
173 }
174
175 return Ok(Some(archived_segment));
176 }
177 }
178
179 Ok(None)
180}