Skip to main content

ab_client_archiving/
segment.rs

1//! Segment archiving.
2//!
//! Implements the archiving process that converts blockchain history (blocks) into archived history
4//! (segments and pieces).
5//!
6//! The main entry point here is [`create_segment_archiver_task`] that will create a task, which
7//! while driven will perform the archiving itself.
8//!
9//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
10//! which includes farmer subscription, in case of reference implementation via RPC.
11//!
12//! All segment headers of the archived segments are available to other parts of the protocol that
13//! need to know what the correct archival history of the blockchain looks like through
14//! [`ChainInfo`]. For example, it is used during node sync and farmer plotting to verify pieces of
15//! archival history received from other network participants. Future segment header might also be
16//! already known in the case of syncing from DSN.
17//!
18//! [`recreate_genesis_segment`] is a bit of a hack and is useful for deriving of the genesis beacon
19//! chain segment that is a special case since we don't have enough data in the blockchain history
20//! itself during genesis to do the archiving.
21//!
//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning
//! blocks into bytes and back.
24
25use ab_aligned_buffer::SharedAlignedBuffer;
26use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
27use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
28use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
29use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
30use ab_core_primitives::block::header::GenericBlockHeader;
31use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
32use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
33use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
34use ab_core_primitives::segments::{
35    LocalSegmentIndex, RecordedHistorySegment, SegmentHeader, SuperSegmentIndex,
36};
37use ab_core_primitives::shard::{RealShardKind, ShardIndex};
38use ab_erasure_coding::ErasureCoding;
39use bytesize::ByteSize;
40use chacha20::ChaCha8Rng;
41use chacha20::rand_core::{Rng, SeedableRng};
42use futures::channel::mpsc;
43use futures::prelude::*;
44use std::sync::Arc;
45use std::time::Duration;
46use tracing::{debug, info, trace, warn};
47
/// Do not wait for acknowledgements beyond this time limit (2 minutes).
///
/// Expressed via `Duration::from_secs` rather than `Duration::from_mins` so the crate builds on
/// older stable toolchains (`from_mins` was only recently stabilized).
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
50
51// TODO: Maybe use or remove if database handles this completely on its own
52// /// How deep (in segments) should block be in order to be finalized.
53// ///
54// /// This is required for full nodes to not prune recent history such that keep-up sync in
55// /// Substrate works even without archival nodes (initial sync will be done from DSN).
56// ///
57// /// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
58// /// Substrate and is not worth it right now.
59// /// https://github.com/paritytech/substrate/discussions/14359
60// const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::from(5);
61
/// Notification with a new archived segment that was just archived
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender that signifies the fact of receiving an archived segment by the farmer.
    ///
    /// This must be used to send a message, or else the block import pipeline will get stuck.
    pub acknowledgement_sender: mpsc::Sender<()>,
}
72
73async fn find_last_archived_block<Block, CI>(
74    chain_info: &CI,
75    best_block_number_to_archive: BlockNumber,
76    best_block_root: &BlockRoot,
77) -> Option<(SegmentHeader, Block)>
78where
79    Block: GenericOwnedBlock,
80    CI: ChainInfo<Block>,
81{
82    let max_local_segment_index = chain_info.last_segment_header()?.segment_index.as_inner();
83
84    if max_local_segment_index == LocalSegmentIndex::ZERO {
85        // Just genesis, nothing else to check
86        return None;
87    }
88
89    for segment_header in (LocalSegmentIndex::ZERO..=max_local_segment_index)
90        .rev()
91        .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
92    {
93        let last_archived_block_number = segment_header.last_archived_block.number();
94
95        if last_archived_block_number > best_block_number_to_archive {
96            // Last archived block in segment header is too high for the current state of the chain
97            // (segment headers store may know about more blocks in existence than is currently
98            // imported)
99            continue;
100        }
101
102        let Some(last_archived_block_header) =
103            chain_info.ancestor_header(last_archived_block_number, best_block_root)
104        else {
105            // This block number is not in our chain yet (segment headers store may know about more
106            // blocks in existence than is currently imported)
107            continue;
108        };
109
110        let last_archived_block_root = &*last_archived_block_header.header().root();
111
112        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
113            // Block might have been pruned between ancestor search and disk read
114            continue;
115        };
116
117        return Some((segment_header, last_archived_block));
118    }
119
120    None
121}
122
/// Derive the genesis beacon chain segment on demand.
///
/// This is a special case: during genesis there is not enough data in the blockchain history to
/// run the regular archiving process, so the single genesis segment is reconstructed directly
/// from the provided genesis block.
pub fn recreate_genesis_segment(
    owned_genesis_block: &OwnedBeaconChainBlock,
    erasure_coding: ErasureCoding,
) -> NewArchivedSegment {
    let encoded_block = encode_block(owned_genesis_block);

    // The genesis block encoding is padded to a full segment (see `encode_block()`), so adding it
    // to a fresh archiver produces exactly one archived segment
    let block_outcome = Archiver::new(ShardIndex::BEACON_CHAIN, erasure_coding)
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    let mut archived_segment = block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed");

    for piece in archived_segment.pieces.iter_mut() {
        piece.header.super_segment_index = SuperSegmentIndex::ZERO.into();
        // Since there is a single segment in super segment, the proof is empty
    }

    archived_segment.pieces = archived_segment.pieces.to_shared();

    archived_segment
}
149
/// Encode block for archiving purposes
///
/// Layout: two little-endian `u32` length prefixes (header length, then body length) followed by
/// the raw header bytes and body bytes. The beacon chain genesis block is additionally padded up
/// to [`RecordedHistorySegment::SIZE`] with deterministic pseudorandom bytes seeded from its state
/// root, so that the very first segment can be archived immediately.
pub fn encode_block<Block>(block: &Block) -> Vec<u8>
where
    Block: GenericOwnedBlock,
{
    let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
        && block.header().header().prefix.number == BlockNumber::ZERO;
    let header_buffer = block.header().buffer();
    let body_buffer = block.body().buffer();

    // TODO: Extra allocation here is unfortunate, would be nice to avoid it
    let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
        RecordedHistorySegment::SIZE
    } else {
        size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
    });

    // NOTE(review): `len()` here is assumed to return `u32` (the `as usize` casts above suggest
    // it), making each length prefix exactly 4 bytes, matching `decode_block()` — confirm against
    // the buffer type
    encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(header_buffer);
    encoded_block.extend_from_slice(body_buffer);

    if is_beacon_chain_genesis_block {
        let encoded_block_length = encoded_block.len();

        // Encoding of the genesis block is extended with extra data such that the very first
        // archived segment can be produced right away, bootstrapping the farming process.
        //
        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
        // ignored even though it is technically present in encoded form.
        //
        // NOTE(review): this assumes the encoded genesis block never exceeds
        // `RecordedHistorySegment::SIZE`; `resize()` would silently truncate otherwise — confirm
        // the invariant holds for all genesis configurations
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        // Padding is derived from the genesis state root, so every node computes identical bytes
        let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
        rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
    }

    encoded_block
}
187
188/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
189pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
190where
191    Block: GenericOwnedBlock,
192{
193    let header_length = {
194        let header_length = encoded_block.split_off(..size_of::<u32>())?;
195        u32::from_le_bytes([
196            header_length[0],
197            header_length[1],
198            header_length[2],
199            header_length[3],
200        ])
201    };
202    let body_length = {
203        let body_length = encoded_block.split_off(..size_of::<u32>())?;
204        u32::from_le_bytes([
205            body_length[0],
206            body_length[1],
207            body_length[2],
208            body_length[3],
209        ])
210    };
211
212    let header_buffer = encoded_block.split_off(..header_length as usize)?;
213    let body_buffer = encoded_block.split_off(..body_length as usize)?;
214
215    let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
216    let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
217
218    Block::from_buffers(header_buffer, body_buffer)
219}
220
221/// Segment archiver task error
222#[derive(Debug, thiserror::Error)]
223pub enum SegmentArchiverTaskError {
224    /// Archiver instantiation error
225    #[error("Archiver instantiation error: {error}")]
226    Instantiation {
227        /// Low-level error
228        #[from]
229        error: ArchiverInstantiationError,
230    },
231    /// Failed to persist a new segment header
232    #[error("Failed to persist a new segment header: {error}")]
233    PersistSegmentHeaders {
234        /// Low-level error
235        #[from]
236        error: PersistSegmentHeadersError,
237    },
238    /// Attempt to switch to a different fork beyond archiving depth
239    #[error(
240        "Attempt to switch to a different fork beyond archiving depth: parent block root \
241        {parent_block_root}, best archived block root {best_archived_block_root}"
242    )]
243    ArchivingReorg {
244        /// Parent block root
245        parent_block_root: BlockRoot,
246        /// Best archived block root
247        best_archived_block_root: BlockRoot,
248    },
249    /// There was a gap in blockchain history, and the last contiguous series of blocks doesn't
250    /// start with the archived segment
251    #[error(
252        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
253        start with the archived segment (best archived block number {best_archived_block_number}, \
254        block number to archive {block_number_to_archive}), block about to be imported \
255        {importing_block_number})"
256    )]
257    BlockGap {
258        /// Best archived block number
259        best_archived_block_number: BlockNumber,
260        /// Block number to archive
261        block_number_to_archive: BlockNumber,
262        /// Importing block number
263        importing_block_number: BlockNumber,
264    },
265}
266
/// State produced by [`initialize_archiver`]
struct InitializedArchiver {
    // Archiver instance, possibly resumed from the last archived segment
    archiver: Archiver,
    // Root and number of the best (most recent) block that has been archived so far
    best_archived_block: (BlockRoot, BlockNumber),
}
271
/// Creates an [`Archiver`], either resuming from the last archived segment recorded in
/// `chain_info` or starting fresh from genesis, then catches up by archiving all blocks that are
/// already at least `block_confirmation_depth` deep relative to the current best block.
///
/// Returns the archiver together with the root/number of the best archived block.
async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    block_confirmation_depth: BlockNumber,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    // Default target: archive up to `K` blocks below the best block
    let mut best_block_to_archive = best_block_number.saturating_sub(block_confirmation_depth);

    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        // If there are blocks missing headers between best block to archive and best block of the
        // blockchain it means newer block was inserted in some special way and as such is by
        // definition valid, so we can simply assume that is our best block to archive instead
        best_block_to_archive = best_block_number;
    }

    let maybe_last_archived_block =
        find_last_archived_block(chain_info, best_block_to_archive, &best_block_root).await;

    // Remember whether we are resuming; used below to decide whether genesis must be archived
    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block)) = maybe_last_archived_block {
            // Continuing from existing initial state
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            // Set initial value, this is needed in case only genesis block was archived and there
            // is nothing else available
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                Vec::new(),
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
            )
        };

    // Process blocks since last fully archived block up to the current head minus K
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        // `None` means nothing to catch up on right now
        let blocks_to_archive_to = best_block_number
            .checked_sub(block_confirmation_depth)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // If not continuation, archive genesis block
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, Vec::new())
                    .expect("Block is never empty and doesn't exceed u32; qed");
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                // Persist segment headers as they are produced so a crash mid-catch-up can resume
                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
409
// TODO: Public API for re-archiving of blocks with ability to produce object mappings
/// Create a segment archiver task.
///
/// Archiver task will listen for importing blocks and archive blocks at `K` depth, producing pieces
/// and segment headers. It produces local segments initially, which after confirmation by the
/// beacon chain will be updated with the necessary proof and given a global segment index.
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
///
/// Archiver is only able to move forward and doesn't support reorgs. Upon restart, it will check
/// segments in [`ChainInfo`] and chain history to reconstruct the "current" state it was in before
/// the last shutdown and continue incrementally archiving blockchain history from there.
///
/// Archiving is triggered by block importing notification (`block_importing_notification_receiver`)
/// and tries to archive the block at [`ConsensusConstants::block_confirmation_depth`] depth from
/// the block being imported. Block importing will then wait for archiver to acknowledge processing,
/// which is necessary for ensuring that when the next block is imported, the newly archived segment
/// is already available deterministically.
///
/// Once a new segment is archived, a notification (`archived_segment_notification_sender`) will be
/// sent and archiver will be paused until all receivers have provided an acknowledgement for it (or
/// a very generous timeout has passed).
pub async fn create_segment_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    erasure_coding: ErasureCoding,
) -> Result<
    impl Future<Output = Result<(), SegmentArchiverTaskError>> + Send + 'static,
    SegmentArchiverTaskError,
>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    // When no segment header exists yet, initialize eagerly so instantiation errors surface
    // before the returned future is even spawned; otherwise defer initialization into the task
    let maybe_archiver = if chain_info.last_segment_header().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.block_confirmation_depth,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        // Main loop: one iteration per block-importing notification
        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.block_confirmation_depth)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Too early to archive blocks
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip archived blocks
            let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                // This block was already archived, skip
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // In case there was a block gap, re-initialize archiver and continue with the current
            // block number (rather than block number at some depth) to allow for special sync
            // modes where pre-verified blocks are inserted at some point in the future comparing to
            // previously existing blocks
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                // Destructuring assignment: replaces `archiver` and the best-archived trackers
                // in place with the freshly initialized state
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // As expected, can archive this block
                } else if best_archived_block_number >= block_number_to_archive {
                    // Special sync mode where verified blocks were inserted into the blockchain
                    // directly, archiving of this block will naturally happen later
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // We may have imported some block using special sync mode, so the block about
                    // to be imported is the first one after the gap at which archiver is supposed
                    // to be initialized, but we are only about to import it, so wait for the next
                    // block for now
                    continue;
                } else {
                    // Genuine, unrecoverable gap in the history
                    return Err(SegmentArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
            )
            .await?;
        }

        // Notification stream ended, archiving is done
        Ok(())
    })
}
572
573/// Tries to archive `block_number` and returns new (or old if not changed) best archived block
574async fn archive_block<Block, CI>(
575    archiver: &mut Archiver,
576    chain_info: &CI,
577    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
578    best_archived_block_root: BlockRoot,
579    block_number_to_archive: BlockNumber,
580    best_block_root: &BlockRoot,
581) -> Result<(BlockRoot, BlockNumber), SegmentArchiverTaskError>
582where
583    Block: GenericOwnedBlock,
584    CI: ChainInfoWrite<Block>,
585{
586    let header = chain_info
587        .ancestor_header(block_number_to_archive, best_block_root)
588        .expect("All blocks since last archived must be present; qed");
589
590    let parent_block_root = header.header().prefix.parent_root;
591    if parent_block_root != best_archived_block_root {
592        return Err(SegmentArchiverTaskError::ArchivingReorg {
593            parent_block_root,
594            best_archived_block_root,
595        });
596    }
597
598    let block_root_to_archive = *header.header().root();
599
600    let block = chain_info
601        .block(&block_root_to_archive)
602        .await
603        .expect("All blocks since last archived must be present; qed");
604
605    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");
606
607    let encoded_block = encode_block(&block);
608    debug!(
609        "Encoded block {block_number_to_archive} has size of {}",
610        ByteSize::b(encoded_block.len() as u64).display().iec(),
611    );
612
613    let block_outcome = archiver
614        .add_block(encoded_block, Vec::new())
615        .expect("Block is never empty and doesn't exceed u32; qed");
616    for archived_segment in block_outcome.archived_segments {
617        let segment_header = archived_segment.segment_header;
618
619        chain_info
620            .persist_segment_headers(vec![segment_header])
621            .await?;
622
623        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
624            .await;
625    }
626
627    Ok((block_root_to_archive, block_number_to_archive))
628}
629
630async fn send_archived_segment_notification(
631    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
632    archived_segment: NewArchivedSegment,
633) {
634    let segment_index = archived_segment.segment_header.segment_index;
635    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
636    // Keep `archived_segment` around until all acknowledgements are received since some receivers
637    // might use weak references
638    let archived_segment = Arc::new(archived_segment);
639    let archived_segment_notification = ArchivedSegmentNotification {
640        archived_segment: Arc::clone(&archived_segment),
641        acknowledgement_sender,
642    };
643
644    if let Err(error) = archived_segment_notification_sender
645        .send(archived_segment_notification)
646        .await
647    {
648        warn!(
649            %error,
650            "Failed to send archived segment notification"
651        );
652    }
653
654    let wait_fut = async {
655        while acknowledgement_receiver.next().await.is_some() {
656            debug!(
657                "Archived segment notification acknowledged: {}",
658                segment_index
659            );
660        }
661    };
662
663    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
664        .await
665        .is_err()
666    {
667        warn!(
668            "Archived segment notification was not acknowledged and reached timeout, continue \
669            regardless"
670        );
671    }
672}