ab_client_archiving/
archiving.rs

//! Consensus archiver responsible for archiving blockchain history; it is driven by the block
//! import pipeline.
//!
//! Implements the archiving process that converts blockchain history (blocks) into archived
//! history (segments and pieces).
//!
//! The main entry point here is [`create_archiver_task`], which creates a task that performs the
//! archiving while being driven.
//!
//! Archiving is triggered by a block importing notification and tries to archive the block at
//! [`ConsensusConstants::confirmation_depth_k`](ab_client_consensus_common::ConsensusConstants::confirmation_depth_k)
//! depth from the block being imported. Block import will then wait for the archiver to
//! acknowledge processing, which is necessary for ensuring that when the next block is imported,
//! it will contain the segment header of the newly archived segment (this must happen exactly in
//! the next block).
//!
//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
//! which includes the farmer subscription, in case of the reference implementation via RPC.
//!
//! [`SegmentHeadersStore`] is maintained as a data structure containing all known (including
//! future, in case of syncing) segment headers. The contents of this data structure are then made
//! available to other parts of the protocol that need to know what the correct archival history of
//! the blockchain looks like. For example, it is used during node sync and farmer plotting in
//! order to verify pieces of archival history received from other network participants.
//!
//! [`recreate_genesis_segment`] is a bit of a hack, useful for deriving the genesis segment, which
//! is a special case since the blockchain history itself doesn't contain enough data at genesis to
//! do the archiving.
//!
//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning blocks
//! into bytes and back.

use crate::segment_headers_store::{SegmentHeaderStoreError, SegmentHeadersStore};
use ab_aligned_buffer::SharedAlignedBuffer;
use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
use ab_archiving::objects::{BlockObject, GlobalObject};
use ab_client_api::ChainInfo;
use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
use ab_core_primitives::block::header::GenericBlockHeader;
use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
use ab_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
use ab_core_primitives::shard::RealShardKind;
use ab_erasure_coding::ErasureCoding;
use bytesize::ByteSize;
use chacha20::ChaCha8Rng;
use chacha20::rand_core::{RngCore, SeedableRng};
use futures::channel::mpsc;
use futures::prelude::*;
use std::num::NonZeroU64;
use std::slice;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, trace, warn};

/// Do not wait for acknowledgements beyond this time limit
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

// TODO: Maybe use or remove if database handles this completely on its own
// /// How deep (in segments) should block be in order to be finalized.
// ///
// /// This is required for full nodes to not prune recent history such that keep-up sync in Substrate
// /// works even without archival nodes (initial sync will be done from DSN).
// ///
// /// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
// /// Substrate and is not worth it right now.
// /// https://github.com/paritytech/substrate/discussions/14359
// const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);

/// Notification with a new archived segment that was just archived
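///
/// # Example
///
/// A minimal sketch of how a subscriber is expected to acknowledge this notification; the
/// `receiver` wiring is assumed to exist elsewhere, hence `ignore`:
///
/// ```ignore
/// use futures::{SinkExt, StreamExt};
///
/// while let Some(mut notification) = receiver.next().await {
///     // Process `notification.archived_segment` (e.g. hand pieces to the farmer)...
///
///     // ...then acknowledge, or else the block import pipeline will get stuck
///     let _ = notification.acknowledgement_sender.send(()).await;
/// }
/// ```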
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender used to signal that the archived segment was received by the farmer.
    ///
    /// This must be used to send a message or else the block import pipeline will get stuck.
    pub acknowledgement_sender: mpsc::Sender<()>,
}

/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation)
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Incremental object mappings for a block (and any previous block continuation).
    ///
    /// The archived data won't be available in pieces until the entire segment is full and
    /// archived.
    pub object_mapping: Vec<GlobalObject>,
    /// The block that these mappings are from.
    pub block_number: BlockNumber,
    // TODO: add an acknowledgement_sender for backpressure if needed
}

/// Whether to create object mappings.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Start creating object mappings from this block number.
    ///
    /// This can be lower than the latest archived block, but must be greater than genesis.
    ///
    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
    /// the genesis data on disk, so avoiding genesis also avoids this error.
    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
    Block(NonZeroU64),
    /// Create object mappings as archiving is happening
    Yes,
    /// Don't create object mappings
    #[default]
    No,
}

impl CreateObjectMappings {
    /// The fixed block number to start creating object mappings from.
    ///
    /// If there is no fixed block number, or mappings are disabled, returns `None`.
    fn block(&self) -> Option<BlockNumber> {
        match self {
            CreateObjectMappings::Block(block) => Some(BlockNumber::new(block.get())),
            CreateObjectMappings::Yes => None,
            CreateObjectMappings::No => None,
        }
    }

    /// Returns true if object mappings will be created from a past or future block.
    pub fn is_enabled(&self) -> bool {
        !matches!(self, CreateObjectMappings::No)
    }

    /// Does the supplied block number need object mappings?
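    ///
    /// # Example
    ///
    /// A minimal sketch of the expected semantics (block numbers are illustrative):
    ///
    /// ```ignore
    /// let from_100 = CreateObjectMappings::Block(NonZeroU64::new(100).unwrap());
    /// assert!(!from_100.is_enabled_for_block(BlockNumber::new(99)));
    /// assert!(from_100.is_enabled_for_block(BlockNumber::new(100)));
    /// // `Yes` continues where archiving left off, so all blocks get mappings
    /// assert!(CreateObjectMappings::Yes.is_enabled_for_block(BlockNumber::new(1)));
    /// assert!(!CreateObjectMappings::No.is_enabled_for_block(BlockNumber::new(1)));
    /// ```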
    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
        if !self.is_enabled() {
            return false;
        }

        if let Some(target_block) = self.block() {
            return block >= target_block;
        }

        // We're continuing where we left off, so all blocks get mappings.
        true
    }
}

async fn find_last_archived_block<Block, CI, COM>(
    chain_info: &CI,
    segment_headers_store: &SegmentHeadersStore,
    best_block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: Option<COM>,
) -> Option<(SegmentHeader, Block, Vec<BlockObject>)>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
    COM: Fn(&Block) -> Vec<BlockObject>,
{
    let max_segment_index = segment_headers_store.max_segment_index()?;

    if max_segment_index == SegmentIndex::ZERO {
        // Just genesis, nothing else to check
        return None;
    }

    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block.number();

        if last_archived_block_number > best_block_number_to_archive {
            // Last archived block in segment header is too high for the current state of the chain
            // (segment headers store may know about more blocks in existence than is currently
            // imported)
            continue;
        }

        let Some(last_archived_block_header) =
            chain_info.ancestor_header(last_archived_block_number, best_block_root)
        else {
            // This block number is not in our chain yet (segment headers store may know about more
            // blocks in existence than is currently imported)
            continue;
        };

        let last_archived_block_root = &*last_archived_block_header.header().root();

        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
            // Block might have been pruned between ancestor search and disk read
            continue;
        };

        // If we're starting mapping creation at this block, return its mappings
        let block_object_mappings = if let Some(create_object_mappings) = create_object_mappings {
            create_object_mappings(&last_archived_block)
        } else {
            Vec::new()
        };

        return Some((segment_header, last_archived_block, block_object_mappings));
    }

    None
}

/// Derive the genesis segment on demand from the genesis block
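///
/// # Example
///
/// A minimal sketch, assuming `genesis_block: OwnedBeaconChainBlock` and `erasure_coding` were
/// obtained elsewhere:
///
/// ```ignore
/// let genesis_segment = recreate_genesis_segment(&genesis_block, erasure_coding);
/// // The genesis block always results in exactly the first segment
/// assert_eq!(
///     genesis_segment.segment_header.segment_index,
///     SegmentIndex::ZERO,
/// );
/// ```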
pub fn recreate_genesis_segment(
    owned_genesis_block: &OwnedBeaconChainBlock,
    erasure_coding: ErasureCoding,
) -> NewArchivedSegment {
    let encoded_block = encode_block(owned_genesis_block);

    // There are no mappings in the genesis block, so they can be ignored
    let block_outcome = Archiver::new(erasure_coding)
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed")
}

/// Encode block for archiving purposes
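///
/// The encoding layout is
/// `[header_len: u32 LE][body_len: u32 LE][header bytes][body bytes]`, with the genesis block
/// additionally padded with deterministic pseudorandom bytes up to
/// [`RecordedHistorySegment::SIZE`] (the padding is ignored during decoding).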
pub fn encode_block<Block>(block: &Block) -> Vec<u8>
where
    Block: GenericOwnedBlock,
{
    let is_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
        && block.header().header().prefix.number == BlockNumber::ZERO;
    let header_buffer = block.header().buffer();
    let body_buffer = block.body().buffer();

    // TODO: Extra allocation here is unfortunate, would be nice to avoid it
    let mut encoded_block = Vec::with_capacity(if is_genesis_block {
        RecordedHistorySegment::SIZE
    } else {
        size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
    });

    encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(header_buffer);
    encoded_block.extend_from_slice(body_buffer);

    if is_genesis_block {
        let encoded_block_length = encoded_block.len();

        // Encoding of the genesis block is extended with extra data such that the very first
        // archived segment can be produced right away, bootstrapping the farming process.
        //
        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
        // ignored even though it is technically present in encoded form.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
        rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
    }

    encoded_block
}

/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
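///
/// # Example
///
/// A minimal round-trip sketch, assuming `block: OwnedBeaconChainBlock` was obtained elsewhere:
///
/// ```ignore
/// let encoded = encode_block(&block);
/// let decoded: OwnedBeaconChainBlock =
///     decode_block(&encoded).expect("Symmetric to `encode_block()`; qed");
/// ```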
pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
where
    Block: GenericOwnedBlock,
{
    let header_length = {
        let header_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            header_length[0],
            header_length[1],
            header_length[2],
            header_length[3],
        ])
    };
    let body_length = {
        let body_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            body_length[0],
            body_length[1],
            body_length[2],
            body_length[3],
        ])
    };

    let header_buffer = encoded_block.split_off(..header_length as usize)?;
    let body_buffer = encoded_block.split_off(..body_length as usize)?;

    let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
    let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);

    Block::from_buffers(header_buffer, body_buffer)
}

/// Archiver task error
#[derive(Debug, thiserror::Error)]
pub enum ArchiverTaskError {
    /// Archiver instantiation error
    #[error("Archiver instantiation error: {error}")]
    Instantiation {
        /// Low-level error
        #[from]
        error: ArchiverInstantiationError,
    },
    /// Failed to add a new segment header to the segment headers store
    #[error("Failed to add a new segment header to the segment headers store: {error}")]
    SegmentHeaderStore {
        /// Low-level error
        #[from]
        error: SegmentHeaderStoreError,
    },
    /// Attempt to switch to a different fork beyond archiving depth
    #[error(
        "Attempt to switch to a different fork beyond archiving depth: parent block root \
        {parent_block_root}, best archived block root {best_archived_block_root}"
    )]
    ArchivingReorg {
        /// Parent block root
        parent_block_root: BlockRoot,
        /// Best archived block root
        best_archived_block_root: BlockRoot,
    },
    /// There was a gap in blockchain history, and the last contiguous series of blocks doesn't
    /// start with the archived segment
    #[error(
        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
        start with the archived segment (best archived block number {best_archived_block_number}, \
        block number to archive {block_number_to_archive}, block about to be imported \
        {importing_block_number})"
    )]
    BlockGap {
        /// Best archived block number
        best_archived_block_number: BlockNumber,
        /// Block number to archive
        block_number_to_archive: BlockNumber,
        /// Importing block number
        importing_block_number: BlockNumber,
    },
}

struct InitializedArchiver {
    archiver: Archiver,
    best_archived_block: (BlockRoot, BlockNumber),
}

async fn initialize_archiver<Block, CI>(
    segment_headers_store: &SegmentHeadersStore,
    chain_info: &CI,
    confirmation_depth_k: BlockNumber,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    // Choose a lower block number if we want to get mappings from that specific block. If we are
    // continuing from where we left off, we don't need to change the block number to archive. If
    // there is no path to this block from the tip due to snap sync, we'll start archiving from an
    // earlier segment, then start mapping again once archiving reaches this block.
    if let Some(block_number) = create_object_mappings.block() {
        // There aren't any mappings in the genesis block, so starting there is pointless (and
        // causes errors on restart, because genesis block data is never stored during snap sync).
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        // If headers are missing for blocks between the best block to archive and the best block
        // of the blockchain, it means a newer block was inserted in some special way and as such
        // is by definition valid, so we can simply assume that it is our best block to archive
        // instead
        best_block_to_archive = best_block_number;
    }

    // TODO: Uncomment once API for object mapping is established
    // // If the user chooses an object mapping start block we don't have data or state for, we can't
    // // create mappings for it, so the node must exit with an error. We ignore genesis here, because
    // // it doesn't have mappings.
    // if create_object_mappings.is_enabled() && best_block_to_archive >= BlockNumber::ONE {
    //     let Some(best_block_to_archive_root) = client.root(NumberFor::<Block>::saturated_from(
    //         best_block_to_archive.as_u64(),
    //     ))?
    //     else {
    //         let error = format!(
    //             "Missing root for mapping block {best_block_to_archive}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     let Some(_best_block_data) = client.block(best_block_to_archive_root)? else {
    //         let error = format!(
    //             "Missing data for mapping block {best_block_to_archive} \
    //             root {best_block_to_archive_root}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     // Similarly, state can be pruned, even if the data is present
    //     // TODO: Injection of external logic
    //     // client
    //     //     .runtime_api()
    //     //     .extract_block_object_mapping(
    //     //         *best_block_data.block.header().parent_root(),
    //     //         best_block_data.block.clone(),
    //     //     )
    //     //     .map_err(|error| {
    //     //         sp_blockchain::Error::Application(
    //     //             format!(
    //     //                 "Missing state for mapping block {best_block_to_archive} \
    //     //                 root {best_block_to_archive_root}: {error}, \
    //     //                 try a higher block number, or wipe your node and restart with `--sync full`"
    //     //             )
    //     //             .into(),
    //     //         )
    //     //     })?;
    // }

    let maybe_last_archived_block = find_last_archived_block(
        chain_info,
        segment_headers_store,
        best_block_to_archive,
        &best_block_root,
        create_object_mappings
            .is_enabled()
            .then_some(|_block: &Block| {
                // TODO: Injection of external logic
                // let parent_root = *block.header().parent_root();
                // client
                //     .runtime_api()
                //     .extract_block_object_mapping(parent_root, block)
                //     .unwrap_or_default()
                Vec::new()
            }),
    )
    .await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            // Continuing from existing initial state
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            // Set the initial value, this is needed in case only the genesis block was archived
            // and there is nothing else available
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(erasure_coding)
        };

    // Process blocks since the last fully archived block up to the current head minus K
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // If not a continuation, archive the genesis block
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let block_object_mappings =
                    if create_object_mappings.is_enabled_for_block(block_number_to_archive) {
                        // TODO: Injection of external logic
                        // runtime_api
                        //     .extract_block_object_mapping(
                        //         *block.block.header().parent_root(),
                        //         block.block.clone(),
                        //     )
                        //     .unwrap_or_default()
                        Vec::new()
                    } else {
                        Vec::new()
                    };

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, block_object_mappings)
                    .expect("Block is never empty and doesn't exceed u32; qed");
                // TODO: Allow to capture these from the outside
                // send_object_mapping_notification(
                //     &subspace_link.object_mapping_notification_sender,
                //     block_outcome.global_objects,
                //     block_number_to_archive,
                // );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always be set if there is no logical error; qed"),
    })
}

/// Create an archiver task.
///
/// The archiver task will listen for importing blocks and archive blocks at `K` depth, producing
/// pieces and segment headers (segment headers are then added back to the blockchain in the next
/// block).
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
///
/// Archiver is only able to move forward and doesn't support reorgs. Upon restart, it will check
/// [`SegmentHeadersStore`] and chain history to reconstruct the "current" state it was in before
/// the last shutdown and continue incrementally archiving blockchain history from there.
///
/// Archiving is triggered by block importing notifications (`block_importing_notification_receiver`)
/// and tries to archive the block at [`ConsensusConstants::confirmation_depth_k`] depth from the
/// block being imported. Block importing will then wait for the archiver to acknowledge
/// processing, which is necessary for ensuring that when the next block is imported, the body will
/// contain a segment header of the newly archived segment.
///
/// `create_object_mappings` controls whether object mappings are created for archived blocks, and
/// from which block they start.
///
/// Once a new segment is archived, a notification (`archived_segment_notification_sender`) will be
/// sent and archiver will be paused until all receivers have provided an acknowledgement for it.
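///
/// # Example
///
/// A minimal wiring sketch; `store`, `chain_info`, `consensus_constants` and `erasure_coding` are
/// assumed to be constructed elsewhere, and error handling is omitted:
///
/// ```ignore
/// let (block_importing_sender, block_importing_receiver) = mpsc::channel(0);
/// let (archived_segment_sender, archived_segment_receiver) = mpsc::channel(0);
/// // `block_importing_sender` and `archived_segment_receiver` are handed to the block import
/// // pipeline and subscribers elsewhere
///
/// let archiver_task = create_archiver_task::<OwnedBeaconChainBlock, _>(
///     store,
///     chain_info,
///     block_importing_receiver,
///     archived_segment_sender,
///     consensus_constants,
///     CreateObjectMappings::No,
///     erasure_coding,
/// )
/// .await?;
///
/// // The archiver does blocking work, so drive it on a dedicated thread
/// std::thread::spawn(move || futures::executor::block_on(archiver_task));
/// ```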
pub async fn create_archiver_task<Block, CI>(
    segment_headers_store: SegmentHeadersStore,
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<impl Future<Output = Result<(), ArchiverTaskError>> + Send + 'static, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block> + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &segment_headers_store,
            &chain_info,
            consensus_constants.confirmation_depth_k,
            create_object_mappings,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &segment_headers_store,
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.confirmation_depth_k)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Too early to archive blocks
                    continue;
                }
            };

            let last_archived_block_number = segment_headers_store
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip archived blocks, unless we're producing object mappings for them
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                // This block was already archived, skip
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // In case there was a block gap, re-initialize the archiver and continue with the
            // current block number (rather than the block number at some depth) to allow for
            // special sync modes where pre-verified blocks are inserted at some point in the
            // future compared to previously existing blocks
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &segment_headers_store,
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // As expected, can archive this block
                } else if best_archived_block_number >= block_number_to_archive {
                    // Special sync mode where verified blocks were inserted into the blockchain
                    // directly, archiving of this block will naturally happen later
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // We may have imported some block using special sync mode, so the block about
                    // to be imported is the first one after the gap at which archiver is supposed
                    // to be initialized, but we are only about to import it, so wait for the next
                    // block for now
                    continue;
                } else {
                    return Err(ArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                segment_headers_store.clone(),
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
                create_object_mappings,
            )
            .await?;
        }

        Ok(())
    })
}

/// Tries to archive `block_number_to_archive` and returns the new (or old if not changed) best
/// archived block
#[allow(clippy::too_many_arguments)]
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    segment_headers_store: SegmentHeadersStore,
    chain_info: &CI,
    // TODO: Probably remove
    // object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: CreateObjectMappings,
) -> Result<(BlockRoot, BlockNumber), ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(ArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let create_mappings = create_object_mappings.is_enabled_for_block(block_number_to_archive);

    let block_object_mappings = if create_mappings {
        // TODO: Injection of external logic
        // client
        //     .runtime_api()
        //     .extract_block_object_mapping(parent_block_root, block.block.clone())
        //     .map_err(|error| {
        //         sp_blockchain::Error::Application(
        //             format!("Failed to retrieve block object mappings: {error}").into(),
        //         )
        //     })?
        Vec::new()
    } else {
        Vec::new()
    };

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, block_object_mappings)
        .expect("Block is never empty and doesn't exceed u32; qed");
    // TODO: Probably remove
    // send_object_mapping_notification(
    //     &object_mapping_notification_sender,
    //     block_outcome.global_objects,
    //     block_number_to_archive,
    // );
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}

// TODO: Probably remove
// fn send_object_mapping_notification(
//     object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
//     object_mapping: Vec<GlobalObject>,
//     block_number: BlockNumber,
// ) {
//     if object_mapping.is_empty() {
//         return;
//     }
//
//     let object_mapping_notification = ObjectMappingNotification {
//         object_mapping,
//         block_number,
//     };
//
//     object_mapping_notification_sender.notify(move || object_mapping_notification);
// }

async fn send_archived_segment_notification(
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    archived_segment: NewArchivedSegment,
) {
    let segment_index = archived_segment.segment_header.segment_index;
    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
    // Keep `archived_segment` around until all acknowledgements are received since some receivers
    // might use weak references
    let archived_segment = Arc::new(archived_segment);
    let archived_segment_notification = ArchivedSegmentNotification {
        archived_segment: Arc::clone(&archived_segment),
        acknowledgement_sender,
    };

    if let Err(error) = archived_segment_notification_sender
        .send(archived_segment_notification)
        .await
    {
        warn!(
            %error,
            "Failed to send archived segment notification"
        );
    }

    let wait_fut = async {
        while acknowledgement_receiver.next().await.is_some() {
            debug!(
                "Archived segment notification acknowledged: {}",
                segment_index
            );
        }
    };

    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
        .await
        .is_err()
    {
        warn!(
            "Archived segment notification was not acknowledged within the timeout, continuing \
            regardless"
        );
    }
}
897}