1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5 PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8 ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9 RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
/// [`Output`] adapter that lets an encoder write straight into the records of an
/// [`ArchivedHistorySegment`] instead of going through an intermediate buffer.
struct ArchivedHistorySegmentOutput<'a> {
    /// Segment whose piece records receive the written bytes
    segment: &'a mut ArchivedHistorySegment,
    /// Total number of bytes written so far; `offset / Record::SIZE` selects the piece and
    /// `offset % Record::SIZE` the position inside its record
    offset: usize,
}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30 #[inline]
31 fn write(&mut self, mut bytes: &[u8]) {
32 while !bytes.is_empty() {
33 let piece = self
34 .segment
35 .get_mut(self.offset / Record::SIZE)
36 .expect("Encoding never exceeds the segment size; qed");
37 let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38 let bytes_to_write = output.len().min(bytes.len());
39 output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40 self.offset += bytes_to_write;
41 bytes = &bytes[bytes_to_write..];
42 }
43 }
44}
45
/// Ordered collection of [`SegmentItem`]s that is encoded as the plain concatenation of its items
/// (no length prefix).
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Items in the order in which they are encoded
    pub items: Vec<SegmentItem>,
}
52
53impl Encode for Segment {
54 #[inline(always)]
55 fn size_hint(&self) -> usize {
56 RecordedHistorySegment::SIZE
57 }
58
59 #[inline]
60 fn encode_to<O>(&self, dest: &mut O)
61 where
62 O: Output + ?Sized,
63 {
64 for item in &self.items {
65 item.encode_to(dest);
66 }
67 }
68}
69
70impl Decode for Segment {
71 #[inline]
72 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
73 where
74 I: Input,
75 {
76 let mut items = Vec::new();
77 loop {
78 match input.remaining_len()? {
79 Some(0) => {
80 break;
81 }
82 Some(_) => {
83 }
85 None => {
86 return Err(
87 "Source doesn't report remaining length, decoding not possible".into(),
88 );
89 }
90 }
91
92 match SegmentItem::decode(input) {
93 Ok(item) => {
94 items.push(item);
95 }
96 Err(error) => {
97 return Err(error.chain("Could not decode `Segment::items`"));
98 }
99 }
100 }
101
102 Ok(Self { items })
103 }
104}
105
/// Owned bytes of a block (or of a part of a block).
///
/// Behaves like a read-only byte slice through [`Deref`] and can be turned back into the
/// underlying `Vec<u8>` with [`From`]/[`Into`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);

impl Deref for BlockBytes {
    type Target = [u8];

    /// Gives read-only access to the contained bytes.
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        self.0.as_slice()
    }
}

impl From<BlockBytes> for Vec<u8> {
    /// Unwraps the contained bytes without copying them.
    #[inline(always)]
    fn from(value: BlockBytes) -> Self {
        let BlockBytes(bytes) = value;
        bytes
    }
}
126
127impl Encode for BlockBytes {
128 #[inline(always)]
129 fn size_hint(&self) -> usize {
130 size_of::<u32>() + self.0.len()
131 }
132
133 #[inline]
134 fn encode_to<O>(&self, dest: &mut O)
135 where
136 O: Output + ?Sized,
137 {
138 let length = u32::try_from(self.0.len())
139 .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
140
141 length.encode_to(dest);
142 dest.write(&self.0);
143 }
144}
145
146impl Decode for BlockBytes {
147 #[inline]
148 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
149 where
150 I: Input,
151 {
152 let length = u32::decode(input)?;
153 if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
154 return Err("Segment item size is impossibly large".into());
155 }
156 let mut bytes = vec![0; length as usize];
160 input.read(&mut bytes)?;
161 Ok(Self(bytes))
162 }
163}
164
165impl BlockBytes {
166 #[inline(always)]
167 fn truncate(&mut self, size: usize) {
168 self.0.truncate(size);
169 }
170}
171
/// Kinds of items that a [`Segment`] is made of.
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Placeholder variant that the archiver never puts into its buffer or into a segment it
    /// produces.
    // NOTE(review): presumably this is what unused zero bytes at the end of a segment decode as,
    // since the variant has codec index 0 - confirm.
    #[codec(index = 0)]
    Padding,
    /// A block that is fully contained in the segment
    #[codec(index = 1)]
    Block {
        /// Encoded block bytes
        bytes: BlockBytes,
        /// Objects inside `bytes`; skipped by the codec, so not available after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// First part of a block that did not fit into the segment; the rest follows as
    /// [`SegmentItem::BlockContinuation`]
    #[codec(index = 2)]
    BlockStart {
        /// Leading bytes of the encoded block
        bytes: BlockBytes,
        /// Objects inside `bytes`; skipped by the codec, so not available after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Further part of a block whose beginning was already archived
    #[codec(index = 3)]
    BlockContinuation {
        /// Block bytes that follow the already archived part
        bytes: BlockBytes,
        /// Objects inside `bytes`, with offsets relative to the start of `bytes`; skipped by the
        /// codec, so not available after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Header of the segment that was produced before the one containing this item
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}
213
/// A segment produced by the archiver, together with its header.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Header describing the segment
    pub segment_header: SegmentHeader,
    /// Pieces of the segment
    pub pieces: ArchivedHistorySegment,
}
223
/// Result of adding a block to the archiver.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// Segments that were completed by adding the block; empty if there was not enough data to
    /// fill a segment
    pub archived_segments: Vec<NewArchivedSegment>,

    /// Mappings of objects found both in the produced segments and in the items that remain
    /// buffered for the next segment
    pub global_objects: Vec<GlobalObject>,
}
237
/// Errors returned when an archiver is created from previously archived state.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// The last archived block is marked as partially archived, yet the number of archived bytes
    /// equals the length of the provided encoded block (carried value)
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// The provided encoded block is shorter than the part of it that is already archived
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Length of the provided encoded block in bytes
        block_bytes: u32,
        /// Number of bytes of the block that are already archived
        archived_block_bytes: u32,
    },
}
257
/// Block archiver.
///
/// Blocks are appended to an internal buffer of [`SegmentItem`]s. Once the buffer holds enough
/// data, it is cut into segments of [`RecordedHistorySegment::SIZE`] bytes, which are erasure
/// coded and turned into pieces that are returned together with the segment header.
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Shard index that is written into the header of every produced piece
    shard_index: ShardIndex,
    /// Items that are waiting to be included in the next segment
    buffer: VecDeque<SegmentItem>,
    /// Erasure coding instance used to extend records and record chunks
    erasure_coding: ErasureCoding,
    /// Index that the next produced segment will get
    segment_index: LocalSegmentIndex,
    /// Hash of the header of the most recently produced segment
    prev_segment_header_hash: Blake3Hash,
    /// Last (fully or partially) archived block, `None` if nothing was archived yet
    last_archived_block: Option<LastArchivedBlock>,
}
283
284impl Archiver {
285 pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
287 Self {
288 shard_index,
289 buffer: VecDeque::default(),
290 erasure_coding,
291 segment_index: LocalSegmentIndex::ZERO,
292 prev_segment_header_hash: Blake3Hash::default(),
293 last_archived_block: None,
294 }
295 }
296
297 pub fn with_initial_state(
301 shard_index: ShardIndex,
302 erasure_coding: ErasureCoding,
303 segment_header: SegmentHeader,
304 encoded_block: &[u8],
305 mut block_objects: Vec<BlockObject>,
306 ) -> Result<Self, ArchiverInstantiationError> {
307 let mut archiver = Self::new(shard_index, erasure_coding);
308
309 archiver.segment_index = segment_header.index.as_inner() + LocalSegmentIndex::ONE;
310 archiver.prev_segment_header_hash = segment_header.hash();
311 archiver.last_archived_block = Some(segment_header.last_archived_block);
312
313 archiver
315 .buffer
316 .push_back(SegmentItem::ParentSegmentHeader(segment_header));
317
318 if let Some(archived_block_bytes) = archiver
319 .last_archived_block
320 .expect("Just inserted; qed")
321 .partial_archived()
322 {
323 let archived_block_bytes = archived_block_bytes.get();
324 let encoded_block_bytes = u32::try_from(encoded_block.len())
325 .expect("Blocks length is never bigger than u32; qed");
326
327 match encoded_block_bytes.cmp(&archived_block_bytes) {
328 Ordering::Less => {
329 return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
330 block_bytes: encoded_block_bytes,
331 archived_block_bytes,
332 });
333 }
334 Ordering::Equal => {
335 return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
336 encoded_block_bytes,
337 ));
338 }
339 Ordering::Greater => {
340 block_objects.retain_mut(|block_object: &mut BlockObject| {
343 if block_object.offset >= archived_block_bytes {
344 block_object.offset -= archived_block_bytes;
345 true
346 } else {
347 false
348 }
349 });
350 archiver.buffer.push_back(SegmentItem::BlockContinuation {
351 bytes: BlockBytes(
352 encoded_block[(archived_block_bytes as usize)..].to_vec(),
353 ),
354 block_objects,
355 });
356 }
357 }
358 }
359
360 Ok(archiver)
361 }
362
363 pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
365 self.last_archived_block
366 .map(|last_archived_block| last_archived_block.number())
367 }
368
369 pub fn add_block(
380 &mut self,
381 bytes: Vec<u8>,
382 block_objects: Vec<BlockObject>,
383 ) -> Option<ArchiveBlockOutcome> {
384 if !(1..u32::MAX as usize).contains(&bytes.len()) {
385 return None;
386 }
387
388 self.buffer.push_back(SegmentItem::Block {
390 bytes: BlockBytes(bytes),
391 block_objects,
392 });
393
394 let mut archived_segments = Vec::new();
395 let mut object_mapping = Vec::new();
396
397 while let Some(mut segment) = self.produce_segment() {
399 object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
401 archived_segments.push(self.produce_archived_segment(segment));
402 }
403
404 object_mapping.extend(self.produce_next_segment_mappings());
406
407 Some(ArchiveBlockOutcome {
408 archived_segments,
409 global_objects: object_mapping,
410 })
411 }
412
413 fn produce_segment(&mut self) -> Option<Segment> {
416 let mut segment = Segment {
417 items: Vec::with_capacity(self.buffer.len()),
418 };
419
420 let mut last_archived_block = self.last_archived_block;
421
422 let mut segment_size = segment.encoded_size();
423
424 while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
429 let Some(segment_item) = self.buffer.pop_front() else {
430 for segment_item in segment.items.into_iter().rev() {
432 self.buffer.push_front(segment_item);
433 }
434
435 return None;
436 };
437
438 let segment_item_encoded_size = segment_item.encoded_size();
439 segment_size += segment_item_encoded_size;
440
441 let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);
443
444 let segment_item = match segment_item {
445 SegmentItem::Padding => {
446 unreachable!("Buffer never contains SegmentItem::Padding; qed");
447 }
448 SegmentItem::Block {
449 mut bytes,
450 mut block_objects,
451 } => {
452 let last_archived_block =
453 if let Some(last_archived_block) = &mut last_archived_block {
454 last_archived_block
457 .number
458 .replace(last_archived_block.number() + BlockNumber::ONE);
459 last_archived_block.set_complete();
460 last_archived_block
461 } else {
462 last_archived_block.insert(LastArchivedBlock {
464 number: BlockNumber::ZERO.into(),
465 archived_progress: ArchivedBlockProgress::new_complete(),
466 })
467 };
468
469 if spill_over == 0 {
470 SegmentItem::Block {
471 bytes,
472 block_objects,
473 }
474 } else {
475 let split_point = bytes.len() - spill_over;
476
477 {
478 let continuation_bytes = bytes[split_point..].to_vec();
479 let continuation_block_objects = block_objects
480 .extract_if(.., |block_object: &mut BlockObject| {
481 if block_object.offset >= split_point as u32 {
482 block_object.offset -= split_point as u32;
483 true
484 } else {
485 false
486 }
487 })
488 .collect();
489
490 self.buffer.push_front(SegmentItem::BlockContinuation {
493 bytes: BlockBytes(continuation_bytes),
494 block_objects: continuation_block_objects,
495 });
496 }
497
498 bytes.truncate(split_point);
499 let archived_bytes = u32::try_from(split_point)
501 .ok()
502 .and_then(NonZeroU32::new)
503 .expect(
504 "`::add_block()` method ensures block is not empty and doesn't \
505 exceed `u32::MAX`; qed",
506 );
507 last_archived_block.set_partial_archived(archived_bytes);
508
509 SegmentItem::BlockStart {
510 bytes,
511 block_objects,
512 }
513 }
514 }
515 SegmentItem::BlockStart { .. } => {
516 unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
517 }
518 SegmentItem::BlockContinuation {
519 mut bytes,
520 mut block_objects,
521 } => {
522 let last_archived_block = last_archived_block.as_mut().expect(
523 "Block continuation implies that there are some bytes archived \
524 already; qed",
525 );
526
527 let previously_archived_bytes = last_archived_block.partial_archived().expect(
528 "Block continuation implies that there are some bytes archived \
529 already; qed",
530 );
531
532 if spill_over == 0 {
533 last_archived_block.set_complete();
534
535 SegmentItem::BlockContinuation {
536 bytes,
537 block_objects,
538 }
539 } else {
540 let split_point = bytes.len() - spill_over;
541
542 {
543 let continuation_bytes = bytes[split_point..].to_vec();
544 let continuation_block_objects = block_objects
545 .extract_if(.., |block_object: &mut BlockObject| {
546 if block_object.offset >= split_point as u32 {
547 block_object.offset -= split_point as u32;
548 true
549 } else {
550 false
551 }
552 })
553 .collect();
554 self.buffer.push_front(SegmentItem::BlockContinuation {
557 bytes: BlockBytes(continuation_bytes),
558 block_objects: continuation_block_objects,
559 });
560 }
561
562 bytes.truncate(split_point);
563 let archived_bytes = previously_archived_bytes.get()
565 + u32::try_from(split_point).expect(
566 "`::add_block()` method ensures block length doesn't \
567 exceed `u32::MAX`; qed",
568 );
569 let archived_bytes = NonZeroU32::new(archived_bytes).expect(
570 "Spillover means non-zero length of the block was archived; qed",
571 );
572 last_archived_block.set_partial_archived(archived_bytes);
573
574 SegmentItem::BlockContinuation {
575 bytes,
576 block_objects,
577 }
578 }
579 }
580 SegmentItem::ParentSegmentHeader(parent_segment_header) => {
581 SegmentItem::ParentSegmentHeader(parent_segment_header)
583 }
584 };
585
586 segment.items.push(segment_item);
587 }
588
589 self.last_archived_block = last_archived_block;
590
591 Some(segment)
592 }
593
594 fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
600 Self::produce_object_mappings(self.buffer.iter_mut())
601 }
602
603 fn produce_object_mappings<'a>(
608 items: impl Iterator<Item = &'a mut SegmentItem>,
609 ) -> Vec<GlobalObject> {
610 let mut corrected_object_mapping = Vec::new();
611 let mut base_offset_in_segment = Segment::default().encoded_size();
612 for segment_item in items {
613 match segment_item {
614 SegmentItem::Padding => {
615 unreachable!(
616 "Segment during archiving never contains SegmentItem::Padding; qed"
617 );
618 }
619 SegmentItem::Block {
620 bytes: _,
621 block_objects,
622 }
623 | SegmentItem::BlockStart {
624 bytes: _,
625 block_objects,
626 }
627 | SegmentItem::BlockContinuation {
628 bytes: _,
629 block_objects,
630 } => {
631 for block_object in block_objects.drain(..) {
632 let offset_in_segment = base_offset_in_segment
634 + 1
635 + u32::encoded_fixed_size().expect("Fixed size; qed")
636 + block_object.offset as usize;
637 let raw_piece_offset = (offset_in_segment % Record::SIZE)
638 .try_into()
639 .expect("Offset within piece should always fit in 32-bit integer; qed");
640 corrected_object_mapping.push(GlobalObject {
641 hash: block_object.hash,
642 piece_position: PiecePosition::from(
643 (offset_in_segment / Record::SIZE) as u8,
644 ),
645 offset: raw_piece_offset,
646 });
647 }
648 }
649 SegmentItem::ParentSegmentHeader(_) => {
650 }
652 }
653
654 base_offset_in_segment += segment_item.encoded_size();
655 }
656
657 corrected_object_mapping
658 }
659
660 fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
662 let mut pieces = {
663 let mut pieces = ArchivedHistorySegment::default();
664
665 segment.encode_to(&mut ArchivedHistorySegmentOutput {
666 segment: &mut pieces,
667 offset: 0,
668 });
669 drop(segment);
671
672 let (source_shards, parity_shards) =
673 pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
674
675 self.erasure_coding
676 .extend(
677 source_shards.iter().map(|shard| &shard.record),
678 parity_shards.iter_mut().map(|shard| &mut shard.record),
679 )
680 .expect("Statically correct parameters; qed");
681
682 pieces
683 };
684
685 let record_roots = {
687 #[cfg(not(feature = "parallel"))]
688 let source_pieces = pieces.iter_mut();
689 #[cfg(feature = "parallel")]
690 let source_pieces = pieces.par_iter_mut();
691
692 let iter = source_pieces.map(|piece| {
698 let [source_chunks_root, parity_chunks_root] = {
699 let mut parity_chunks = Record::new_boxed();
700
701 self.erasure_coding
702 .extend(piece.record.iter(), parity_chunks.iter_mut())
703 .expect(
704 "Erasure coding instance is deliberately configured to support this \
705 input; qed",
706 );
707
708 let source_chunks_root = *piece.record.source_chunks_root();
709 let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);
710
711 [source_chunks_root, parity_chunks_root]
712 };
713
714 let record_root = BalancedMerkleTree::compute_root_only(&[
715 source_chunks_root,
716 parity_chunks_root,
717 ]);
718
719 piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);
720
721 record_root
722 });
723
724 iter.collect::<Vec<_>>()
725 };
726
727 let segment_merkle_tree =
728 BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
729 record_roots
730 .as_slice()
731 .try_into()
732 .expect("Statically guaranteed to have correct length; qed"),
733 );
734
735 let segment_root = SegmentRoot::from(segment_merkle_tree.root());
736
737 let segment_header = SegmentHeader {
739 index: self.segment_index.into(),
740 root: segment_root,
741 prev_segment_header_hash: self.prev_segment_header_hash,
742 last_archived_block: self
743 .last_archived_block
744 .expect("Never empty by the time segment is produced; qed"),
745 };
746
747 pieces
749 .iter_mut()
750 .zip(segment_merkle_tree.all_proofs())
751 .for_each(|(piece, record_proof)| {
752 piece.header = PieceHeader {
753 shard_index: self.shard_index.into(),
754 local_segment_index: segment_header.index,
755 segment_root: segment_header.root,
756 record_proof: RecordProof::from(record_proof),
757 ..piece.header
758 };
759 });
760
761 self.segment_index += LocalSegmentIndex::ONE;
763 self.prev_segment_header_hash = segment_header.hash();
764
765 self.buffer
768 .push_front(SegmentItem::ParentSegmentHeader(segment_header));
769
770 NewArchivedSegment {
771 segment_header,
772 pieces: pieces.to_shared(),
773 }
774 }
775}