ab_client_archiving/lib.rs
//! Consensus archiver responsible for the archival of blockchain history; it is driven by the
//! block import pipeline.
//!
//! Implements the archiving process that converts blockchain history (blocks) into archived
//! history (segments and pieces).
//!
//! The main entry point here is [`create_archiver_task`], which creates a task that, while driven,
//! will perform the archiving itself.
//!
//! Archiving is triggered by a block importing notification and tries to archive the block at
//! [`ConsensusConstants::confirmation_depth_k`](ab_client_consensus_common::ConsensusConstants::confirmation_depth_k)
//! depth from the block being imported. Block import will then wait for the archiver to
//! acknowledge processing, which is necessary to ensure that when the next block is imported, it
//! will contain a segment header of the newly archived segment (this must happen exactly in the
//! next block).
//!
//! Archiving itself will also wait for acknowledgement by various subscribers before proceeding,
//! which includes the farmer subscription (via RPC in the reference implementation).
//!
//! Known segment headers contain all known segment headers (including future ones in the case of
//! syncing). They are available through [`ChainInfo`] to other parts of the protocol that need to
//! know what the correct archival history of the blockchain looks like. For example, they are used
//! during node sync and farmer plotting to verify pieces of archival history received from other
//! network participants.
//!
//! [`recreate_genesis_segment`] is a bit of a hack and is useful for deriving the genesis segment,
//! which is a special case since we don't have enough data in the blockchain history itself during
//! genesis to do the archiving.
//!
//! [`encode_block`] and [`decode_block`] are symmetric encoding/decoding functions turning blocks
//! into bytes and back.

use ab_aligned_buffer::SharedAlignedBuffer;
use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
use ab_archiving::objects::{BlockObject, GlobalObject};
use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
use ab_core_primitives::block::header::GenericBlockHeader;
use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
use ab_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
use ab_core_primitives::shard::RealShardKind;
use ab_erasure_coding::ErasureCoding;
use bytesize::ByteSize;
use chacha20::ChaCha8Rng;
use chacha20::rand_core::{RngCore, SeedableRng};
use futures::channel::mpsc;
use futures::prelude::*;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, trace, warn};

/// Do not wait for acknowledgements beyond this time limit
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);

// TODO: Maybe use or remove if database handles this completely on its own
// /// How deep (in segments) should block be in order to be finalized.
// ///
// /// This is required for full nodes to not prune recent history such that keep-up sync in
// /// Substrate works even without archival nodes (initial sync will be done from DSN).
// ///
// /// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in
// /// Substrate and is not worth it right now.
// /// https://github.com/paritytech/substrate/discussions/14359
// const FINALIZATION_DEPTH_IN_SEGMENTS: SegmentIndex = SegmentIndex::new(5);

/// Notification with a new archived segment that was just archived
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// Archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender used to signal that the archived segment was received by the farmer.
    ///
    /// This must be used to send a message or else the block import pipeline will get stuck.
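    ///
    /// A subscriber-side sketch (`notification` and `process_segment` are hypothetical stand-ins
    /// for a subscription stream item and its handler):
    ///
    /// ```ignore
    /// use futures::SinkExt;
    ///
    /// let ArchivedSegmentNotification {
    ///     archived_segment,
    ///     mut acknowledgement_sender,
    /// } = notification;
    /// process_segment(&archived_segment);
    /// // Acknowledge so that archiving (and with it the block import pipeline) can proceed
    /// let _ = acknowledgement_sender.send(()).await;
    /// ```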
    pub acknowledgement_sender: mpsc::Sender<()>,
}

/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation)
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Incremental object mappings for a block (and any previous block continuation).
    ///
    /// The archived data won't be available in pieces until the entire segment is full and
    /// archived.
    pub object_mapping: Vec<GlobalObject>,
    /// The block that these mappings are from.
    pub block_number: BlockNumber,
    // TODO: add an acknowledgement_sender for backpressure if needed
}

/// Whether to create object mappings.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Start creating object mappings from this block number.
    ///
    /// This can be lower than the latest archived block, but must be greater than genesis.
    ///
    /// The genesis block doesn't have mappings, so starting mappings at genesis is pointless.
    /// The archiver will fail if it can't get the data for this block, but snap sync doesn't store
    /// the genesis data on disk. So avoiding genesis also avoids this error.
    /// <https://github.com/paritytech/polkadot-sdk/issues/5366>
    Block(NonZeroU64),
    /// Create object mappings as archiving is happening
    Yes,
    /// Don't create object mappings
    #[default]
    No,
}

impl CreateObjectMappings {
    /// The fixed block number to start creating object mappings from.
    ///
    /// If there is no fixed block number, or mappings are disabled, returns `None`.
    fn block(&self) -> Option<BlockNumber> {
        match self {
            CreateObjectMappings::Block(block) => Some(BlockNumber::new(block.get())),
            CreateObjectMappings::Yes => None,
            CreateObjectMappings::No => None,
        }
    }

    /// Returns `true` if object mappings will be created from a past or future block.
    pub fn is_enabled(&self) -> bool {
        !matches!(self, CreateObjectMappings::No)
    }

    /// Does the supplied block number need object mappings?
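    ///
    /// A small usage sketch (crate paths assumed from this file's location):
    ///
    /// ```
    /// use ab_client_archiving::CreateObjectMappings;
    /// use ab_core_primitives::block::BlockNumber;
    /// use std::num::NonZeroU64;
    ///
    /// let mappings = CreateObjectMappings::Block(NonZeroU64::new(100).unwrap());
    /// assert!(!mappings.is_enabled_for_block(BlockNumber::new(99)));
    /// assert!(mappings.is_enabled_for_block(BlockNumber::new(100)));
    /// assert!(!CreateObjectMappings::No.is_enabled_for_block(BlockNumber::new(100)));
    /// ```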
    pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
        if !self.is_enabled() {
            return false;
        }

        if let Some(target_block) = self.block() {
            return block >= target_block;
        }

        // We're continuing where we left off, so all blocks get mappings.
        true
    }
}

async fn find_last_archived_block<Block, CI, COM>(
    chain_info: &CI,
    best_block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: Option<COM>,
) -> Option<(SegmentHeader, Block, Vec<BlockObject>)>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
    COM: Fn(&Block) -> Vec<BlockObject>,
{
    let max_segment_index = chain_info.max_segment_index()?;

    if max_segment_index == SegmentIndex::ZERO {
        // Just genesis, nothing else to check
        return None;
    }

    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
        .rev()
        .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block.number();

        if last_archived_block_number > best_block_number_to_archive {
            // The last archived block in the segment header is too high for the current state of
            // the chain (the segment headers store may know about more blocks in existence than
            // are currently imported)
            continue;
        }

        let Some(last_archived_block_header) =
            chain_info.ancestor_header(last_archived_block_number, best_block_root)
        else {
            // This block number is not in our chain yet (the segment headers store may know about
            // more blocks in existence than are currently imported)
            continue;
        };

        let last_archived_block_root = &*last_archived_block_header.header().root();

        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
            // Block might have been pruned between ancestor search and disk read
            continue;
        };

        // If we're starting mapping creation at this block, return its mappings
        let block_object_mappings = if let Some(create_object_mappings) = create_object_mappings {
            create_object_mappings(&last_archived_block)
        } else {
            Vec::new()
        };

        return Some((segment_header, last_archived_block, block_object_mappings));
    }

    None
}

/// Derive the genesis segment on demand from the genesis block
pub fn recreate_genesis_segment(
    owned_genesis_block: &OwnedBeaconChainBlock,
    erasure_coding: ErasureCoding,
) -> NewArchivedSegment {
    let encoded_block = encode_block(owned_genesis_block);

    // There are no mappings in the genesis block, so they can be ignored
    let block_outcome = Archiver::new(erasure_coding)
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    block_outcome
        .archived_segments
        .into_iter()
        .next()
        .expect("Genesis block always results in exactly one archived segment; qed")
}

/// Encode block for archiving purposes
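///
/// The encoding is two little-endian `u32` lengths (header, then body) followed by the raw header
/// and body bytes; the beacon chain genesis block is additionally padded with deterministic
/// pseudorandom data up to [`RecordedHistorySegment::SIZE`]. A minimal sketch of just the framing,
/// with plain byte slices standing in for the block's buffers (illustrative values only):
///
/// ```
/// let (header, body) = (&[1u8, 2, 3][..], &[4u8, 5][..]);
///
/// let mut encoded = Vec::with_capacity(size_of::<u32>() * 2 + header.len() + body.len());
/// encoded.extend_from_slice(&(header.len() as u32).to_le_bytes());
/// encoded.extend_from_slice(&(body.len() as u32).to_le_bytes());
/// encoded.extend_from_slice(header);
/// encoded.extend_from_slice(body);
///
/// assert_eq!(encoded.len(), 4 + 4 + header.len() + body.len());
/// ```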
pub fn encode_block<Block>(block: &Block) -> Vec<u8>
where
    Block: GenericOwnedBlock,
{
    let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
        && block.header().header().prefix.number == BlockNumber::ZERO;
    let header_buffer = block.header().buffer();
    let body_buffer = block.body().buffer();

    // TODO: Extra allocation here is unfortunate, would be nice to avoid it
    let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
        RecordedHistorySegment::SIZE
    } else {
        size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
    });

    encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
    encoded_block.extend_from_slice(header_buffer);
    encoded_block.extend_from_slice(body_buffer);

    if is_beacon_chain_genesis_block {
        let encoded_block_length = encoded_block.len();

        // Encoding of the genesis block is extended with extra data such that the very first
        // archived segment can be produced right away, bootstrapping the farming process.
        //
        // Note: we add it to the end of the encoded block, so during decoding it'll actually be
        // ignored even though it is technically present in encoded form.
        encoded_block.resize(RecordedHistorySegment::SIZE, 0);
        let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
        rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
    }

    encoded_block
}

/// Symmetrical to [`encode_block()`], used to decode previously encoded blocks
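///
/// Returns `None` if the input is truncated or the buffers don't form a valid block. A minimal
/// sketch of the inverse framing with a hypothetical `split_frames` helper (the real function
/// additionally wraps the buffers and calls `Block::from_buffers()`):
///
/// ```
/// fn split_frames(bytes: &[u8]) -> Option<(&[u8], &[u8])> {
///     let header_len = u32::from_le_bytes(bytes.get(..4)?.try_into().ok()?) as usize;
///     let body_len = u32::from_le_bytes(bytes.get(4..8)?.try_into().ok()?) as usize;
///     let header = bytes.get(8..8 + header_len)?;
///     let body = bytes.get(8 + header_len..8 + header_len + body_len)?;
///     Some((header, body))
/// }
///
/// let encoded = [3, 0, 0, 0, 2, 0, 0, 0, 1, 2, 3, 4, 5];
/// assert_eq!(split_frames(&encoded), Some((&[1u8, 2, 3][..], &[4u8, 5][..])));
/// ```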
pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
where
    Block: GenericOwnedBlock,
{
    let header_length = {
        let header_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            header_length[0],
            header_length[1],
            header_length[2],
            header_length[3],
        ])
    };
    let body_length = {
        let body_length = encoded_block.split_off(..size_of::<u32>())?;
        u32::from_le_bytes([
            body_length[0],
            body_length[1],
            body_length[2],
            body_length[3],
        ])
    };

    let header_buffer = encoded_block.split_off(..header_length as usize)?;
    let body_buffer = encoded_block.split_off(..body_length as usize)?;

    let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
    let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);

    Block::from_buffers(header_buffer, body_buffer)
}

/// Archiver task error
#[derive(Debug, thiserror::Error)]
pub enum ArchiverTaskError {
    /// Archiver instantiation error
    #[error("Archiver instantiation error: {error}")]
    Instantiation {
        /// Low-level error
        #[from]
        error: ArchiverInstantiationError,
    },
    /// Failed to persist a new segment header
    #[error("Failed to persist a new segment header: {error}")]
    PersistSegmentHeaders {
        /// Low-level error
        #[from]
        error: PersistSegmentHeadersError,
    },
    /// Attempt to switch to a different fork beyond archiving depth
    #[error(
        "Attempt to switch to a different fork beyond archiving depth: parent block root \
        {parent_block_root}, best archived block root {best_archived_block_root}"
    )]
    ArchivingReorg {
        /// Parent block root
        parent_block_root: BlockRoot,
        /// Best archived block root
        best_archived_block_root: BlockRoot,
    },
    /// There was a gap in blockchain history, and the last contiguous series of blocks doesn't
    /// start with the archived segment
    #[error(
        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
        start with the archived segment (best archived block number {best_archived_block_number}, \
        block number to archive {block_number_to_archive}, block about to be imported \
        {importing_block_number})"
    )]
    BlockGap {
        /// Best archived block number
        best_archived_block_number: BlockNumber,
        /// Block number to archive
        block_number_to_archive: BlockNumber,
        /// Importing block number
        importing_block_number: BlockNumber,
    },
}

struct InitializedArchiver {
    archiver: Archiver,
    best_archived_block: (BlockRoot, BlockNumber),
}

async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    confirmation_depth_k: BlockNumber,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    // Choose a lower block number if we want to get mappings from that specific block.
    // If we are continuing from where we left off, we don't need to change the block number to
    // archive. If there is no path to this block from the tip due to snap sync, we'll start
    // archiving from an earlier segment, then start mapping again once archiving reaches this
    // block.
    if let Some(block_number) = create_object_mappings.block() {
        // There aren't any mappings in the genesis block, so starting there is pointless.
        // (And causes errors on restart, because genesis block data is never stored during snap
        // sync.)
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        // If headers are missing for any block between the best block to archive and the best
        // block of the blockchain, it means a newer block was inserted in some special way and is
        // thus by definition valid, so we simply use the best block as the best block to archive
        // instead
        best_block_to_archive = best_block_number;
    }

    // TODO: Uncomment once API for object mapping is established
    // // If the user chooses an object mapping start block we don't have data or state for, we
    // // can't create mappings for it, so the node must exit with an error. We ignore genesis
    // // here, because it doesn't have mappings.
    // if create_object_mappings.is_enabled() && best_block_to_archive >= BlockNumber::ONE {
    //     let Some(best_block_to_archive_root) = client.root(NumberFor::<Block>::saturated_from(
    //         best_block_to_archive.as_u64(),
    //     ))?
    //     else {
    //         let error = format!(
    //             "Missing root for mapping block {best_block_to_archive}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     let Some(_best_block_data) = client.block(best_block_to_archive_root)? else {
    //         let error = format!(
    //             "Missing data for mapping block {best_block_to_archive} \
    //             root {best_block_to_archive_root}, \
    //             try a higher block number, or wipe your node and restart with `--sync full`"
    //         );
    //         return Err(sp_blockchain::Error::Application(error.into()));
    //     };
    //
    //     // Similarly, state can be pruned, even if the data is present
    //     // TODO: Injection of external logic
    //     // client
    //     //     .runtime_api()
    //     //     .extract_block_object_mapping(
    //     //         *best_block_data.block.header().parent_root(),
    //     //         best_block_data.block.clone(),
    //     //     )
    //     //     .map_err(|error| {
    //     //         sp_blockchain::Error::Application(
    //     //             format!(
    //     //                 "Missing state for mapping block {best_block_to_archive} \
    //     //                 root {best_block_to_archive_root}: {error}, \
    //     //                 try a higher block number, or wipe your node and restart with
    //     //                 `--sync full`"
    //     //             )
    //     //             .into(),
    //     //         )
    //     //     })?;
    // }

    let maybe_last_archived_block = find_last_archived_block(
        chain_info,
        best_block_to_archive,
        &best_block_root,
        create_object_mappings
            .is_enabled()
            .then_some(|_block: &Block| {
                // TODO: Injection of external logic
                // let parent_root = *block.header().parent_root();
                // client
                //     .runtime_api()
                //     .extract_block_object_mapping(parent_root, block)
                //     .unwrap_or_default()
                Vec::new()
            }),
    )
    .await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            // Continuing from existing initial state
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            // Set the initial value, this is needed in case only the genesis block was archived
            // and there is nothing else available
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(erasure_coding)
        };

    // Process blocks since the last fully archived block up to the current head minus K
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    // If not a continuation, archive the genesis block
                    Some(BlockNumber::ZERO)
                }
            });

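        // For example (illustrative numbers): with `confirmation_depth_k = 100`, a best block
        // number of 250 and block 120 as the last archived block, this archives blocks 121..=150.
        // Starting fresh from genesis with a best block number below 100, only the genesis block
        // is archived.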
        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let block_object_mappings =
                    if create_object_mappings.is_enabled_for_block(block_number_to_archive) {
                        // TODO: Injection of external logic
                        // runtime_api
                        //     .extract_block_object_mapping(
                        //         *block.block.header().parent_root(),
                        //         block.block.clone(),
                        //     )
                        //     .unwrap_or_default()
                        Vec::new()
                    } else {
                        Vec::new()
                    };

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, block_object_mappings)
                    .expect("Block is never empty and doesn't exceed u32; qed");
                // TODO: Allow to capture these from the outside
                // send_object_mapping_notification(
                //     &subspace_link.object_mapping_notification_sender,
                //     block_outcome.global_objects,
                //     block_number_to_archive,
                // );
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always be set if there is no logical error; qed"),
    })
}

/// Create an archiver task.
///
/// The archiver task will listen for importing blocks and archive blocks at `K` depth, producing
/// pieces and segment headers (segment headers are then added back to the blockchain in the next
/// block).
///
/// NOTE: Archiver is doing blocking operations and must run in a dedicated task.
///
/// Archiver is only able to move forward and doesn't support reorgs. Upon restart, it will check
/// segments in [`ChainInfo`] and chain history to reconstruct the "current" state it was in before
/// the last shutdown and continue incrementally archiving blockchain history from there.
///
/// Archiving is triggered by a block importing notification
/// (`block_importing_notification_receiver`) and tries to archive the block at
/// [`ConsensusConstants::confirmation_depth_k`] depth from the block being imported. Block
/// importing will then wait for the archiver to acknowledge processing, which is necessary for
/// ensuring that when the next block is imported, the body will contain a segment header of the
/// newly archived segment.
///
/// `create_object_mappings` controls whether object mappings are created for archived blocks and,
/// if so, from which block onwards.
///
/// Once a new segment is archived, a notification (`archived_segment_notification_sender`) will be
/// sent and the archiver will be paused until all receivers have provided an acknowledgement for
/// it.
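///
/// A wiring sketch (not compiled here; `chain_info`, `consensus_constants` and `erasure_coding`
/// are assumed to be constructed by the caller):
///
/// ```ignore
/// // Channels connecting block import, the archiver, and segment subscribers
/// let (block_importing_sender, block_importing_receiver) = mpsc::channel(0);
/// let (archived_segment_sender, archived_segment_receiver) = mpsc::channel(0);
///
/// let archiver_task = create_archiver_task(
///     chain_info,
///     block_importing_receiver,
///     archived_segment_sender,
///     consensus_constants,
///     CreateObjectMappings::No,
///     erasure_coding,
/// )
/// .await?;
///
/// // The archiver does blocking work, so drive the returned future in a dedicated task
/// tokio::spawn(archiver_task);
/// ```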
pub async fn create_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<impl Future<Output = Result<(), ArchiverTaskError>> + Send + 'static, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    let maybe_archiver = if chain_info.max_segment_index().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.confirmation_depth_k,
            create_object_mappings,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.confirmation_depth_k)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Too early to archive blocks
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip archived blocks, unless we're producing object mappings for them
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                // This block was already archived, skip
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // In case there was a block gap, re-initialize the archiver and continue with the
            // current block number (rather than the block number at some depth) to allow for
            // special sync modes where pre-verified blocks are inserted at some point in the
            // future compared to previously existing blocks
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // As expected, can archive this block
                } else if best_archived_block_number >= block_number_to_archive {
                    // Special sync mode where verified blocks were inserted into the blockchain
                    // directly, archiving of this block will naturally happen later
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // We may have imported some block using a special sync mode, so the block
                    // about to be imported is the first one after the gap at which the archiver
                    // is supposed to be initialized, but we are only about to import it, so wait
                    // for the next block for now
                    continue;
                } else {
                    return Err(ArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
                create_object_mappings,
            )
            .await?;
        }

        Ok(())
    })
}

/// Tries to archive `block_number_to_archive` and returns the new (or old, if not changed) best
/// archived block
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    chain_info: &CI,
    // TODO: Probably remove
    // object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: CreateObjectMappings,
) -> Result<(BlockRoot, BlockNumber), ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(ArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let create_mappings = create_object_mappings.is_enabled_for_block(block_number_to_archive);

    let block_object_mappings = if create_mappings {
        // TODO: Injection of external logic
        // client
        //     .runtime_api()
        //     .extract_block_object_mapping(parent_block_root, block.block.clone())
        //     .map_err(|error| {
        //         sp_blockchain::Error::Application(
        //             format!("Failed to retrieve block object mappings: {error}").into(),
        //         )
        //     })?
        Vec::new()
    } else {
        Vec::new()
    };

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, block_object_mappings)
        .expect("Block is never empty and doesn't exceed u32; qed");
    // TODO: Probably remove
    // send_object_mapping_notification(
    //     &object_mapping_notification_sender,
    //     block_outcome.global_objects,
    //     block_number_to_archive,
    // );
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        chain_info
            .persist_segment_headers(vec![segment_header])
            .await?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}

// TODO: Probably remove
// fn send_object_mapping_notification(
//     object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
//     object_mapping: Vec<GlobalObject>,
//     block_number: BlockNumber,
// ) {
//     if object_mapping.is_empty() {
//         return;
//     }
//
//     let object_mapping_notification = ObjectMappingNotification {
//         object_mapping,
//         block_number,
//     };
//
//     object_mapping_notification_sender.notify(move || object_mapping_notification);
// }

async fn send_archived_segment_notification(
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    archived_segment: NewArchivedSegment,
) {
    let segment_index = archived_segment.segment_header.segment_index;
    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
    // Keep `archived_segment` around until all acknowledgements are received since some receivers
    // might use weak references
    let archived_segment = Arc::new(archived_segment);
    let archived_segment_notification = ArchivedSegmentNotification {
        archived_segment: Arc::clone(&archived_segment),
        acknowledgement_sender,
    };

    if let Err(error) = archived_segment_notification_sender
        .send(archived_segment_notification)
        .await
    {
        warn!(
            %error,
            "Failed to send archived segment notification"
        );
    }

    let wait_fut = async {
        while acknowledgement_receiver.next().await.is_some() {
            debug!(
                "Archived segment notification acknowledged: {}",
                segment_index
            );
        }
    };

    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
        .await
        .is_err()
    {
        warn!(
            "Archived segment notification was not acknowledged within the timeout, continuing \
            regardless"
        );
    }
}