ab_client_archiving/
lib.rs

//! Consensus archiver responsible for the archival of blockchain history; it is driven by the
//! block import pipeline.
//!
//! Implements the archiving process that converts blockchain history (blocks) into archived
//! history (segments and pieces).
//!
//! The main entry point here is [`create_archiver_task`], which creates a task that, while driven,
//! performs the archiving itself.
//!
//! Archiving is triggered by a block importing notification and tries to archive the block at
//! [`ConsensusConstants::confirmation_depth_k`](ab_client_consensus_common::ConsensusConstants::confirmation_depth_k)
//! depth from the block being imported. Block import will then wait for the archiver to
//! acknowledge processing, which is necessary for ensuring that when the next block is imported,
//! it will contain a segment header of the newly archived segment (this must happen exactly in the
//! next block).
//!
//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
//! which includes the farmer subscription (via RPC in the reference implementation).
//!
//! The set of known segment headers contains all segment headers seen so far (including, while
//! syncing, headers of segments not yet reflected in the locally imported chain). It is available
//! through [`ChainInfo`] to other parts of the protocol that need to know what the correct
//! archival history of the blockchain looks like. For example, it is used during node sync and
//! farmer plotting to verify pieces of archival history received from other network participants.
//!
//! [`recreate_genesis_segment`] is a bit of a hack, useful for deriving the genesis segment, which
//! is a special case since we don't have enough data in the blockchain history itself during
//! genesis to do the archiving.
//!
//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning blocks
//! into bytes and back.
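//!
//! A round-trip sketch (`ignore`d here since constructing an owned block requires chain context;
//! `block` is assumed to be an [`OwnedBeaconChainBlock`]):
//!
//! ```rust,ignore
//! let encoded = encode_block(&block);
//! // Decoding the output of `encode_block()` always succeeds; genesis padding, if any, is ignored
//! let decoded = decode_block::<OwnedBeaconChainBlock>(&encoded).unwrap();
//! ```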

use ab_aligned_buffer::SharedAlignedBuffer;
use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
use ab_archiving::objects::{BlockObject, GlobalObject};
use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
use ab_core_primitives::block::header::GenericBlockHeader;
use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
use ab_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
use ab_core_primitives::shard::RealShardKind;
use ab_erasure_coding::ErasureCoding;
use bytesize::ByteSize;
use chacha20::ChaCha8Rng;
use chacha20::rand_core::{RngCore, SeedableRng};
use futures::channel::mpsc;
use futures::prelude::*;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, trace, warn};

/// Do not wait for acknowledgements beyond this time limit
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

// TODO: Maybe use or remove if database handles this completely on its own
// /// How deep (in segments) should a block be in order to be finalized.
// ///
// /// This is required for full nodes to not prune recent history such that keep-up sync in
// /// Substrate works even without archival nodes (initial sync will be done from DSN).
// ///
// /// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
// /// Substrate and is not worth it right now.
// /// https://github.com/paritytech/substrate/discussions/14359
// const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);

/// Notification with a newly archived segment
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender used to signal that the archived segment was received by the farmer.
    ///
    /// This must be used to send a message, or else the block import pipeline will get stuck.
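    ///
    /// A receiver-side sketch (`ignore`d here since notifications come from a running archiver):
    ///
    /// ```rust,ignore
    /// use futures::SinkExt;
    ///
    /// async fn on_archived_segment(mut notification: ArchivedSegmentNotification) {
    ///     // Process `notification.archived_segment` first, then acknowledge so that the
    ///     // archiver (and with it the block import pipeline) can make progress
    ///     let _ = notification.acknowledgement_sender.send(()).await;
    /// }
    /// ```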
    pub acknowledgement_sender: mpsc::Sender<()>,
}

/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation)
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Incremental object mappings for a block (and any previous block continuation).
    ///
    /// The archived data won't be available in pieces until the entire segment is full and
    /// archived.
    pub object_mapping: Vec<GlobalObject>,
    /// The block that these mappings are from.
    pub block_number: BlockNumber,
    // TODO: add an acknowledgement_sender for backpressure if needed
}

/// Whether to create object mappings.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Start creating object mappings from this block number.
    ///
    /// This can be lower than the latest archived block, but must be greater than genesis.
    ///
    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
    /// the genesis data on disk, so avoiding genesis also avoids this error.
    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
    Block(NonZeroU64),
    /// Create object mappings as archiving is happening
    Yes,
    /// Don't create object mappings
    #[default]
    No,
}

impl CreateObjectMappings {
    /// The fixed block number to start creating object mappings from.
    ///
    /// If there is no fixed block number, or mappings are disabled, returns `None`.
    fn block(&self) -> Option<BlockNumber> {
        match self {
            CreateObjectMappings::Block(block) => Some(BlockNumber::new(block.get())),
            CreateObjectMappings::Yes => None,
            CreateObjectMappings::No => None,
        }
    }

    /// Returns `true` if object mappings will be created from a past or future block.
    pub fn is_enabled(&self) -> bool {
        !matches!(self, CreateObjectMappings::No)
    }

    /// Does the supplied block number need object mappings?
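    ///
    /// A minimal sketch (assuming this crate is named `ab_client_archiving`):
    ///
    /// ```
    /// use ab_client_archiving::CreateObjectMappings;
    /// use ab_core_primitives::block::BlockNumber;
    /// use std::num::NonZeroU64;
    ///
    /// let from_five = CreateObjectMappings::Block(NonZeroU64::new(5).unwrap());
    /// assert!(!from_five.is_enabled_for_block(BlockNumber::new(4)));
    /// assert!(from_five.is_enabled_for_block(BlockNumber::new(5)));
    /// assert!(CreateObjectMappings::Yes.is_enabled_for_block(BlockNumber::new(1)));
    /// assert!(!CreateObjectMappings::No.is_enabled_for_block(BlockNumber::new(1)));
    /// ```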
    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
        if !self.is_enabled() {
            return false;
        }

        if let Some(target_block) = self.block() {
            return block >= target_block;
        }

        // We're continuing where we left off, so all blocks get mappings.
        true
    }
}

async fn find_last_archived_block<Block, CI, COM>(
    chain_info: &CI,
    best_block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: Option<COM>,
) -> Option<(SegmentHeader, Block, Vec<BlockObject>)>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
    COM: Fn(&Block) -> Vec<BlockObject>,
{
    let max_segment_index = chain_info.max_segment_index()?;

    if max_segment_index == SegmentIndex::ZERO {
        // Just genesis, nothing else to check
        return None;
    }

    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block.number();

        if last_archived_block_number > best_block_number_to_archive {
            // The last archived block in this segment header is too high for the current state of
            // the chain (the segment header store may know about more blocks in existence than are
            // currently imported)
            continue;
        }

        let Some(last_archived_block_header) =
            chain_info.ancestor_header(last_archived_block_number, best_block_root)
        else {
            // This block number is not in our chain yet (the segment header store may know about
            // more blocks in existence than are currently imported)
            continue;
        };

        let last_archived_block_root = &*last_archived_block_header.header().root();

        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
            // Block might have been pruned between ancestor search and disk read
            continue;
        };

        // If we're starting mapping creation at this block, return its mappings
        let block_object_mappings = if let Some(create_object_mappings) = create_object_mappings {
            create_object_mappings(&last_archived_block)
        } else {
            Vec::new()
        };

        return Some((segment_header, last_archived_block, block_object_mappings));
    }

    None
}

/// Derive the genesis segment on demand from the genesis block
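///
/// A sketch of intended use (`ignore`d here since obtaining the owned genesis block and an
/// [`ErasureCoding`] instance is application-specific):
///
/// ```rust,ignore
/// let genesis_segment = recreate_genesis_segment(&owned_genesis_block, erasure_coding);
/// // The genesis block always produces exactly the first archived segment
/// assert_eq!(genesis_segment.segment_header.segment_index, SegmentIndex::ZERO);
/// ```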
pub fn recreate_genesis_segment(
    owned_genesis_block: &OwnedBeaconChainBlock,
    erasure_coding: ErasureCoding,
) -> NewArchivedSegment {
    let encoded_block = encode_block(owned_genesis_block);

    // There are no mappings in the genesis block, so they can be ignored
    let block_outcome = Archiver::new(erasure_coding)
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed")
}

/// Encode a block for archiving purposes
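///
/// The produced layout is:
///
/// `[header_len: u32 LE][body_len: u32 LE][header bytes][body bytes]`
///
/// For the beacon chain genesis block, the encoding is additionally padded with deterministic
/// pseudorandom bytes up to [`RecordedHistorySegment::SIZE`]; [`decode_block()`] ignores the
/// padding.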
pub fn encode_block<Block>(block: &Block) -> Vec<u8>
where
    Block: GenericOwnedBlock,
{
    let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
        && block.header().header().prefix.number == BlockNumber::ZERO;
    let header_buffer = block.header().buffer();
    let body_buffer = block.body().buffer();

    // TODO: Extra allocation here is unfortunate, would be nice to avoid it
    let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
        RecordedHistorySegment::SIZE
    } else {
        size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
    });

    encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(header_buffer);
    encoded_block.extend_from_slice(body_buffer);

    if is_beacon_chain_genesis_block {
        let encoded_block_length = encoded_block.len();

        // Encoding of the genesis block is extended with extra data such that the very first
        // archived segment can be produced right away, bootstrapping the farming process.
        //
        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
        // ignored even though it is technically present in encoded form.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
        rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
    }

    encoded_block
}

/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
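///
/// A minimal sketch of the failure mode (assuming this crate is named `ab_client_archiving`):
/// input shorter than the two `u32` length prefixes decodes to `None` rather than panicking.
///
/// ```
/// use ab_client_archiving::decode_block;
/// use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
///
/// assert!(decode_block::<OwnedBeaconChainBlock>(&[0u8; 3]).is_none());
/// ```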
pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
where
    Block: GenericOwnedBlock,
{
    let header_length = {
        let header_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            header_length[0],
            header_length[1],
            header_length[2],
            header_length[3],
        ])
    };
    let body_length = {
        let body_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            body_length[0],
            body_length[1],
            body_length[2],
            body_length[3],
        ])
    };

    let header_buffer = encoded_block.split_off(..header_length as usize)?;
    let body_buffer = encoded_block.split_off(..body_length as usize)?;

    let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
    let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);

    Block::from_buffers(header_buffer, body_buffer)
}

/// Archiver task error
#[derive(Debug, thiserror::Error)]
pub enum ArchiverTaskError {
    /// Archiver instantiation error
    #[error("Archiver instantiation error: {error}")]
    Instantiation {
        /// Low-level error
        #[from]
        error: ArchiverInstantiationError,
    },
    /// Failed to persist a new segment header
    #[error("Failed to persist a new segment header: {error}")]
    PersistSegmentHeaders {
        /// Low-level error
        #[from]
        error: PersistSegmentHeadersError,
    },
    /// Attempt to switch to a different fork beyond archiving depth
    #[error(
        "Attempt to switch to a different fork beyond archiving depth: parent block root \
        {parent_block_root}, best archived block root {best_archived_block_root}"
    )]
    ArchivingReorg {
        /// Parent block root
        parent_block_root: BlockRoot,
        /// Best archived block root
        best_archived_block_root: BlockRoot,
    },
    /// There was a gap in blockchain history, and the last contiguous series of blocks doesn't
    /// start with the archived segment
    #[error(
        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
        start with the archived segment (best archived block number {best_archived_block_number}, \
        block number to archive {block_number_to_archive}, block about to be imported \
        {importing_block_number})"
    )]
    BlockGap {
        /// Best archived block number
        best_archived_block_number: BlockNumber,
        /// Block number to archive
        block_number_to_archive: BlockNumber,
        /// Importing block number
        importing_block_number: BlockNumber,
    },
}

struct InitializedArchiver {
    archiver: Archiver,
    best_archived_block: (BlockRoot, BlockNumber),
}

async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    confirmation_depth_k: BlockNumber,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    // Choose a lower block number if we want to get mappings from that specific block.
    // If we are continuing from where we left off, we don't need to change the block number to
    // archive. If there is no path to this block from the tip due to snap sync, we'll start
    // archiving from an earlier segment, then start mapping again once archiving reaches this
    // block.
    if let Some(block_number) = create_object_mappings.block() {
        // There aren't any mappings in the genesis block, so starting there is pointless.
        // (And causes errors on restart, because genesis block data is never stored during snap
        // sync.)
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        // If any headers are missing between the best block to archive and the best block of the
        // blockchain, a newer block was inserted in some special way and is by definition valid,
        // so we can simply assume it is our best block to archive instead
        best_block_to_archive = best_block_number;
    }

    // TODO: Uncomment once API for object mapping is established
    // // If the user chooses an object mapping start block we don't have data or state for, we
    // // can't create mappings for it, so the node must exit with an error. We ignore genesis
    // // here, because it doesn't have mappings.
    // if create_object_mappings.is_enabled() && best_block_to_archive >= BlockNumber::ONE {
    //     let Some(best_block_to_archive_root) = client.root(NumberFor::<Block>::saturated_from(
    //         best_block_to_archive.as_u64(),
    //     ))?
    //     else {
    //         let error = format!(
    //             "Missing root for mapping block {best_block_to_archive}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     let Some(_best_block_data) = client.block(best_block_to_archive_root)? else {
    //         let error = format!(
    //             "Missing data for mapping block {best_block_to_archive} \
    //             root {best_block_to_archive_root}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     // Similarly, state can be pruned, even if the data is present
    //     // TODO: Injection of external logic
    //     // client
    //     //     .runtime_api()
    //     //     .extract_block_object_mapping(
    //     //         *best_block_data.block.header().parent_root(),
    //     //         best_block_data.block.clone(),
    //     //     )
    //     //     .map_err(|error| {
    //     //         sp_blockchain::Error::Application(
    //     //             format!(
    //     //                 "Missing state for mapping block {best_block_to_archive} \
    //     //                 root {best_block_to_archive_root}: {error}, \
    //     //                 try a higher block number, or wipe your node and restart with
    //     //                 `--sync full`"
    //     //             )
    //     //             .into(),
    //     //         )
    //     //     })?;
    // }

    let maybe_last_archived_block = find_last_archived_block(
        chain_info,
        best_block_to_archive,
        &best_block_root,
        create_object_mappings
            .is_enabled()
            .then_some(|_block: &Block| {
                // TODO: Injection of external logic
                // let parent_root = *block.header().parent_root();
                // client
                //     .runtime_api()
                //     .extract_block_object_mapping(parent_root, block)
                //     .unwrap_or_default()
                Vec::new()
            }),
    )
    .await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            // Continuing from existing initial state
            let last_archived_block_number = last_segment_header.last_archived_block.number();
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            // Set initial value, this is needed in case only the genesis block was archived and
            // there is nothing else available
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(erasure_coding)
        };

    // Process blocks since the last fully archived block, up to the current head minus K
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // If this is not a continuation, archive the genesis block
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let block_object_mappings =
                    if create_object_mappings.is_enabled_for_block(block_number_to_archive) {
                        // TODO: Injection of external logic
                        // runtime_api
                        //     .extract_block_object_mapping(
                        //         *block.block.header().parent_root(),
                        //         block.block.clone(),
                        //     )
                        //     .unwrap_or_default()
                        Vec::new()
                    } else {
                        Vec::new()
                    };

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, block_object_mappings)
                    .expect("Block is never empty and doesn't exceed u32; qed");
                // TODO: Allow to capture these from the outside
                // send_object_mapping_notification(
                //     &subspace_link.object_mapping_notification_sender,
                //     block_outcome.global_objects,
                //     block_number_to_archive,
                // );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always be set if there is no logical error; qed"),
    })
}

/// Create an archiver task.
///
/// The archiver task will listen for importing blocks and archive blocks at `K` depth, producing
/// pieces and segment headers (segment headers are then added back to the blockchain in the next
/// block).
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
///
/// Archiver is only able to move forward and doesn't support reorgs. Upon restart, it will check
/// segments in [`ChainInfo`] and chain history to reconstruct the "current" state it was in before
/// the last shutdown and continue incrementally archiving blockchain history from there.
///
/// Archiving is triggered by a block importing notification
/// (`block_importing_notification_receiver`) and tries to archive the block at
/// [`ConsensusConstants::confirmation_depth_k`] depth from the block being imported. Block
/// importing will then wait for the archiver to acknowledge processing, which is necessary for
/// ensuring that when the next block is imported, the body will contain a segment header of the
/// newly archived segment.
///
/// `create_object_mappings` controls whether object mappings are created for archived blocks, and
/// from which block onwards.
///
/// Once a new segment is archived, a notification (`archived_segment_notification_sender`) will be
/// sent and archiver will be paused until all receivers have provided an acknowledgement for it.
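///
/// A wiring sketch (`ignore`d here since the `ChainInfoWrite` implementation, channels, and
/// constants are application-specific; a `tokio` runtime is assumed):
///
/// ```rust,ignore
/// let archiver_task = create_archiver_task(
///     chain_info,
///     block_importing_notification_receiver,
///     archived_segment_notification_sender,
///     consensus_constants,
///     CreateObjectMappings::No,
///     erasure_coding,
/// )
/// .await?;
///
/// // The archiver does blocking work, so drive it in a dedicated task
/// tokio::spawn(archiver_task);
/// ```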
pub async fn create_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<impl Future<Output = Result<(), ArchiverTaskError>> + Send + 'static, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    let maybe_archiver = if chain_info.max_segment_index().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.confirmation_depth_k,
            create_object_mappings,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.confirmation_depth_k)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Too early to archive blocks
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip archived blocks, unless we're producing object mappings for them
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                // This block was already archived, skip
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // In case there was a block gap, re-initialize the archiver and continue with the
            // current block number (rather than the block number at some depth) to allow for
            // special sync modes where pre-verified blocks are inserted at some point in the
            // future compared to previously existing blocks
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // As expected, can archive this block
                } else if best_archived_block_number >= block_number_to_archive {
                    // Special sync mode where verified blocks were inserted into the blockchain
                    // directly, archiving of this block will naturally happen later
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // We may have imported some block using special sync mode, so the block about
                    // to be imported is the first one after the gap at which the archiver is
                    // supposed to be initialized, but we are only about to import it, so wait for
                    // the next block for now
                    continue;
                } else {
                    return Err(ArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
                create_object_mappings,
            )
            .await?;
        }

        Ok(())
    })
}

/// Tries to archive `block_number_to_archive` and returns the new (or old, if unchanged) best
/// archived block
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    chain_info: &CI,
    // TODO: Probably remove
    // object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: CreateObjectMappings,
) -> Result<(BlockRoot, BlockNumber), ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(ArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let create_mappings = create_object_mappings.is_enabled_for_block(block_number_to_archive);

    let block_object_mappings = if create_mappings {
        // TODO: Injection of external logic
        // client
        //     .runtime_api()
        //     .extract_block_object_mapping(parent_block_root, block.block.clone())
        //     .map_err(|error| {
        //         sp_blockchain::Error::Application(
        //             format!("Failed to retrieve block object mappings: {error}").into(),
        //         )
        //     })?
        Vec::new()
    } else {
        Vec::new()
    };

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, block_object_mappings)
        .expect("Block is never empty and doesn't exceed u32; qed");
    // TODO: Probably remove
    // send_object_mapping_notification(
    //     &object_mapping_notification_sender,
    //     block_outcome.global_objects,
    //     block_number_to_archive,
    // );
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        chain_info
            .persist_segment_headers(vec![segment_header])
            .await?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}

// TODO: Probably remove
// fn send_object_mapping_notification(
//     object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
//     object_mapping: Vec<GlobalObject>,
//     block_number: BlockNumber,
// ) {
//     if object_mapping.is_empty() {
//         return;
//     }
//
//     let object_mapping_notification = ObjectMappingNotification {
//         object_mapping,
//         block_number,
//     };
//
//     object_mapping_notification_sender.notify(move || object_mapping_notification);
// }

async fn send_archived_segment_notification(
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    archived_segment: NewArchivedSegment,
) {
    let segment_index = archived_segment.segment_header.segment_index;
    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
    // Keep `archived_segment` around until all acknowledgements are received since some receivers
    // might use weak references
    let archived_segment = Arc::new(archived_segment);
    let archived_segment_notification = ArchivedSegmentNotification {
        archived_segment: Arc::clone(&archived_segment),
        acknowledgement_sender,
    };

    if let Err(error) = archived_segment_notification_sender
        .send(archived_segment_notification)
        .await
    {
        warn!(
            %error,
            "Failed to send archived segment notification"
        );
    }

    let wait_fut = async {
        while acknowledgement_receiver.next().await.is_some() {
            debug!(
                "Archived segment notification acknowledged: {}",
                segment_index
            );
        }
    };

    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
        .await
        .is_err()
    {
        warn!(
            "Archived segment notification was not acknowledged within the timeout, continuing \
            regardless"
        );
    }
}