1use crate::segment_headers_store::{SegmentHeaderStoreError, SegmentHeadersStore};
33use ab_aligned_buffer::SharedAlignedBuffer;
34use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
35use ab_archiving::objects::{BlockObject, GlobalObject};
36use ab_client_api::ChainInfo;
37use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
38use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
39use ab_core_primitives::block::header::GenericBlockHeader;
40use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
41use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
42use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
43use ab_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
44use ab_core_primitives::shard::RealShardKind;
45use ab_erasure_coding::ErasureCoding;
46use bytesize::ByteSize;
47use chacha20::ChaCha8Rng;
48use chacha20::rand_core::{RngCore, SeedableRng};
49use futures::channel::mpsc;
50use futures::prelude::*;
51use std::num::NonZeroU64;
52use std::slice;
53use std::sync::Arc;
54use std::time::Duration;
55use tracing::{debug, info, trace, warn};
56
/// How long to wait for subscribers to acknowledge an archived segment notification before
/// giving up and continuing regardless (see [`send_archived_segment_notification`]).
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);
59
/// Notification about a newly archived segment, with a channel for acknowledging its receipt.
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// The newly archived segment.
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender that subscribers use to acknowledge they have processed the notification; the
    /// archiver waits (with a timeout) for acknowledgements before proceeding.
    pub acknowledgement_sender: mpsc::Sender<()>,
}
81
/// Notification with object mappings for a block.
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
    /// Object mappings for the block.
    pub object_mapping: Vec<GlobalObject>,
    /// Number of the block the mappings were created for.
    pub block_number: BlockNumber,
}
94
/// Whether object mappings should be created during archiving, and from which block onwards.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub enum CreateObjectMappings {
    /// Create object mappings for blocks with this number and above.
    Block(NonZeroU64),
    /// Create object mappings for every block.
    Yes,
    /// Do not create object mappings (the default).
    #[default]
    No,
}
113
114impl CreateObjectMappings {
115 fn block(&self) -> Option<BlockNumber> {
118 match self {
119 CreateObjectMappings::Block(block) => Some(BlockNumber::new(block.get())),
120 CreateObjectMappings::Yes => None,
121 CreateObjectMappings::No => None,
122 }
123 }
124
125 pub fn is_enabled(&self) -> bool {
127 !matches!(self, CreateObjectMappings::No)
128 }
129
130 pub fn is_enabled_for_block(&self, block: BlockNumber) -> bool {
132 if !self.is_enabled() {
133 return false;
134 }
135
136 if let Some(target_block) = self.block() {
137 return block >= target_block;
138 }
139
140 true
142 }
143}
144
145async fn find_last_archived_block<Block, CI, COM>(
146 chain_info: &CI,
147 segment_headers_store: &SegmentHeadersStore,
148 best_block_number_to_archive: BlockNumber,
149 best_block_root: &BlockRoot,
150 create_object_mappings: Option<COM>,
151) -> Option<(SegmentHeader, Block, Vec<BlockObject>)>
152where
153 Block: GenericOwnedBlock,
154 CI: ChainInfo<Block>,
155 COM: Fn(&Block) -> Vec<BlockObject>,
156{
157 let max_segment_index = segment_headers_store.max_segment_index()?;
158
159 if max_segment_index == SegmentIndex::ZERO {
160 return None;
162 }
163
164 for segment_header in (SegmentIndex::ZERO..=max_segment_index)
165 .rev()
166 .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
167 {
168 let last_archived_block_number = segment_header.last_archived_block.number();
169
170 if last_archived_block_number > best_block_number_to_archive {
171 continue;
175 }
176
177 let Some(last_archived_block_header) =
178 chain_info.ancestor_header(last_archived_block_number, best_block_root)
179 else {
180 continue;
183 };
184
185 let last_archived_block_root = &*last_archived_block_header.header().root();
186
187 let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
188 continue;
190 };
191
192 let block_object_mappings = if let Some(create_object_mappings) = create_object_mappings {
194 create_object_mappings(&last_archived_block)
195 } else {
196 Vec::new()
197 };
198
199 return Some((segment_header, last_archived_block, block_object_mappings));
200 }
201
202 None
203}
204
205pub fn recreate_genesis_segment(
208 owned_genesis_block: &OwnedBeaconChainBlock,
209 erasure_coding: ErasureCoding,
210) -> NewArchivedSegment {
211 let encoded_block = encode_block(owned_genesis_block);
212
213 let block_outcome = Archiver::new(erasure_coding)
215 .add_block(encoded_block, Vec::new())
216 .expect("Block is never empty and doesn't exceed u32; qed");
217 block_outcome
218 .archived_segments
219 .into_iter()
220 .next()
221 .expect("Genesis block always results in exactly one archived segment; qed")
222}
223
224pub fn encode_block<Block>(block: &Block) -> Vec<u8>
226where
227 Block: GenericOwnedBlock,
228{
229 let is_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
230 && block.header().header().prefix.number == BlockNumber::ZERO;
231 let header_buffer = block.header().buffer();
232 let body_buffer = block.body().buffer();
233
234 let mut encoded_block = Vec::with_capacity(if is_genesis_block {
236 RecordedHistorySegment::SIZE
237 } else {
238 size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
239 });
240
241 encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
242 encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
243 encoded_block.extend_from_slice(header_buffer);
244 encoded_block.extend_from_slice(body_buffer);
245
246 if is_genesis_block {
247 let encoded_block_length = encoded_block.len();
248
249 encoded_block.resize(RecordedHistorySegment::SIZE, 0);
255 let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
256 rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
257 }
258
259 encoded_block
260}
261
262pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
264where
265 Block: GenericOwnedBlock,
266{
267 let header_length = {
268 let header_length = encoded_block.split_off(..size_of::<u32>())?;
269 u32::from_le_bytes([
270 header_length[0],
271 header_length[1],
272 header_length[2],
273 header_length[3],
274 ])
275 };
276 let body_length = {
277 let body_length = encoded_block.split_off(..size_of::<u32>())?;
278 u32::from_le_bytes([
279 body_length[0],
280 body_length[1],
281 body_length[2],
282 body_length[3],
283 ])
284 };
285
286 let header_buffer = encoded_block.split_off(..header_length as usize)?;
287 let body_buffer = encoded_block.split_off(..body_length as usize)?;
288
289 let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
290 let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
291
292 Block::from_buffers(header_buffer, body_buffer)
293}
294
295#[derive(Debug, thiserror::Error)]
297pub enum ArchiverTaskError {
298 #[error("Archiver instantiation error: {error}")]
300 Instantiation {
301 #[from]
303 error: ArchiverInstantiationError,
304 },
305 #[error("Failed to add a new segment header to the segment headers store: {error}")]
307 SegmentHeaderStore {
308 #[from]
310 error: SegmentHeaderStoreError,
311 },
312 #[error(
314 "Attempt to switch to a different fork beyond archiving depth: parent block root \
315 {parent_block_root}, best archived block root {best_archived_block_root}"
316 )]
317 ArchivingReorg {
318 parent_block_root: BlockRoot,
320 best_archived_block_root: BlockRoot,
322 },
323 #[error(
326 "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
327 start with the archived segment (best archived block number {best_archived_block_number}, \
328 block number to archive {block_number_to_archive}), block about to be imported \
329 {importing_block_number})"
330 )]
331 BlockGap {
332 best_archived_block_number: BlockNumber,
334 block_number_to_archive: BlockNumber,
336 importing_block_number: BlockNumber,
338 },
339}
340
/// Result of [`initialize_archiver`]: a ready-to-use archiver together with the best block
/// archived so far.
struct InitializedArchiver {
    /// Archiver instance, resumed from the last archived segment when one existed.
    archiver: Archiver,
    /// Root and number of the best archived block.
    best_archived_block: (BlockRoot, BlockNumber),
}
345
/// Initializes an [`Archiver`], resuming from the last archived segment when possible, and
/// catches up on archiving already-produced blocks that are deep enough
/// (`confirmation_depth_k` below the best block).
///
/// Returns the archiver together with the best archived block (root and number).
///
/// # Errors
///
/// Returns an error when the archiver cannot be instantiated from stored state
/// ([`ArchiverTaskError::Instantiation`]) or when newly produced segment headers cannot be
/// persisted ([`ArchiverTaskError::SegmentHeaderStore`]).
async fn initialize_archiver<Block, CI>(
    segment_headers_store: &SegmentHeadersStore,
    chain_info: &CI,
    confirmation_depth_k: BlockNumber,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    // Only blocks at least `confirmation_depth_k` deep are archived
    let mut best_block_to_archive = best_block_number.saturating_sub(confirmation_depth_k);
    // When mappings are requested from a specific block, resume no later than that block so
    // mappings for it are not skipped
    if let Some(block_number) = create_object_mappings.block() {
        best_block_to_archive = best_block_to_archive.min(block_number);
    }

    // If any block between the archiving point and the best block is missing (gap in history),
    // fall back to starting from the best block itself
    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        best_block_to_archive = best_block_number;
    }

    // NOTE(review): the closure passed here always returns empty mappings; presumably real
    // object mapping extraction is meant to happen here — confirm
    let maybe_last_archived_block = find_last_archived_block(
        chain_info,
        segment_headers_store,
        best_block_to_archive,
        &best_block_root,
        create_object_mappings
            .is_enabled()
            .then_some(|_block: &Block| {
                Vec::new()
            }),
    )
    .await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    // Either resume from the last archived block or start fresh from genesis
    let mut archiver =
        if let Some((last_segment_header, last_archived_block, block_object_mappings)) =
            maybe_last_archived_block
        {
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                block_object_mappings,
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(erasure_coding)
        };

    // Catch up: archive all blocks that became deep enough while the archiver was not running
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        // `None` means nothing to catch up on; when there is no prior segment header we start
        // from genesis (block zero)
        let blocks_to_archive_to = best_block_number
            .checked_sub(confirmation_depth_k)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                if have_last_segment_header {
                    None
                } else {
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                // NOTE(review): both branches produce empty mappings; the enabled branch
                // presumably should extract real object mappings from the block — confirm
                let block_object_mappings =
                    if create_object_mappings.is_enabled_for_block(block_number_to_archive) {
                        Vec::new()
                    } else {
                        Vec::new()
                    };

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, block_object_mappings)
                    .expect("Block is never empty and doesn't exceed u32; qed");
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                // Persist produced segment headers immediately
                if !new_segment_headers.is_empty() {
                    segment_headers_store.add_segment_headers(&new_segment_headers)?;
                }

                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        // Set either when resuming from a segment or after catching up; at least one must have
        // happened
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
571
/// Creates the long-running archiver task.
///
/// Initializes the archiver (eagerly when no segment headers are stored yet, otherwise lazily
/// inside the returned future) and returns a future that consumes block importing
/// notifications, archiving each block once it is `confirmation_depth_k` deep and sending an
/// [`ArchivedSegmentNotification`] for every archived segment.
///
/// # Errors
///
/// Eager initialization errors are returned directly; all later errors are returned by the
/// future itself.
pub async fn create_archiver_task<Block, CI>(
    segment_headers_store: SegmentHeadersStore,
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    create_object_mappings: CreateObjectMappings,
    erasure_coding: ErasureCoding,
) -> Result<impl Future<Output = Result<(), ArchiverTaskError>> + Send + 'static, ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block> + 'static,
{
    if create_object_mappings.is_enabled() {
        info!(
            ?create_object_mappings,
            "Creating object mappings from the configured block onwards"
        );
    } else {
        info!("Not creating object mappings");
    }

    // With no stored segment headers, initialize eagerly so that instantiation errors surface
    // to the caller instead of from inside the returned future
    let maybe_archiver = if segment_headers_store.max_segment_index().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &segment_headers_store,
            &chain_info,
            consensus_constants.confirmation_depth_k,
            create_object_mappings,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        // Lazy initialization path for the case where segment headers already existed
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &segment_headers_store,
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            // Blocks shallower than `confirmation_depth_k` are not archived yet
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.confirmation_depth_k)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    continue;
                }
            };

            let last_archived_block_number = segment_headers_store
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            // NOTE(review): this checks the mapping setting against the *last archived* block,
            // not the block to archive — confirm this is intended
            let create_mappings =
                create_object_mappings.is_enabled_for_block(last_archived_block_number);
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip blocks that were already archived, unless mappings were requested for them
            let skip_last_archived_blocks =
                last_archived_block_number > block_number_to_archive && !create_mappings;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // The block to archive does not directly follow the best archived block:
            // re-initialize the archiver and re-check before deciding whether this is a real
            // gap in history
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &segment_headers_store,
                    &chain_info,
                    consensus_constants.confirmation_depth_k,
                    create_object_mappings,
                    erasure_coding.clone(),
                );
                // Destructuring assignment into the existing mutable bindings
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // Re-initialization closed the gap; fall through and archive below
                } else if best_archived_block_number >= block_number_to_archive {
                    // Everything up to this block is already archived
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // Parent of the importing block is not available on this chain yet; skip
                    // rather than treating it as a permanent gap
                    continue;
                } else {
                    // Blocks exist but do not connect to what was archived: unrecoverable gap
                    return Err(ArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                segment_headers_store.clone(),
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
                create_object_mappings,
            )
            .await?;
        }

        Ok(())
    })
}
753
/// Archives the block at `block_number_to_archive` (an ancestor of `best_block_root`),
/// persists any segment headers produced, and notifies subscribers about each archived
/// segment.
///
/// Returns the root and number of the block that was just archived so the caller can track the
/// best archived block.
///
/// # Errors
///
/// Returns [`ArchiverTaskError::ArchivingReorg`] when the block's parent is not the best
/// archived block (chain switched forks beyond archiving depth), and propagates segment header
/// store errors.
#[allow(clippy::too_many_arguments)]
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    segment_headers_store: SegmentHeadersStore,
    chain_info: &CI,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
    create_object_mappings: CreateObjectMappings,
) -> Result<(BlockRoot, BlockNumber), ArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    // The block must extend the best archived block, otherwise we are on a different fork
    // beyond the archiving depth and cannot continue
    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(ArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let create_mappings = create_object_mappings.is_enabled_for_block(block_number_to_archive);

    // NOTE(review): both branches produce empty mappings; the enabled branch presumably should
    // extract real object mappings from the block — confirm
    let block_object_mappings = if create_mappings {
        Vec::new()
    } else {
        Vec::new()
    };

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, block_object_mappings)
        .expect("Block is never empty and doesn't exceed u32; qed");
    // Persist each segment header before notifying subscribers about its segment
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}
836
837async fn send_archived_segment_notification(
856 archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
857 archived_segment: NewArchivedSegment,
858) {
859 let segment_index = archived_segment.segment_header.segment_index;
860 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
861 let archived_segment = Arc::new(archived_segment);
864 let archived_segment_notification = ArchivedSegmentNotification {
865 archived_segment: Arc::clone(&archived_segment),
866 acknowledgement_sender,
867 };
868
869 if let Err(error) = archived_segment_notification_sender
870 .send(archived_segment_notification)
871 .await
872 {
873 warn!(
874 %error,
875 "Failed to send archived segment notification"
876 );
877 }
878
879 let wait_fut = async {
880 while acknowledgement_receiver.next().await.is_some() {
881 debug!(
882 "Archived segment notification acknowledged: {}",
883 segment_index
884 );
885 }
886 };
887
888 if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
889 .await
890 .is_err()
891 {
892 warn!(
893 "Archived segment notification was not acknowledged and reached timeout, continue \
894 regardless"
895 );
896 }
897}