1use ab_aligned_buffer::SharedAlignedBuffer;
22use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
23use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
24use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
25use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
26use ab_core_primitives::block::header::GenericBlockHeader;
27use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
28use ab_core_primitives::block::owned::GenericOwnedBlock;
29use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
30use ab_core_primitives::segments::{LocalSegmentIndex, RecordedHistorySegment, SegmentHeader};
31use ab_core_primitives::shard::RealShardKind;
32use ab_erasure_coding::ErasureCoding;
33use bytesize::ByteSize;
34use chacha20::ChaCha8Rng;
35use chacha20::rand_core::{Rng, SeedableRng};
36use futures::channel::mpsc;
37use futures::prelude::*;
38use std::sync::Arc;
39use std::time::Duration;
40use tracing::{debug, info, trace, warn};
41
/// Upper bound on how long the archiver waits for an archived segment notification to be
/// acknowledged before it gives up waiting and carries on (two minutes).
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
44
45#[derive(Debug)]
58pub struct ArchivedSegmentNotification {
59 pub archived_segment: Arc<NewArchivedSegment>,
61 pub acknowledgement_sender: mpsc::Sender<()>,
65}
66
67async fn find_last_archived_block<Block, CI>(
68 chain_info: &CI,
69 best_block_number_to_archive: BlockNumber,
70 best_block_root: &BlockRoot,
71) -> Option<(SegmentHeader, Block)>
72where
73 Block: GenericOwnedBlock,
74 CI: ChainInfo<Block>,
75{
76 let max_local_segment_index = chain_info.last_segment_header()?.index.as_inner();
77
78 if max_local_segment_index == LocalSegmentIndex::ZERO {
79 return None;
81 }
82
83 for segment_header in (LocalSegmentIndex::ZERO..=max_local_segment_index)
84 .rev()
85 .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
86 {
87 let last_archived_block_number = segment_header.last_archived_block.number();
88
89 if last_archived_block_number > best_block_number_to_archive {
90 continue;
94 }
95
96 let Some(last_archived_block_header) =
97 chain_info.ancestor_header(last_archived_block_number, best_block_root)
98 else {
99 continue;
102 };
103
104 let last_archived_block_root = &*last_archived_block_header.header().root();
105
106 let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
107 continue;
109 };
110
111 return Some((segment_header, last_archived_block));
112 }
113
114 None
115}
116
117pub fn encode_block<Block>(block: &Block) -> Vec<u8>
119where
120 Block: GenericOwnedBlock,
121{
122 let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
123 && block.header().header().prefix.number == BlockNumber::ZERO;
124 let header_buffer = block.header().buffer();
125 let body_buffer = block.body().buffer();
126
127 let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
129 RecordedHistorySegment::SIZE
130 } else {
131 size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
132 });
133
134 encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
135 encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
136 encoded_block.extend_from_slice(header_buffer);
137 encoded_block.extend_from_slice(body_buffer);
138
139 if is_beacon_chain_genesis_block {
140 let encoded_block_length = encoded_block.len();
141
142 encoded_block.resize(RecordedHistorySegment::SIZE, 0);
148 let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
149 rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
150 }
151
152 encoded_block
153}
154
155pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
157where
158 Block: GenericOwnedBlock,
159{
160 let header_length = {
161 let header_length = encoded_block.split_off(..size_of::<u32>())?;
162 u32::from_le_bytes([
163 header_length[0],
164 header_length[1],
165 header_length[2],
166 header_length[3],
167 ])
168 };
169 let body_length = {
170 let body_length = encoded_block.split_off(..size_of::<u32>())?;
171 u32::from_le_bytes([
172 body_length[0],
173 body_length[1],
174 body_length[2],
175 body_length[3],
176 ])
177 };
178
179 let header_buffer = encoded_block.split_off(..header_length as usize)?;
180 let body_buffer = encoded_block.split_off(..body_length as usize)?;
181
182 let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
183 let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
184
185 Block::from_buffers(header_buffer, body_buffer)
186}
187
188#[derive(Debug, thiserror::Error)]
190pub enum SegmentArchiverTaskError {
191 #[error("Archiver instantiation error: {error}")]
193 Instantiation {
194 #[from]
196 error: ArchiverInstantiationError,
197 },
198 #[error("Failed to persist a new segment header: {error}")]
200 PersistSegmentHeaders {
201 #[from]
203 error: PersistSegmentHeadersError,
204 },
205 #[error(
207 "Attempt to switch to a different fork beyond archiving depth: parent block root \
208 {parent_block_root}, best archived block root {best_archived_block_root}"
209 )]
210 ArchivingReorg {
211 parent_block_root: BlockRoot,
213 best_archived_block_root: BlockRoot,
215 },
216 #[error(
219 "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
220 start with the archived segment (best archived block number {best_archived_block_number}, \
221 block number to archive {block_number_to_archive}), block about to be imported \
222 {importing_block_number})"
223 )]
224 BlockGap {
225 best_archived_block_number: BlockNumber,
227 block_number_to_archive: BlockNumber,
229 importing_block_number: BlockNumber,
231 },
232}
233
234struct InitializedArchiver {
235 archiver: Archiver,
236 best_archived_block: (BlockRoot, BlockNumber),
237}
238
239async fn initialize_archiver<Block, CI>(
240 chain_info: &CI,
241 block_confirmation_depth: BlockNumber,
242 erasure_coding: ErasureCoding,
243) -> Result<InitializedArchiver, SegmentArchiverTaskError>
244where
245 Block: GenericOwnedBlock,
246 CI: ChainInfoWrite<Block>,
247{
248 let best_block_header = chain_info.best_header();
249 let best_block_root = *best_block_header.header().root();
250 let best_block_number: BlockNumber = best_block_header.header().prefix.number;
251
252 let mut best_block_to_archive = best_block_number.saturating_sub(block_confirmation_depth);
253
254 if (best_block_to_archive..best_block_number).any(|block_number| {
255 chain_info
256 .ancestor_header(block_number, &best_block_root)
257 .is_none()
258 }) {
259 best_block_to_archive = best_block_number;
263 }
264
265 let maybe_last_archived_block =
266 find_last_archived_block(chain_info, best_block_to_archive, &best_block_root).await;
267
268 let have_last_segment_header = maybe_last_archived_block.is_some();
269 let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;
270
271 let mut archiver =
272 if let Some((last_segment_header, last_archived_block)) = maybe_last_archived_block {
273 let last_archived_block_number = last_segment_header.last_archived_block.number;
275 info!(
276 %last_archived_block_number,
277 "Resuming archiver from last archived block",
278 );
279
280 let last_archived_block_header = last_archived_block.header().header();
281 best_archived_block.replace((
284 *last_archived_block_header.root(),
285 last_archived_block_header.prefix.number,
286 ));
287
288 let last_archived_block_encoded = encode_block(&last_archived_block);
289
290 Archiver::with_initial_state(
291 best_block_header.header().prefix.shard_index,
292 erasure_coding,
293 last_segment_header,
294 &last_archived_block_encoded,
295 Vec::new(),
296 )?
297 } else {
298 info!("Starting archiving from genesis");
299
300 Archiver::new(
301 best_block_header.header().prefix.shard_index,
302 erasure_coding,
303 )
304 };
305
306 {
308 let blocks_to_archive_from = archiver
309 .last_archived_block_number()
310 .map(|n| n + BlockNumber::ONE)
311 .unwrap_or_default();
312 let blocks_to_archive_to = best_block_number
313 .checked_sub(block_confirmation_depth)
314 .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
315 .or({
316 if have_last_segment_header {
317 None
318 } else {
319 Some(BlockNumber::ZERO)
321 }
322 });
323
324 if let Some(blocks_to_archive_to) = blocks_to_archive_to {
325 info!(
326 "Archiving already produced blocks {}..={}",
327 blocks_to_archive_from, blocks_to_archive_to,
328 );
329
330 for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
331 let header = chain_info
332 .ancestor_header(block_number_to_archive, &best_block_root)
333 .expect("All blocks since last archived must be present; qed");
334
335 let block = chain_info
336 .block(&header.header().root())
337 .await
338 .expect("All blocks since last archived must be present; qed");
339
340 let encoded_block = encode_block(&block);
341
342 debug!(
343 "Encoded block {} has size of {}",
344 block_number_to_archive,
345 ByteSize::b(encoded_block.len() as u64).display().iec(),
346 );
347
348 let block_outcome = archiver
349 .add_block(encoded_block, Vec::new())
350 .expect("Block is never empty and doesn't exceed u32; qed");
351 let new_segment_headers: Vec<SegmentHeader> = block_outcome
352 .archived_segments
353 .iter()
354 .map(|archived_segment| archived_segment.segment_header)
355 .collect();
356
357 if !new_segment_headers.is_empty() {
358 chain_info
359 .persist_segment_headers(new_segment_headers)
360 .await?;
361 }
362
363 if block_number_to_archive == blocks_to_archive_to {
364 best_archived_block.replace((*header.header().root(), block_number_to_archive));
365 }
366 }
367 }
368 }
369
370 Ok(InitializedArchiver {
371 archiver,
372 best_archived_block: best_archived_block
373 .expect("Must always set if there is no logical error; qed"),
374 })
375}
376
377pub async fn create_segment_archiver_task<Block, CI>(
400 chain_info: CI,
401 mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
402 mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
403 consensus_constants: ConsensusConstants,
404 erasure_coding: ErasureCoding,
405) -> Result<
406 impl Future<Output = Result<(), SegmentArchiverTaskError>> + Send + 'static,
407 SegmentArchiverTaskError,
408>
409where
410 Block: GenericOwnedBlock,
411 CI: ChainInfoWrite<Block> + 'static,
412{
413 let maybe_archiver = if chain_info.last_segment_header().is_none() {
414 let initialize_archiver_fut = initialize_archiver(
415 &chain_info,
416 consensus_constants.block_confirmation_depth,
417 erasure_coding.clone(),
418 );
419 Some(initialize_archiver_fut.await?)
420 } else {
421 None
422 };
423
424 Ok(async move {
425 let archiver = if let Some(archiver) = maybe_archiver {
426 archiver
427 } else {
428 let initialize_archiver_fut = initialize_archiver(
429 &chain_info,
430 consensus_constants.block_confirmation_depth,
431 erasure_coding.clone(),
432 );
433 initialize_archiver_fut.await?
434 };
435
436 let InitializedArchiver {
437 mut archiver,
438 best_archived_block,
439 } = archiver;
440 let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;
441
442 while let Some(block_importing_notification) =
443 block_importing_notification_receiver.next().await
444 {
445 let importing_block_number = block_importing_notification.block_number;
446 let Some(block_number_to_archive) =
447 importing_block_number.checked_sub(consensus_constants.block_confirmation_depth)
448 else {
449 continue;
451 };
452
453 let last_archived_block_number = chain_info
454 .last_segment_header()
455 .expect("Exists after archiver initialization; qed")
456 .last_archived_block
457 .number();
458 trace!(
459 %importing_block_number,
460 %block_number_to_archive,
461 %best_archived_block_number,
462 %last_archived_block_number,
463 "Checking if block needs to be skipped"
464 );
465
466 let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive;
468 if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
469 debug!(
471 %importing_block_number,
472 %block_number_to_archive,
473 %best_archived_block_number,
474 %last_archived_block_number,
475 "Skipping already archived block",
476 );
477 continue;
478 }
479
480 let best_block_root = chain_info.best_root();
481
482 if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
487 let initialize_archiver_fut = initialize_archiver(
488 &chain_info,
489 consensus_constants.block_confirmation_depth,
490 erasure_coding.clone(),
491 );
492 InitializedArchiver {
493 archiver,
494 best_archived_block: (best_archived_block_root, best_archived_block_number),
495 } = initialize_archiver_fut.await?;
496
497 if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
498 } else if best_archived_block_number >= block_number_to_archive {
500 continue;
503 } else if chain_info
504 .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
505 .is_none()
506 {
507 continue;
512 } else {
513 return Err(SegmentArchiverTaskError::BlockGap {
514 best_archived_block_number,
515 block_number_to_archive,
516 importing_block_number,
517 });
518 }
519 }
520
521 (best_archived_block_root, best_archived_block_number) = archive_block(
522 &mut archiver,
523 &chain_info,
524 &mut archived_segment_notification_sender,
525 best_archived_block_root,
526 block_number_to_archive,
527 &best_block_root,
528 )
529 .await?;
530 }
531
532 Ok(())
533 })
534}
535
536async fn archive_block<Block, CI>(
538 archiver: &mut Archiver,
539 chain_info: &CI,
540 archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
541 best_archived_block_root: BlockRoot,
542 block_number_to_archive: BlockNumber,
543 best_block_root: &BlockRoot,
544) -> Result<(BlockRoot, BlockNumber), SegmentArchiverTaskError>
545where
546 Block: GenericOwnedBlock,
547 CI: ChainInfoWrite<Block>,
548{
549 let header = chain_info
550 .ancestor_header(block_number_to_archive, best_block_root)
551 .expect("All blocks since last archived must be present; qed");
552
553 let parent_block_root = header.header().prefix.parent_root;
554 if parent_block_root != best_archived_block_root {
555 return Err(SegmentArchiverTaskError::ArchivingReorg {
556 parent_block_root,
557 best_archived_block_root,
558 });
559 }
560
561 let block_root_to_archive = *header.header().root();
562
563 let block = chain_info
564 .block(&block_root_to_archive)
565 .await
566 .expect("All blocks since last archived must be present; qed");
567
568 debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");
569
570 let encoded_block = encode_block(&block);
571 debug!(
572 "Encoded block {block_number_to_archive} has size of {}",
573 ByteSize::b(encoded_block.len() as u64).display().iec(),
574 );
575
576 let block_outcome = archiver
577 .add_block(encoded_block, Vec::new())
578 .expect("Block is never empty and doesn't exceed u32; qed");
579 for archived_segment in block_outcome.archived_segments {
580 let segment_header = archived_segment.segment_header;
581
582 chain_info
583 .persist_segment_headers(vec![segment_header])
584 .await?;
585
586 send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
587 .await;
588 }
589
590 Ok((block_root_to_archive, block_number_to_archive))
591}
592
593async fn send_archived_segment_notification(
594 archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
595 archived_segment: NewArchivedSegment,
596) {
597 let segment_index = archived_segment.segment_header.index;
598 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
599 let archived_segment = Arc::new(archived_segment);
602 let archived_segment_notification = ArchivedSegmentNotification {
603 archived_segment: Arc::clone(&archived_segment),
604 acknowledgement_sender,
605 };
606
607 if let Err(error) = archived_segment_notification_sender
608 .send(archived_segment_notification)
609 .await
610 {
611 warn!(
612 %error,
613 "Failed to send archived segment notification"
614 );
615 }
616
617 let wait_fut = async {
618 while acknowledgement_receiver.next().await.is_some() {
619 debug!(
620 "Archived segment notification acknowledged: {}",
621 segment_index
622 );
623 }
624 };
625
626 if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
627 .await
628 .is_err()
629 {
630 warn!(
631 "Archived segment notification was not acknowledged and reached timeout, continue \
632 regardless"
633 );
634 }
635}