1use ab_aligned_buffer::SharedAlignedBuffer;
22use ab_archiving::archiver::{Archiver, ArchiverInstantiationError, NewArchivedSegment};
23use ab_client_api::{ChainInfo, ChainInfoWrite, PersistSegmentHeadersError};
24use ab_client_consensus_common::{BlockImportingNotification, ConsensusConstants};
25use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
26use ab_core_primitives::block::header::GenericBlockHeader;
27use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
28use ab_core_primitives::block::owned::GenericOwnedBlock;
29use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
30use ab_core_primitives::segments::{LocalSegmentIndex, RecordedHistorySegment, SegmentHeader};
31use ab_core_primitives::shard::RealShardKind;
32use ab_erasure_coding::ErasureCoding;
33use bytesize::ByteSize;
34use chacha20::ChaCha8Rng;
35use chacha20::rand_core::{Rng, SeedableRng};
36use futures::channel::mpsc;
37use futures::prelude::*;
38use std::sync::Arc;
39use std::time::Duration;
40use tracing::{debug, info, trace, warn};
41
/// How long to wait for subscribers to acknowledge an archived segment notification before giving
/// up and continuing regardless
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(2);
44
/// Notification about a newly archived segment, paired with a channel for acknowledging that the
/// notification was processed
#[derive(Debug)]
pub struct ArchivedSegmentNotification {
    /// The newly archived segment
    pub archived_segment: Arc<NewArchivedSegment>,
    /// Sender used to acknowledge the notification; the archiver waits (bounded by a timeout) for
    /// all clones of this sender to be dropped before moving on
    pub acknowledgement_sender: mpsc::Sender<()>,
}
66
/// Finds the most recent archived segment whose last archived block is at or below
/// `best_block_number_to_archive` and is an ancestor of `best_block_root`, returning that
/// segment's header together with the corresponding block.
///
/// Returns `None` when no segment header is stored yet, when only segment index zero exists, or
/// when no stored segment satisfies the constraints above.
async fn find_last_archived_block<Block, CI>(
    chain_info: &CI,
    best_block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
) -> Option<(SegmentHeader, Block)>
where
    Block: GenericOwnedBlock,
    CI: ChainInfo<Block>,
{
    let max_local_segment_index = chain_info.last_segment_header()?.index.as_inner();

    if max_local_segment_index == LocalSegmentIndex::ZERO {
        // NOTE(review): segment index zero is deliberately not resumed from — presumably it
        // corresponds to the genesis segment, so starting the archiver from scratch is equivalent;
        // confirm against `Archiver` semantics
        return None;
    }

    // Walk segments from newest to oldest until one fits within the archiving target
    for segment_header in (LocalSegmentIndex::ZERO..=max_local_segment_index)
        .rev()
        .filter_map(|segment_index| chain_info.get_segment_header(segment_index))
    {
        let last_archived_block_number = segment_header.last_archived_block.number();

        if last_archived_block_number > best_block_number_to_archive {
            // Segment ends past the block we want to archive up to, try an older segment
            continue;
        }

        let Some(last_archived_block_header) =
            chain_info.ancestor_header(last_archived_block_number, best_block_root)
        else {
            // Header is not an ancestor of the best block (or missing), try an older segment
            continue;
        };

        let last_archived_block_root = &*last_archived_block_header.header().root();

        let Ok(last_archived_block) = chain_info.block(last_archived_block_root).await else {
            // Block is not available locally, try an older segment
            continue;
        };

        return Some((segment_header, last_archived_block));
    }

    None
}
116
117pub fn encode_block<Block>(block: &Block) -> Vec<u8>
119where
120 Block: GenericOwnedBlock,
121{
122 let is_beacon_chain_genesis_block = Block::Block::SHARD_KIND == RealShardKind::BeaconChain
123 && block.header().header().prefix.number == BlockNumber::ZERO;
124 let header_buffer = block.header().buffer();
125 let body_buffer = block.body().buffer();
126
127 let mut encoded_block = Vec::with_capacity(if is_beacon_chain_genesis_block {
129 RecordedHistorySegment::SIZE
130 } else {
131 size_of::<u32>() * 2 + header_buffer.len() as usize + body_buffer.len() as usize
132 });
133
134 encoded_block.extend_from_slice(&header_buffer.len().to_le_bytes());
135 encoded_block.extend_from_slice(&body_buffer.len().to_le_bytes());
136 encoded_block.extend_from_slice(header_buffer);
137 encoded_block.extend_from_slice(body_buffer);
138
139 if is_beacon_chain_genesis_block {
140 let encoded_block_length = encoded_block.len();
141
142 encoded_block.resize(RecordedHistorySegment::SIZE, 0);
148 let mut rng = ChaCha8Rng::from_seed(*block.header().header().result.state_root);
149 rng.fill_bytes(&mut encoded_block[encoded_block_length..]);
150 }
151
152 encoded_block
153}
154
155pub fn decode_block<Block>(mut encoded_block: &[u8]) -> Option<Block>
157where
158 Block: GenericOwnedBlock,
159{
160 let header_length = {
161 let header_length = encoded_block.split_off(..size_of::<u32>())?;
162 u32::from_le_bytes([
163 header_length[0],
164 header_length[1],
165 header_length[2],
166 header_length[3],
167 ])
168 };
169 let body_length = {
170 let body_length = encoded_block.split_off(..size_of::<u32>())?;
171 u32::from_le_bytes([
172 body_length[0],
173 body_length[1],
174 body_length[2],
175 body_length[3],
176 ])
177 };
178
179 let header_buffer = encoded_block.split_off(..header_length as usize)?;
180 let body_buffer = encoded_block.split_off(..body_length as usize)?;
181
182 let header_buffer = SharedAlignedBuffer::from_bytes(header_buffer);
183 let body_buffer = SharedAlignedBuffer::from_bytes(body_buffer);
184
185 Block::from_buffers(header_buffer, body_buffer)
186}
187
188#[derive(Debug, thiserror::Error)]
190pub enum SegmentArchiverTaskError {
191 #[error("Archiver instantiation error: {error}")]
193 Instantiation {
194 #[from]
196 error: ArchiverInstantiationError,
197 },
198 #[error("Failed to persist a new segment header: {error}")]
200 PersistSegmentHeaders {
201 #[from]
203 error: PersistSegmentHeadersError,
204 },
205 #[error(
207 "Attempt to switch to a different fork beyond archiving depth: parent block root \
208 {parent_block_root}, best archived block root {best_archived_block_root}"
209 )]
210 ArchivingReorg {
211 parent_block_root: BlockRoot,
213 best_archived_block_root: BlockRoot,
215 },
216 #[error(
219 "There was a gap in blockchain history, and the last contiguous series of blocks doesn't \
220 start with the archived segment (best archived block number {best_archived_block_number}, \
221 block number to archive {block_number_to_archive}), block about to be imported \
222 {importing_block_number})"
223 )]
224 BlockGap {
225 best_archived_block_number: BlockNumber,
227 block_number_to_archive: BlockNumber,
229 importing_block_number: BlockNumber,
231 },
232}
233
/// State produced by `initialize_archiver`: a ready-to-use archiver together with the position of
/// the most recently archived block
struct InitializedArchiver {
    /// Archiver instance, primed with the last archived segment when one existed
    archiver: Archiver,
    /// Root and number of the best (most recent) block that has been archived so far
    best_archived_block: (BlockRoot, BlockNumber),
}
238
/// Creates an archiver and brings it up to date with the chain.
///
/// Resumes from the last archived segment when one is found, otherwise starts from genesis, then
/// archives any already-produced blocks that are at least `block_confirmation_depth` below the
/// current best block.
///
/// # Errors
///
/// Returns an error if the archiver cannot be instantiated from persisted state or if newly
/// produced segment headers cannot be persisted.
async fn initialize_archiver<Block, CI>(
    chain_info: &CI,
    block_confirmation_depth: BlockNumber,
    erasure_coding: ErasureCoding,
) -> Result<InitializedArchiver, SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let best_block_header = chain_info.best_header();
    let best_block_root = *best_block_header.header().root();
    let best_block_number: BlockNumber = best_block_header.header().prefix.number;

    let mut best_block_to_archive = best_block_number.saturating_sub(block_confirmation_depth);

    // If any block between the archiving target and the best block is missing locally, contiguous
    // history below the best block cannot be relied upon, so fall back to the best block itself
    if (best_block_to_archive..best_block_number).any(|block_number| {
        chain_info
            .ancestor_header(block_number, &best_block_root)
            .is_none()
    }) {
        best_block_to_archive = best_block_number;
    }

    let maybe_last_archived_block =
        find_last_archived_block(chain_info, best_block_to_archive, &best_block_root).await;

    let have_last_segment_header = maybe_last_archived_block.is_some();
    let mut best_archived_block = None::<(BlockRoot, BlockNumber)>;

    let mut archiver =
        if let Some((last_segment_header, last_archived_block)) = maybe_last_archived_block {
            let last_archived_block_number = last_segment_header.last_archived_block.number;
            info!(
                %last_archived_block_number,
                "Resuming archiver from last archived block",
            );

            let last_archived_block_header = last_archived_block.header().header();
            best_archived_block.replace((
                *last_archived_block_header.root(),
                last_archived_block_header.prefix.number,
            ));

            // The archiver needs the last archived block re-encoded to restore its internal state
            let last_archived_block_encoded = encode_block(&last_archived_block);

            Archiver::with_initial_state(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
                last_segment_header,
                &last_archived_block_encoded,
                Vec::new(),
            )?
        } else {
            info!("Starting archiving from genesis");

            Archiver::new(
                best_block_header.header().prefix.shard_index,
                erasure_coding,
            )
        };

    // Catch up: archive all blocks produced so far that are already below the confirmation depth
    {
        let blocks_to_archive_from = archiver
            .last_archived_block_number()
            .map(|n| n + BlockNumber::ONE)
            .unwrap_or_default();
        let blocks_to_archive_to = best_block_number
            .checked_sub(block_confirmation_depth)
            .filter(|&blocks_to_archive_to| blocks_to_archive_to >= blocks_to_archive_from)
            .or({
                // Without any segment header yet, at least the genesis block is archived
                if have_last_segment_header {
                    None
                } else {
                    Some(BlockNumber::ZERO)
                }
            });

        if let Some(blocks_to_archive_to) = blocks_to_archive_to {
            info!(
                "Archiving already produced blocks {}..={}",
                blocks_to_archive_from, blocks_to_archive_to,
            );

            for block_number_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
                let header = chain_info
                    .ancestor_header(block_number_to_archive, &best_block_root)
                    .expect("All blocks since last archived must be present; qed");

                let block = chain_info
                    .block(&header.header().root())
                    .await
                    .expect("All blocks since last archived must be present; qed");

                let encoded_block = encode_block(&block);

                debug!(
                    "Encoded block {} has size of {}",
                    block_number_to_archive,
                    ByteSize::b(encoded_block.len() as u64).display().iec(),
                );

                let block_outcome = archiver
                    .add_block(encoded_block, Vec::new())
                    .expect("Block is never empty and doesn't exceed u32; qed");
                let new_segment_headers: Vec<SegmentHeader> = block_outcome
                    .archived_segments
                    .iter()
                    .map(|archived_segment| archived_segment.segment_header)
                    .collect();

                if !new_segment_headers.is_empty() {
                    chain_info
                        .persist_segment_headers(new_segment_headers)
                        .await?;
                }

                // The last block of the catch-up range becomes the best archived block
                if block_number_to_archive == blocks_to_archive_to {
                    best_archived_block.replace((*header.header().root(), block_number_to_archive));
                }
            }
        }
    }

    Ok(InitializedArchiver {
        archiver,
        best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
    })
}
376
/// Creates a long-running future that archives blocks as they reach the confirmation depth.
///
/// When no segment header has been persisted yet, archiver initialization happens eagerly here so
/// that instantiation errors surface before the returned future is spawned; otherwise it is
/// deferred into the future. The future consumes block importing notifications, archives eligible
/// blocks and emits [`ArchivedSegmentNotification`]s for every newly produced segment. It
/// completes when the block importing notification channel is closed.
///
/// # Errors
///
/// Returns (or resolves to) an error if archiver initialization fails, if segment headers cannot
/// be persisted, or if an unrecoverable reorg/block gap is detected.
pub async fn create_segment_archiver_task<Block, CI>(
    chain_info: CI,
    mut block_importing_notification_receiver: mpsc::Receiver<BlockImportingNotification>,
    mut archived_segment_notification_sender: mpsc::Sender<ArchivedSegmentNotification>,
    consensus_constants: ConsensusConstants,
    erasure_coding: ErasureCoding,
) -> Result<
    impl Future<Output = Result<(), SegmentArchiverTaskError>> + Send + 'static,
    SegmentArchiverTaskError,
>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block> + 'static,
{
    let maybe_archiver = if chain_info.last_segment_header().is_none() {
        let initialize_archiver_fut = initialize_archiver(
            &chain_info,
            consensus_constants.block_confirmation_depth,
            erasure_coding.clone(),
        );
        Some(initialize_archiver_fut.await?)
    } else {
        None
    };

    Ok(async move {
        let archiver = match maybe_archiver {
            Some(archiver) => archiver,
            None => {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                initialize_archiver_fut.await?
            }
        };

        let InitializedArchiver {
            mut archiver,
            best_archived_block,
        } = archiver;
        let (mut best_archived_block_root, mut best_archived_block_number) = best_archived_block;

        while let Some(block_importing_notification) =
            block_importing_notification_receiver.next().await
        {
            let importing_block_number = block_importing_notification.block_number;
            // Only blocks `block_confirmation_depth` below the importing block are archived
            let block_number_to_archive = match importing_block_number
                .checked_sub(consensus_constants.block_confirmation_depth)
            {
                Some(block_number_to_archive) => block_number_to_archive,
                None => {
                    // Chain is still too short for anything to reach archiving depth
                    continue;
                }
            };

            let last_archived_block_number = chain_info
                .last_segment_header()
                .expect("Exists after archiver initialization; qed")
                .last_archived_block
                .number();
            trace!(
                %importing_block_number,
                %block_number_to_archive,
                %best_archived_block_number,
                %last_archived_block_number,
                "Checking if block needs to be skipped"
            );

            // Skip blocks already covered either by the in-memory archiver position or by the
            // last persisted segment header
            let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive;
            if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
                debug!(
                    %importing_block_number,
                    %block_number_to_archive,
                    %best_archived_block_number,
                    %last_archived_block_number,
                    "Skipping already archived block",
                );
                continue;
            }

            let best_block_root = chain_info.best_root();

            // A non-contiguous jump in archiving position means something unusual happened
            // (presumably a restart or sync); re-initialize the archiver and re-check
            if best_archived_block_number + BlockNumber::ONE != block_number_to_archive {
                let initialize_archiver_fut = initialize_archiver(
                    &chain_info,
                    consensus_constants.block_confirmation_depth,
                    erasure_coding.clone(),
                );
                InitializedArchiver {
                    archiver,
                    best_archived_block: (best_archived_block_root, best_archived_block_number),
                } = initialize_archiver_fut.await?;

                if best_archived_block_number + BlockNumber::ONE == block_number_to_archive {
                    // Contiguous again after re-initialization, fall through to archiving below
                } else if best_archived_block_number >= block_number_to_archive {
                    // Already archived after re-initialization
                    continue;
                } else if chain_info
                    .ancestor_header(importing_block_number - BlockNumber::ONE, &best_block_root)
                    .is_none()
                {
                    // Parent of the importing block is not available locally yet; wait for a
                    // later notification instead of failing
                    continue;
                } else {
                    // History is present but does not line up with the archived position — a
                    // genuine gap that cannot be recovered from
                    return Err(SegmentArchiverTaskError::BlockGap {
                        best_archived_block_number,
                        block_number_to_archive,
                        importing_block_number,
                    });
                }
            }

            (best_archived_block_root, best_archived_block_number) = archive_block(
                &mut archiver,
                &chain_info,
                &mut archived_segment_notification_sender,
                best_archived_block_root,
                block_number_to_archive,
                &best_block_root,
            )
            .await?;
        }

        Ok(())
    })
}
539
/// Archives a single block, persisting and announcing every segment completed by it.
///
/// Returns the root and number of the archived block, which becomes the caller's new best
/// archived block position.
///
/// # Errors
///
/// Returns [`SegmentArchiverTaskError::ArchivingReorg`] if the block's parent is not the best
/// archived block, and propagates segment header persistence errors.
async fn archive_block<Block, CI>(
    archiver: &mut Archiver,
    chain_info: &CI,
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    best_archived_block_root: BlockRoot,
    block_number_to_archive: BlockNumber,
    best_block_root: &BlockRoot,
) -> Result<(BlockRoot, BlockNumber), SegmentArchiverTaskError>
where
    Block: GenericOwnedBlock,
    CI: ChainInfoWrite<Block>,
{
    let header = chain_info
        .ancestor_header(block_number_to_archive, best_block_root)
        .expect("All blocks since last archived must be present; qed");

    // The block must extend the already-archived history; a different parent indicates a reorg
    // below the archiving depth, which the archiver cannot follow
    let parent_block_root = header.header().prefix.parent_root;
    if parent_block_root != best_archived_block_root {
        return Err(SegmentArchiverTaskError::ArchivingReorg {
            parent_block_root,
            best_archived_block_root,
        });
    }

    let block_root_to_archive = *header.header().root();

    let block = chain_info
        .block(&block_root_to_archive)
        .await
        .expect("All blocks since last archived must be present; qed");

    debug!("Archiving block {block_number_to_archive} ({block_root_to_archive})");

    let encoded_block = encode_block(&block);
    debug!(
        "Encoded block {block_number_to_archive} has size of {}",
        ByteSize::b(encoded_block.len() as u64).display().iec(),
    );

    let block_outcome = archiver
        .add_block(encoded_block, Vec::new())
        .expect("Block is never empty and doesn't exceed u32; qed");
    // Persist each segment header before announcing it, so subscribers never observe a segment
    // that is not yet durable
    for archived_segment in block_outcome.archived_segments {
        let segment_header = archived_segment.segment_header;

        chain_info
            .persist_segment_headers(vec![segment_header])
            .await?;

        send_archived_segment_notification(archived_segment_notification_sender, archived_segment)
            .await;
    }

    Ok((block_root_to_archive, block_number_to_archive))
}
596
/// Sends an archived segment notification to subscribers and waits for acknowledgements, bounded
/// by [`ACKNOWLEDGEMENT_TIMEOUT`].
///
/// The wait completes when every clone of the acknowledgement sender has been dropped (closing
/// the channel); if that does not happen within the timeout, a warning is logged and the function
/// returns anyway.
async fn send_archived_segment_notification(
    archived_segment_notification_sender: &mut mpsc::Sender<ArchivedSegmentNotification>,
    archived_segment: NewArchivedSegment,
) {
    let segment_index = archived_segment.segment_header.index;
    let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(1);
    let archived_segment = Arc::new(archived_segment);
    let archived_segment_notification = ArchivedSegmentNotification {
        archived_segment: Arc::clone(&archived_segment),
        acknowledgement_sender,
    };

    if let Err(error) = archived_segment_notification_sender
        .send(archived_segment_notification)
        .await
    {
        // The un-sent notification (and the acknowledgement sender inside it) is dropped by this
        // point, so the wait below should observe a closed channel and finish quickly
        warn!(
            %error,
            "Failed to send archived segment notification"
        );
    }

    // Drain acknowledgements until all senders are dropped and the channel closes
    let wait_fut = async {
        while acknowledgement_receiver.next().await.is_some() {
            debug!(
                "Archived segment notification acknowledged: {}",
                segment_index
            );
        }
    };

    if tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, wait_fut)
        .await
        .is_err()
    {
        warn!(
            "Archived segment notification was not acknowledged and reached timeout, continue \
            regardless"
        );
    }
}