1use ab_aligned_buffer::SharedAlignedBuffer;
26use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
27use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
28use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
29use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
30use ab_core_primitives::block::header::GenericBlockHeader;
31use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
32use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
33use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
34use ab_core_primitives::segments::{
35 LocalSegmentIndex, RecordedHistorySegment, SegmentHeader, SuperSegmentIndex,
36};
37use ab_core_primitives::shard::{RealShardKind, ShardIndex};
38use ab_erasure_coding::ErasureCoding;
39use bytesize::ByteSize;
40use chacha20::ChaCha8Rng;
41use chacha20::rand_core::{Rng, SeedableRng};
42use futures::channel::mpsc;
43use futures::prelude::*;
44use std::sync::Arc;
45use std::time::Duration;
46use tracing::{debug, info, trace, warn};
47
/// How long to wait for subscribers to acknowledge an archived segment notification before
/// giving up and continuing (see `send_archived_segment_notification`)
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);
50
/// Notification about a newly archived segment, sent to subscribers of the segment archiver task
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// The newly archived segment (shared to avoid copying its contents between subscribers)
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Channel a receiver can use to acknowledge processing of the segment; the archiver waits
    /// for acknowledgements (bounded by a timeout) before continuing
    pub acknowledgement_sender: mpsc::Sender<()>,
}
72
73async fn find_last_archived_block<Block, CI>(
74 chain_info: &CI,
75 best_block_number_to_archive: BlockNumber,
76 best_block_root: &BlockRoot,
77) -> Option<(SegmentHeader, Block)>
78where
79 Block: GenericOwnedBlock,
80 CI: ChainInfo<Block>,
81{
82 let max_local_segment_index = chain_info.last_segment_header()?.segment_index.as_inner();
83
84 if max_local_segment_index == LocalSegmentIndex::ZERO {
85 return None;
87 }
88
89 for segment_header in (LocalSegmentIndex::ZERO..=max_local_segment_index)
90 .rev()
91 .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
92 {
93 let last_archived_block_number = segment_header.last_archived_block.number();
94
95 if last_archived_block_number > best_block_number_to_archive {
96 continue;
100 }
101
102 let Some(last_archived_block_header) =
103 chain_info.ancestor_header(last_archived_block_number, best_block_root)
104 else {
105 continue;
108 };
109
110 let last_archived_block_root = &*last_archived_block_header.header().root();
111
112 let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
113 continue;
115 };
116
117 return Some((segment_header, last_archived_block));
118 }
119
120 None
121}
122
123pub fn recreate_genesis_segment(
126 owned_genesis_block: &OwnedBeaconChainBlock,
127 erasure_coding: ErasureCoding,
128) -> NewArchivedSegment {
129 let encoded_block = encode_block(owned_genesis_block);
130
131 let block_outcome = Archiver::new(ShardIndex::BEACON_CHAIN, erasure_coding)
132 .add_block(encoded_block, Vec::new())
133 .expect("Block is never empty and doesn't exceed u32; qed");
134 let mut archived_segment = block_outcome
135 .archived_segments
136 .into_iter()
137 .next()
138 .expect("Genesis block always results in exactly one archived segment; qed");
139
140 for piece in archived_segment.pieces.iter_mut() {
141 piece.header.super_segment_index = SuperSegmentIndex::ZERO.into();
142 }
144
145 archived_segment.pieces = archived_segment.pieces.to_shared();
146
147 archived_segment
148}
149
150pub fn encode_block<Block>(block: &Block) -> Vec<u8>
152where
153 Block: GenericOwnedBlock,
154{
155 let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
156 && block.header().header().prefix.number == BlockNumber::ZERO;
157 let header_buffer = block.header().buffer();
158 let body_buffer = block.body().buffer();
159
160 let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
162 RecordedHistorySegment::SIZE
163 } else {
164 size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
165 });
166
167 encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
168 encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
169 encoded_block.extend_from_slice(header_buffer);
170 encoded_block.extend_from_slice(body_buffer);
171
172 if is_beacon_chain_genesis_block {
173 let encoded_block_length = encoded_block.len();
174
175 encoded_block.resize(RecordedHistorySegment::SIZE, 0);
181 let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
182 rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
183 }
184
185 encoded_block
186}
187
188pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
190where
191 Block: GenericOwnedBlock,
192{
193 let header_length = {
194 let header_length = encoded_block.split_off(..size_of::<u32>())?;
195 u32::from_le_bytes([
196 header_length[0],
197 header_length[1],
198 header_length[2],
199 header_length[3],
200 ])
201 };
202 let body_length = {
203 let body_length = encoded_block.split_off(..size_of::<u32>())?;
204 u32::from_le_bytes([
205 body_length[0],
206 body_length[1],
207 body_length[2],
208 body_length[3],
209 ])
210 };
211
212 let header_buffer = encoded_block.split_off(..header_length as usize)?;
213 let body_buffer = encoded_block.split_off(..body_length as usize)?;
214
215 let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
216 let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
217
218 Block::from_buffers(header_buffer, body_buffer)
219}
220
/// Errors that the segment archiver task can produce
#[derive(Debug, thiserror::Error)]
pub enum SegmentArchiverTaskError {
    /// Failed to create an archiver instance from persisted state
    #[error("Archiver instantiation error: {error}")]
    Instantiation {
        /// Underlying instantiation error
        #[from]
        error: ArchiverInstantiationError,
    },
    /// Failed to persist a newly produced segment header
    #[error("Failed to persist a new segment header: {error}")]
    PersistSegmentHeaders {
        /// Underlying persistence error
        #[from]
        error: PersistSegmentHeadersError,
    },
    /// The block to archive doesn't extend the previously archived block, i.e. a fork switch
    /// happened deeper than the archiving depth
    #[error(
        "Attempt to switch to a different fork beyond archiving depth: parent block root \
        {parent_block_root}, best archived block root {best_archived_block_root}"
    )]
    ArchivingReorg {
        /// Parent root of the block that was about to be archived
        parent_block_root: BlockRoot,
        /// Root of the best block archived so far
        best_archived_block_root: BlockRoot,
    },
    /// Local blockchain history has a gap that prevents contiguous archiving
    #[error(
        "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
        start with the archived segment (best archived block number {best_archived_block_number}, \
        block number to archive {block_number_to_archive}), block about to be imported \
        {importing_block_number})"
    )]
    BlockGap {
        /// Number of the best block archived so far
        best_archived_block_number: BlockNumber,
        /// Number of the block that should have been archived next
        block_number_to_archive: BlockNumber,
        /// Number of the block that was about to be imported
        importing_block_number: BlockNumber,
    },
}
266
/// Archiver together with the most recently archived block, produced by `initialize_archiver`
struct InitializedArchiver {
    /// Archiver instance, either fresh or resumed from the last persisted segment header
    archiver: Archiver,
    /// Root and number of the best (most recent) block archived so far
    best_archived_block: (BlockRoot, BlockNumber),
}
271
/// Creates an [`Archiver`] and catches it up with already-produced blocks.
///
/// Resumes from the last usable persisted segment header when one exists, otherwise starts from
/// genesis. Afterwards it archives all blocks from the one following the last archived block up
/// to `best block number - block_confirmation_depth`, persisting any segment headers produced
/// along the way.
///
/// # Errors
///
/// Returns an error if the archiver can't be instantiated from persisted state or if newly
/// produced segment headers can't be persisted.
async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    block_confirmation_depth: BlockNumber,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(block_confirmation_depth);

    // If any ancestor header within the confirmation window is missing locally (a gap in
    // history — NOTE(review): presumably possible after sync, confirm), fall back to archiving
    // from the best block itself
    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        best_block_to_archive = best_block_number;
    }

    let maybe_last_archived_block =
        find_last_archived_block(chain_info, best_block_to_archive, &best_block_root).await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block)) = maybe_last_archived_block {
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            // The last archived block is the best archived one until the catch-up below
            // advances it
            let last_archived_block_header = last_archived_block.header().header();
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                Vec::new(),
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
            )
        };

    {
        // Catch up: archive already-produced blocks that are deep enough to be archived
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        // `None` means there is nothing to archive yet; however, without a last segment header
        // the genesis block (number zero) still needs to be archived
        let blocks_to_archive_to = best_block_number
            .checked_sub(block_confirmation_depth)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, Vec::new())
                    .expect("Block is never empty and doesn't exceed u32; qed");
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                // The last block of the catch-up range becomes the best archived block
                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        // Set either in the resume branch above or by the catch-up loop (which always runs from
        // genesis when there was no segment header)
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
409
/// Creates a long-running future that archives confirmed blocks into segments.
///
/// When no segment header is persisted yet, the archiver is initialized eagerly before the
/// future is returned; otherwise initialization is deferred into the returned future. The
/// future consumes block importing notifications and, for every importing block, archives the
/// block `block_confirmation_depth` behind it, persisting produced segment headers and sending
/// [`ArchivedSegmentNotification`]s.
///
/// # Errors
///
/// Eager initialization errors are returned directly; the returned future fails with
/// [`SegmentArchiverTaskError`] on instantiation/persistence errors, fork switches beyond the
/// archiving depth, or unrecoverable gaps in blockchain history.
pub async fn create_segment_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    erasure_coding: ErasureCoding,
) -> Result<
    impl Future<Output = Result<(), SegmentArchiverTaskError>> + Send + 'static,
    SegmentArchiverTaskError,
>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    // Initialize eagerly only when starting from scratch (no segment header persisted yet)
    let maybe_archiver = if chain_info.last_segment_header().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.block_confirmation_depth,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        // Deferred initialization for the case where archiving state already existed
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            // The block to archive lags the importing block by the confirmation depth; `None`
            // means the chain isn't deep enough to archive anything yet
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.block_confirmation_depth)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip blocks already covered by a persisted segment or archived earlier
            let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // There is a gap between the best archived block and the block to archive;
            // re-initialize the archiver from persisted state and re-check
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // Re-initialization caught up to exactly the expected block, proceed below
                } else if best_archived_block_number >= block_number_to_archive {
                    // Re-initialization already archived this block, wait for the next one
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // Parent of the importing block is not an ancestor of the best block —
                    // NOTE(review): presumably a different fork being imported; skip. Confirm.
                    continue;
                } else {
                    // Contiguous history is genuinely missing, unrecoverable from here
                    return Err(SegmentArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
            )
            .await?;
        }

        // Importing notification stream ended, shut down cleanly
        Ok(())
    })
}
572
/// Archives a single block and handles all segments produced as a result.
///
/// Looks up block `block_number_to_archive` on the chain identified by `best_block_root`,
/// verifies it extends the previously archived block, feeds it to the archiver, persists every
/// produced segment header and sends out archived segment notifications.
///
/// Returns the root and number of the just-archived block, which becomes the caller's new best
/// archived block.
///
/// # Errors
///
/// [`SegmentArchiverTaskError::ArchivingReorg`] if the block's parent is not the best archived
/// block (fork switch beyond archiving depth), or a persistence error from storing segment
/// headers.
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    chain_info: &CI,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
) -> Result<(BlockRoot, BlockNumber), SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    // Archived history must be contiguous: the block to archive has to extend the block that
    // was archived last
    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(SegmentArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    // Persist each segment header before notifying subscribers about its segment
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        chain_info
            .persist_segment_headers(vec![segment_header])
            .await?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}
629
630async fn send_archived_segment_notification(
631 archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
632 archived_segment: NewArchivedSegment,
633) {
634 let segment_index = archived_segment.segment_header.segment_index;
635 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
636 let archived_segment = Arc::new(archived_segment);
639 let archived_segment_notification = ArchivedSegmentNotification {
640 archived_segment: Arc::clone(&archived_segment),
641 acknowledgement_sender,
642 };
643
644 if let Err(error) = archived_segment_notification_sender
645 .send(archived_segment_notification)
646 .await
647 {
648 warn!(
649 %error,
650 "Failed to send archived segment notification"
651 );
652 }
653
654 let wait_fut = async {
655 while acknowledgement_receiver.next().await.is_some() {
656 debug!(
657 "Archived segment notification acknowledged: {}",
658 segment_index
659 );
660 }
661 };
662
663 if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
664 .await
665 .is_err()
666 {
667 warn!(
668 "Archived segment notification was not acknowledged and reached timeout, continue \
669 regardless"
670 );
671 }
672}