//! Segment re-creation

use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
use ab_archiving::objects::BlockObject;
use ab_client_api::{ChainInfo, ReadBlockError};
use ab_core_primitives::block::BlockNumber;
use ab_core_primitives::block::header::GenericBlockHeader;
use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
use ab_core_primitives::pieces::SegmentProof;
use ab_core_primitives::segments::{SegmentHeader, SegmentPosition, SuperSegmentIndex};
use ab_core_primitives::shard::ShardIndex;
use ab_erasure_coding::ErasureCoding;
use tokio::task::{JoinError, spawn_blocking};

use crate::task::encode_block;
16
17/// Re-create the genesis segment on demand.
18///
19/// This is a bit of a hack and is useful for deriving of the genesis beacon chain segment that is a
20/// special case since we don't have enough data in the blockchain history itself during genesis to
21/// do the archiving.
22pub fn recreate_genesis_segment(
23    owned_genesis_block: &OwnedBeaconChainBlock,
24    erasure_coding: ErasureCoding,
25) -> NewArchivedSegment {
26    let encoded_block = encode_block(owned_genesis_block);
27
28    let block_outcome = Archiver::new(ShardIndex::BEACON_CHAIN, erasure_coding)
29        .add_block(encoded_block, Vec::new())
30        .expect("Block is never empty and doesn't exceed u32; qed");
31    let mut archived_segment = block_outcome
32        .archived_segments
33        .into_iter()
34        .next()
35        .expect("Genesis block always results in exactly one archived segment; qed");
36
37    for piece in archived_segment.pieces.iter_mut() {
38        piece.header.super_segment_index = SuperSegmentIndex::ZERO.into();
39        // Since there is a single segment in super segment, the proof is empty
40    }
41
42    archived_segment.pieces = archived_segment.pieces.to_shared();
43
44    archived_segment
45}
46
/// Error for [`recreate_segment()`]
#[derive(Debug, thiserror::Error)]
pub enum RecreateSegmentError {
    /// Reading a block via [`ChainInfo`] failed (propagated from `chain_info.block()`)
    #[error("Read block error: {0}")]
    ReadBlockError(#[from] ReadBlockError),
    /// Archiver instantiation error (from [`Archiver::with_initial_state`])
    #[error("Archiver instantiation error: {0}")]
    ArchiverInstantiationError(#[from] ArchiverInstantiationError),
    /// Failed to add block to the archiver (`Archiver::add_block` returned nothing)
    #[error("Failed to add block to the archiver")]
    FailedToAddBlock,
    /// Blocking task join error (the `spawn_blocking` archiving task panicked or was cancelled)
    #[error("Blocking task join error: {0}")]
    BlockingTaskJoinError(#[from] JoinError),
}
63
/// Super segment details for [`recreate_segment()`]
///
/// Supplies the super-segment metadata that is stamped onto every piece header of the
/// re-created segment.
// NOTE(review): `#[repr(C)]` suggests this layout is shared across an FFI or serialization
// boundary — confirm before reordering fields.
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct RecreateSegmentSuperSegmentDetails {
    /// Super segment index
    pub super_segment_index: SuperSegmentIndex,
    /// Segment position in a super segment
    pub segment_position: SegmentPosition,
    /// Segment proof
    pub segment_proof: SegmentProof,
}
75
/// Re-create a segment on demand.
///
/// `last_segment_header` corresponds to the last segment before the segment being re-created and
/// indicates where segment archiving should start.
///
/// `extract_block_objects` allows extracting objects stored in blocks to translate them into global
/// objects.
///
/// Returns `Ok(None)` if one of the segment blocks is already pruned.
///
/// # Errors
///
/// See [`RecreateSegmentError`]: block reads, archiver instantiation, adding a block to the
/// archiver, or joining the blocking archiving task can fail.
pub async fn recreate_segment<Block, CI, EBO>(
    last_segment_header: Option<SegmentHeader>,
    chain_info: &CI,
    erasure_coding: ErasureCoding,
    super_segment_details: &RecreateSegmentSuperSegmentDetails,
    mut extract_block_objects: EBO,
) -> Result<Option<NewArchivedSegment>, RecreateSegmentError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
    EBO: FnMut(&Block) -> Vec<BlockObject>,
{
    // All ancestor lookups below are anchored to the current best chain tip.
    let best_block_root = chain_info.best_root();

    let (start_block_number, mut archiver) = if let Some(last_segment_header) = last_segment_header
    {
        // The last block partially archived in the previous segment; archiving resumes
        // from the archiver state reconstructed around it.
        let first_block_number = last_segment_header.last_archived_block.number.as_inner();

        let archiver = {
            // `ancestor_header` returning `None` is treated as "block already pruned"
            // (see doc comment) — the segment cannot be re-created from local history.
            let Some(first_block_root) = chain_info
                .ancestor_header(first_block_number, &best_block_root)
                .map(|header| *header.header().root())
            else {
                return Ok(None);
            };
            let block = chain_info.block(&first_block_root).await?;
            let shard_index = block.header().header().prefix.shard_index;
            let encoded_block = encode_block(&block);

            // Rebuild archiver state exactly as it was right after the previous segment,
            // using the boundary block's bytes and its extracted objects.
            Archiver::with_initial_state(
                shard_index,
                erasure_coding,
                last_segment_header,
                &encoded_block,
                extract_block_objects(&block),
            )?
        };

        (
            // The boundary block is already inside the archiver state; start feeding
            // from the next block. Saturating add guards the BlockNumber::MAX edge.
            first_block_number.saturating_add(BlockNumber::ONE),
            archiver,
        )
    } else {
        // No prior segment: start a fresh archiver from genesis (block number zero),
        // taking the shard index from the best header.
        let best_header = chain_info.best_header();
        let archiver = Archiver::new(best_header.header().prefix.shard_index, erasure_coding);

        (BlockNumber::ZERO, archiver)
    };

    // Feed blocks until the archiver emits a segment; the unbounded range only exits
    // via the early returns below.
    for block_number in start_block_number.. {
        let (encoded_block, block_objects) = {
            // As above, a missing ancestor means the block was pruned.
            let Some(block_root) = chain_info
                .ancestor_header(block_number, &best_block_root)
                .map(|header| *header.header().root())
            else {
                return Ok(None);
            };
            let block = chain_info.block(&block_root).await?;

            (encode_block(&block), extract_block_objects(&block))
        };

        // Archiving is CPU-heavy, so it runs on the blocking pool. The archiver is moved
        // into the closure and handed back through the return value so the next loop
        // iteration can keep using it.
        let task_fut = spawn_blocking(move || {
            let maybe_outcome = archiver
                .add_block(encoded_block, block_objects)
                .ok_or(RecreateSegmentError::FailedToAddBlock);

            (archiver, maybe_outcome)
        });
        let outcome;
        // Destructuring assignment re-acquires ownership of the archiver; `?` surfaces
        // a JoinError from the blocking task.
        (archiver, outcome) = task_fut.await?;
        let outcome = outcome?;

        // TODO: Return global objects once archiver API improves
        if let Some(mut archived_segment) = outcome.archived_segments.into_iter().next() {
            // Stamp the caller-provided super-segment metadata onto every piece header.
            // NOTE(review): `replace`/`copy_from_slice` suggest these header fields start
            // out unset/zeroed after archiving — confirm against the piece header type.
            for piece in archived_segment.pieces.iter_mut() {
                piece
                    .header
                    .super_segment_index
                    .replace(super_segment_details.super_segment_index);
                piece
                    .header
                    .segment_position
                    .replace(super_segment_details.segment_position);
                piece
                    .header
                    .segment_proof
                    .copy_from_slice(super_segment_details.segment_proof.as_slice());
            }

            // Only the first segment produced is the one being re-created.
            return Ok(Some(archived_segment));
        }
    }

    // Unreachable in practice: the loop above only ends if the block number range is
    // exhausted without ever producing a segment.
    Ok(None)
}