Skip to main content

ab_client_archiving/
task.rs

1//! Segment archiving task.
2//!
//! Implements the archiving process that converts blockchain history (blocks) into archived history
4//! (segments and pieces).
5//!
6//! The main entry point here is [`create_segment_archiver_task`] that will create a task, which
7//! while driven will perform the archiving itself.
8//!
9//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
10//! which includes farmer subscription, in case of reference implementation via RPC.
11//!
12//! All segment headers of the archived segments are available to other parts of the protocol that
13//! need to know what the correct archival history of the blockchain looks like through
14//! [`ChainInfo`]. For example, it is used during node sync and farmer plotting to verify pieces of
15//! archival history received from other network participants. Future segment header might also be
16//! already known in the case of syncing from DSN.
17//!
18//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning
19//! Blocks into bytes and back.
20
21use ab_aligned_buffer::SharedAlignedBuffer;
22use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
23use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
24use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
25use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
26use ab_core_primitives::block::header::GenericBlockHeader;
27use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
28use ab_core_primitives::block::owned::GenericOwnedBlock;
29use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
30use ab_core_primitives::segments::{LocalSegmentIndex, RecordedHistorySegment, SegmentHeader};
31use ab_core_primitives::shard::RealShardKind;
32use ab_erasure_coding::ErasureCoding;
33use bytesize::ByteSize;
34use chacha20::ChaCha8Rng;
35use chacha20::rand_core::{Rng, SeedableRng};
36use futures::channel::mpsc;
37use futures::prelude::*;
38use std::sync::Arc;
39use std::time::Duration;
40use tracing::{debug, info, trace, warn};
41
/// Do not wait for acknowledgements beyond this time limit (2 minutes)
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
44
45// TODO: Maybe use or remove if database handles this completely on its own
46// /// How deep (in segments) should block be in order to be finalized.
47// ///
48// /// This is required for full nodes to not prune recent history such that keep-up sync in
49// /// Substrate works even without archival nodes (initial sync will be done from DSN).
50// ///
51// /// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
52// /// Substrate and is not worth it right now.
53// /// https://github.com/paritytech/substrate/discussions/14359
54// const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::from(5);
55
/// Notification with a new archived segment that was just archived
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// Archived segment (shared so receivers can hold it, possibly via weak references).
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender that signifies the fact of receiving an archived segment by farmer.
    ///
    /// This must be used to send a message, or else the block import pipeline will get stuck.
    pub acknowledgement_sender: mpsc::Sender<()>,
}
66
/// Finds the most recent archived segment whose last archived block is both present in the chain
/// identified by `best_block_root` and not newer than `best_block_number_to_archive`.
///
/// Returns the segment header together with the corresponding fully-imported block, or `None` when
/// archiving should start from genesis (no segment headers, or only the genesis segment exists).
async fn find_last_archived_block<Block, CI>(
    chain_info: &CI,
    best_block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
) -> Option<(SegmentHeader, Block)>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let max_local_segment_index = chain_info.last_segment_header()?.index.as_inner();

    if max_local_segment_index == LocalSegmentIndex::ZERO {
        // Just genesis, nothing else to check
        return None;
    }

    // Walk segments newest-to-oldest until one is found whose last archived block is usable
    for segment_header in (LocalSegmentIndex::ZERO..=max_local_segment_index)
        .rev()
        .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block.number();

        if last_archived_block_number > best_block_number_to_archive {
            // Last archived block in segment header is too high for the current state of the chain
            // (segment headers store may know about more blocks in existence than is currently
            // imported)
            continue;
        }

        let Some(last_archived_block_header) =
            chain_info.ancestor_header(last_archived_block_number, best_block_root)
        else {
            // This block number is not in our chain yet (segment headers store may know about more
            // blocks in existence than is currently imported)
            continue;
        };

        let last_archived_block_root = &*last_archived_block_header.header().root();

        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
            // Block might have been pruned between ancestor search and disk read
            continue;
        };

        return Some((segment_header, last_archived_block));
    }

    None
}
116
117/// Encode block for archiving purposes
118pub fn encode_block<Block>(block: &Block) -> Vec<u8>
119where
120    Block: GenericOwnedBlock,
121{
122    let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
123        && block.header().header().prefix.number == BlockNumber::ZERO;
124    let header_buffer = block.header().buffer();
125    let body_buffer = block.body().buffer();
126
127    // TODO: Extra allocation here is unfortunate, would be nice to avoid it
128    let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
129        RecordedHistorySegment::SIZE
130    } else {
131        size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
132    });
133
134    encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
135    encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
136    encoded_block.extend_from_slice(header_buffer);
137    encoded_block.extend_from_slice(body_buffer);
138
139    if is_beacon_chain_genesis_block {
140        let encoded_block_length = encoded_block.len();
141
142        // Encoding of the genesis block is extended with extra data such that the very first
143        // archived segment can be produced right away, bootstrapping the farming process.
144        //
145        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
146        // ignored even though it is technically present in encoded form.
147        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
148        let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
149        rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
150    }
151
152    encoded_block
153}
154
155/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
156pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
157where
158    Block: GenericOwnedBlock,
159{
160    let header_length = {
161        let header_length = encoded_block.split_off(..size_of::<u32>())?;
162        u32::from_le_bytes([
163            header_length[0],
164            header_length[1],
165            header_length[2],
166            header_length[3],
167        ])
168    };
169    let body_length = {
170        let body_length = encoded_block.split_off(..size_of::<u32>())?;
171        u32::from_le_bytes([
172            body_length[0],
173            body_length[1],
174            body_length[2],
175            body_length[3],
176        ])
177    };
178
179    let header_buffer = encoded_block.split_off(..header_length as usize)?;
180    let body_buffer = encoded_block.split_off(..body_length as usize)?;
181
182    let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
183    let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
184
185    Block::from_buffers(header_buffer, body_buffer)
186}
187
188/// Segment archiver task error
189#[derive(Debug, thiserror::Error)]
190pub enum SegmentArchiverTaskError {
191    /// Archiver instantiation error
192    #[error("Archiver instantiation error: {error}")]
193    Instantiation {
194        /// Low-level error
195        #[from]
196        error: ArchiverInstantiationError,
197    },
198    /// Failed to persist a new segment header
199    #[error("Failed to persist a new segment header: {error}")]
200    PersistSegmentHeaders {
201        /// Low-level error
202        #[from]
203        error: PersistSegmentHeadersError,
204    },
205    /// Attempt to switch to a different fork beyond archiving depth
206    #[error(
207        "Attempt to switch to a different fork beyond archiving depth: parent block root \
208        {parent_block_root}, best archived block root {best_archived_block_root}"
209    )]
210    ArchivingReorg {
211        /// Parent block root
212        parent_block_root: BlockRoot,
213        /// Best archived block root
214        best_archived_block_root: BlockRoot,
215    },
216    /// There was a gap in blockchain history, and the last contiguous series of blocks doesn't
217    /// start with the archived segment
218    #[error(
219        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
220        start with the archived segment (best archived block number {best_archived_block_number}, \
221        block number to archive {block_number_to_archive}), block about to be imported \
222        {importing_block_number})"
223    )]
224    BlockGap {
225        /// Best archived block number
226        best_archived_block_number: BlockNumber,
227        /// Block number to archive
228        block_number_to_archive: BlockNumber,
229        /// Importing block number
230        importing_block_number: BlockNumber,
231    },
232}
233
/// Result of [`initialize_archiver`]: a ready-to-use archiver together with the identity of the
/// best block that has already been archived.
struct InitializedArchiver {
    // Archiver caught up to the last confirmed block
    archiver: Archiver,
    // `(root, number)` of the most recently archived block
    best_archived_block: (BlockRoot, BlockNumber),
}
238
/// Creates (or resumes from persisted segment headers) an [`Archiver`] and catches it up by
/// archiving every block between the last archived block and the current best block minus
/// `block_confirmation_depth`.
///
/// Newly produced segment headers are persisted through `chain_info` as part of catch-up.
/// Returns the initialized archiver along with the best archived block `(root, number)`.
async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    block_confirmation_depth: BlockNumber,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(block_confirmation_depth);

    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        // If there are blocks missing headers between best block to archive and best block of the
        // blockchain it means newer block was inserted in some special way and as such is by
        // definition valid, so we can simply assume that is our best block to archive instead
        best_block_to_archive = best_block_number;
    }

    let maybe_last_archived_block =
        find_last_archived_block(chain_info, best_block_to_archive, &best_block_root).await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block)) = maybe_last_archived_block {
            // Continuing from existing initial state
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            // Set initial value, this is needed in case only genesis block was archived and there
            // is nothing else available
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                Vec::new(),
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
            )
        };

    // Process blocks since last fully archived block up to the current head minus K
    {
        // First block after the last fully archived one (genesis when nothing was archived yet)
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        // `None` means there is nothing to catch up on; on a fresh start the genesis block is
        // archived unconditionally
        let blocks_to_archive_to = best_block_number
            .checked_sub(block_confirmation_depth)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // If not continuation, archive genesis block
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, Vec::new())
                    .expect("Block is never empty and doesn't exceed u32; qed");
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                // Persist headers before moving on so restarts can resume from them
                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        // Set either when resuming from a previous segment or when archiving the genesis block
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
376
// TODO: Public API for re-archiving of blocks with ability to produce object mappings
/// Create a segment archiver task.
///
/// Archiver task will listen for importing blocks and archive blocks at `K` depth, producing pieces
/// and segment headers. It produces local segments initially, which after confirmation by the
/// beacon chain will be updated with the necessary proof and given a global segment index.
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
///
/// Archiver is only able to move forward and doesn't support reorgs. Upon restart, it will check
/// segments in [`ChainInfo`] and chain history to reconstruct the "current" state it was in before
/// the last shutdown and continue incrementally archiving blockchain history from there.
///
/// Archiving is triggered by block importing notification (`block_importing_notification_receiver`)
/// and tries to archive the block at [`ConsensusConstants::block_confirmation_depth`] depth from
/// the block being imported. Block importing will then wait for archiver to acknowledge processing,
/// which is necessary for ensuring that when the next block is imported, the newly archived segment
/// is already available deterministically.
///
/// Once a new segment is archived, a notification (`archived_segment_notification_sender`) will be
/// sent and archiver will be paused until all receivers have provided an acknowledgement for it (or
/// a very generous timeout has passed).
pub async fn create_segment_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    erasure_coding: ErasureCoding,
) -> Result<
    impl Future<Output = Result<(), SegmentArchiverTaskError>> + Send + 'static,
    SegmentArchiverTaskError,
>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    // When no segment was archived yet, initialize eagerly so that failures surface before the
    // task future is even returned; otherwise initialization is deferred into the task itself
    let maybe_archiver = if chain_info.last_segment_header().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.block_confirmation_depth,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        // Finish deferred initialization if it didn't happen above
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        // Main loop: one iteration per imported block notification
        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.block_confirmation_depth)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Too early to archive blocks
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip archived blocks
            let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                // This block was already archived, skip
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // In case there was a block gap, re-initialize archiver and continue with the current
            // block number (rather than block number at some depth) to allow for special sync
            // modes where pre-verified blocks are inserted at some point in the future comparing to
            // previously existing blocks
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                // Destructuring assignment: re-initializes `archiver` and the best archived block
                // bindings in place
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // As expected, can archive this block
                } else if best_archived_block_number >= block_number_to_archive {
                    // Special sync mode where verified blocks were inserted into the blockchain
                    // directly, archiving of this block will naturally happen later
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // We may have imported some block using special sync mode, so the block about
                    // to be imported is the first one after the gap at which archiver is supposed
                    // to be initialized, but we are only about to import it, so wait for the next
                    // block for now
                    continue;
                } else {
                    return Err(SegmentArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
            )
            .await?;
        }

        // Notification stream ended, shut down gracefully
        Ok(())
    })
}
539
/// Tries to archive `block_number` and returns new (or old if not changed) best archived block
///
/// Errors with [`SegmentArchiverTaskError::ArchivingReorg`] when the block to archive does not
/// descend from the previously archived block. Each newly produced segment is persisted via
/// `chain_info` and then broadcast to subscribers (waiting for acknowledgements) before the next
/// segment is processed.
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    chain_info: &CI,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
) -> Result<(BlockRoot, BlockNumber), SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    // The archiver only moves forward: the block being archived must be a direct child of the
    // last archived block, otherwise a reorg happened beyond archiving depth
    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(SegmentArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        // Persist first, then notify — subscribers may rely on the header being durable
        chain_info
            .persist_segment_headers(vec![segment_header])
            .await?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}
596
597async fn send_archived_segment_notification(
598    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
599    archived_segment: NewArchivedSegment,
600) {
601    let segment_index = archived_segment.segment_header.index;
602    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
603    // Keep `archived_segment` around until all acknowledgements are received since some receivers
604    // might use weak references
605    let archived_segment = Arc::new(archived_segment);
606    let archived_segment_notification = ArchivedSegmentNotification {
607        archived_segment: Arc::clone(&archived_segment),
608        acknowledgement_sender,
609    };
610
611    if let Err(error) = archived_segment_notification_sender
612        .send(archived_segment_notification)
613        .await
614    {
615        warn!(
616            %error,
617            "Failed to send archived segment notification"
618        );
619    }
620
621    let wait_fut = async {
622        while acknowledgement_receiver.next().await.is_some() {
623            debug!(
624                "Archived segment notification acknowledged: {}",
625                segment_index
626            );
627        }
628    };
629
630    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
631        .await
632        .is_err()
633    {
634        warn!(
635            "Archived segment notification was not acknowledged and reached timeout, continue \
636            regardless"
637        );
638    }
639}