1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5 PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8 ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9 RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
/// SCALE `Output` adapter that writes an encoding directly into the records of an
/// [`ArchivedHistorySegment`], filling pieces back to back.
struct ArchivedHistorySegmentOutput<'a> {
    // Segment whose piece records are being filled
    segment: &'a mut ArchivedHistorySegment,
    // Number of bytes written so far, i.e. position within the concatenated records
    offset: usize,
}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30 #[inline]
31 fn write(&mut self, mut bytes: &[u8]) {
32 while !bytes.is_empty() {
33 let piece = self
34 .segment
35 .get_mut(self.offset / Record::SIZE)
36 .expect("Encoding never exceeds the segment size; qed");
37 let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38 let bytes_to_write = output.len().min(bytes.len());
39 output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40 self.offset += bytes_to_write;
41 bytes = &bytes[bytes_to_write..];
42 }
43 }
44}
45
/// Segment represents a collection of items archived together into one recorded history segment
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Items contained in this segment, in archiving order
    pub items: Vec<SegmentItem>,
}
52
53impl Encode for Segment {
54 #[inline(always)]
55 fn size_hint(&self) -> usize {
56 RecordedHistorySegment::SIZE
57 }
58
59 #[inline]
60 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
61 for item in &self.items {
62 item.encode_to(dest);
63 }
64 }
65}
66
67impl Decode for Segment {
68 #[inline]
69 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
70 let mut items = Vec::new();
71 loop {
72 match input.remaining_len()? {
73 Some(0) => {
74 break;
75 }
76 Some(_) => {
77 }
79 None => {
80 return Err(
81 "Source doesn't report remaining length, decoding not possible".into(),
82 );
83 }
84 }
85
86 match SegmentItem::decode(input) {
87 Ok(item) => {
88 items.push(item);
89 }
90 Err(error) => {
91 return Err(error.chain("Could not decode `Segment::items`"));
92 }
93 }
94 }
95
96 Ok(Self { items })
97 }
98}
99
/// Encoded block bytes, serialized with an explicit `u32` length prefix.
/// Constructors keep the length within `u32` (relied upon by the `Encode` impl).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);
104
105impl Deref for BlockBytes {
106 type Target = [u8];
107
108 #[inline(always)]
109 fn deref(&self) -> &Self::Target {
110 &self.0
111 }
112}
113
114impl From<BlockBytes> for Vec<u8> {
115 #[inline(always)]
116 fn from(value: BlockBytes) -> Self {
117 value.0
118 }
119}
120
121impl Encode for BlockBytes {
122 #[inline(always)]
123 fn size_hint(&self) -> usize {
124 size_of::<u32>() + self.0.len()
125 }
126
127 #[inline]
128 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
129 let length = u32::try_from(self.0.len())
130 .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
131
132 length.encode_to(dest);
133 dest.write(&self.0);
134 }
135}
136
137impl Decode for BlockBytes {
138 #[inline]
139 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
140 let length = u32::decode(input)?;
141 if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
142 return Err("Segment item size is impossibly large".into());
143 }
144 let mut bytes = vec![0; length as usize];
148 input.read(&mut bytes)?;
149 Ok(Self(bytes))
150 }
151}
152
impl BlockBytes {
    /// Shorten the bytes to at most `size` bytes (no-op if already shorter)
    #[inline(always)]
    fn truncate(&mut self, size: usize) {
        self.0.truncate(size)
    }
}
159
/// Kinds of items that can be contained within a [`Segment`]
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Padding; never present in the archiver's buffer or in segments it produces
    /// (see the `unreachable!` arms in `Archiver`), so presumably only used for padding
    /// on the decoding side — TODO confirm
    #[codec(index = 0)]
    Padding,
    /// A complete block
    #[codec(index = 1)]
    Block {
        /// Encoded block bytes
        bytes: BlockBytes,
        /// Object mappings into `bytes`; skipped during encoding, so never archived
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Beginning of a block whose remainder spills over into the next segment
    #[codec(index = 2)]
    BlockStart {
        /// Encoded block bytes prefix that fits into this segment
        bytes: BlockBytes,
        /// Object mappings into `bytes`; skipped during encoding, so never archived
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Continuation of a block started in a previous segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Encoded block bytes continuation
        bytes: BlockBytes,
        /// Object mappings into `bytes`; skipped during encoding, so never archived
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Header of the parent (previous) segment
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}
201
/// Newly archived segment: its header together with the erasure-coded pieces
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header describing this segment
    pub segment_header: SegmentHeader,
    /// All pieces (source and parity) corresponding to this segment
    pub pieces: ArchivedHistorySegment,
}
211
/// The outcome of adding one block to the archiver
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// Segments that became complete while archiving this block (may be empty)
    pub archived_segments: Vec<NewArchivedSegment>,

    /// Object mappings produced while archiving this block, including mappings for data that
    /// is still buffered and will only land in a future segment
    pub global_objects: Vec<GlobalObject>,
}
225
/// Errors that can occur when creating an archiver with [`Archiver::with_initial_state()`]
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// The header claims exactly the whole block was partially archived, which is
    /// contradictory — a partially archived block must be strictly longer than its
    /// archived prefix
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// The provided block is shorter than the number of bytes already archived from it
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Size of the provided encoded block
        block_bytes: u32,
        /// Number of bytes the segment header claims were already archived
        archived_block_bytes: u32,
    },
}
245
/// Block archiver: slices a stream of encoded blocks into fixed-size segments
/// (`RecordedHistorySegment::SIZE`), erasure-codes each segment into pieces and produces
/// segment headers chained by hash.
#[derive(Debug, Clone)]
pub struct Archiver {
    // Shard this archiver operates on; stamped into every piece header
    shard_index: ShardIndex,
    // Items not yet included in a produced segment; front of the queue is archived first
    buffer: VecDeque<SegmentItem>,
    // Erasure coding instance used both for segment-level (source→parity records) and
    // record-level (source→parity chunks) extension
    erasure_coding: ErasureCoding,
    // Index of the segment that will be produced next
    segment_index: LocalSegmentIndex,
    // Hash of the previously produced segment header (default/zero before the first one)
    prev_segment_header_hash: Blake3Hash,
    // State of the last (fully or partially) archived block, if any
    last_archived_block: Option<LastArchivedBlock>,
}
271
impl Archiver {
    /// Create a new archiver for `shard_index` with no prior archived history: segment index
    /// starts at zero, previous header hash is all-zero and no block has been archived yet.
    pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
        Self {
            shard_index,
            buffer: VecDeque::default(),
            erasure_coding,
            segment_index: LocalSegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: None,
        }
    }

    /// Create an archiver that continues from a previously produced `segment_header`.
    ///
    /// `encoded_block` is the encoding of the last archived block (used when that block was
    /// only partially archived) and `block_objects` are object mappings into it.
    ///
    /// # Errors
    /// * [`ArchiverInstantiationError::InvalidBlockSmallSize`] if `encoded_block` is shorter
    ///   than the archived prefix claimed by the header.
    /// * [`ArchiverInstantiationError::InvalidLastArchivedBlock`] if `encoded_block` is exactly
    ///   as long as the archived prefix (it should have been marked complete instead).
    pub fn with_initial_state(
        shard_index: ShardIndex,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut block_objects: Vec<BlockObject>,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(shard_index, erasure_coding);

        // Resume immediately after the provided segment header
        archiver.segment_index = segment_header.index.as_inner() + LocalSegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = Some(segment_header.last_archived_block);

        // The next segment always begins with the parent segment header item
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver
            .last_archived_block
            .expect("Just inserted; qed")
            .partial_archived()
        {
            let archived_block_bytes = archived_block_bytes.get();
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Drop mappings that fall into the already-archived prefix and rebase the
                    // remaining ones onto the continuation bytes
                    block_objects.retain_mut(|block_object: &mut BlockObject| {
                        if block_object.offset >= archived_block_bytes {
                            block_object.offset -= archived_block_bytes;
                            true
                        } else {
                            false
                        }
                    });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: BlockBytes(
                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        ),
                        block_objects,
                    });
                }
            }
        }

        Ok(archiver)
    }

    /// Number of the last block that was fully or partially archived, if any.
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        self.last_archived_block
            .map(|last_archived_block| last_archived_block.number())
    }

    /// Add a new block to the archiver.
    ///
    /// Returns `None` when `bytes` is empty or its length doesn't fit into `u32`; otherwise
    /// returns every segment that became complete as a result of adding this block together
    /// with all object mappings produced along the way (including mappings for data still
    /// buffered for the next segment).
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        block_objects: Vec<BlockObject>,
    ) -> Option<ArchiveBlockOutcome> {
        // Blocks must be non-empty and fit into `u32`; later splitting logic relies on both
        if !(1..u32::MAX as usize).contains(&bytes.len()) {
            return None;
        }

        self.buffer.push_back(SegmentItem::Block {
            bytes: BlockBytes(bytes),
            block_objects,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        // Drain as many complete segments as the buffered data allows
        while let Some(mut segment) = self.produce_segment() {
            object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
            archived_segments.push(self.produce_archived_segment(segment));
        }

        // Mappings for data that remains buffered and will be part of the next segment
        object_mapping.extend(self.produce_next_segment_mappings());

        Some(ArchiveBlockOutcome {
            archived_segments,
            global_objects: object_mapping,
        })
    }

    /// Try to slice a full segment's worth of items off the front of the buffer.
    ///
    /// Returns `None` (restoring the buffer to its original order) when there is not enough
    /// buffered data to fill a whole segment. A block that spills over the segment boundary is
    /// split: the fitting prefix stays in this segment as `BlockStart`/`BlockContinuation` and
    /// the remainder is pushed back as a `BlockContinuation` for the next segment.
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        // Work on a copy; committed back to `self` only once a full segment was produced
        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // Keep pulling items while there is room for anything useful in the segment.
        // NOTE(review): 6 looks like the smallest meaningful leftover (1-byte item tag +
        // 4-byte length prefix + at least 1 byte of payload) — confirm against the codec.
        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Not enough data for a full segment: put everything back in order and
                    // wait for more blocks
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Number of bytes of this item that do not fit into the segment
            let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    // A fresh block begins: advance the last archived block number (or start
                    // at zero for the very first block) and mark it complete for now
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Block does not fit: archive the prefix here, push the rest back as
                        // a continuation for the next segment
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Mappings at or past the split move into the continuation, with
                            // their offsets rebased onto it
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        // The rest of the block fits: it is now fully archived
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Still does not fit: split again, same as for a fresh block above
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Total archived so far = previously archived prefix + what fits now
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        // Segment is full: commit the updated last archived block state
        self.last_archived_block = last_archived_block;

        Some(segment)
    }

    /// Produce mappings for objects that are still buffered, i.e. will only land in the next
    /// segment; offsets are computed as if the next segment started with the current buffer.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.buffer.iter_mut())
    }

    /// Convert per-block object mappings into segment-global ones.
    ///
    /// Drains `block_objects` from every item and translates each block-relative offset into
    /// a source piece position plus an offset within that piece's record.
    fn produce_object_mappings<'a>(
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let mut corrected_object_mapping = Vec::new();
        // Encoded offset at which the current item starts within the segment
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockStart {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockContinuation {
                    bytes: _,
                    block_objects,
                } => {
                    for block_object in block_objects.drain(..) {
                        // Skip the item's 1-byte enum tag and the `u32` length prefix that
                        // precede the block bytes in the encoding
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + u32::encoded_fixed_size().expect("Fixed size; qed")
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            // Source piece that contains the start of the object
                            piece_position: PiecePosition::from(
                                (offset_in_segment / Record::SIZE) as u8,
                            ),
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Carries no object mappings
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

    /// Turn a complete [`Segment`] into an erasure-coded [`NewArchivedSegment`] with a
    /// header, and queue that header as the first item of the next segment.
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            // Encode the segment directly into the records of the source pieces
            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // The segment itself is no longer needed past this point
            drop(segment);

            // Erasure-code source records into parity records
            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| &shard.record),
                    parity_shards.iter_mut().map(|shard| &mut shard.record),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            let iter = source_pieces.map(|piece| {
                // Erasure-code the chunks of each record and compute Merkle roots over both
                // the source and the parity chunk sets
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    self.erasure_coding
                        .extend(piece.record.iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root = *piece.record.source_chunks_root();
                    let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                // The record root commits to both chunk roots
                let record_root = BalancedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        // Merkle tree over all record roots; its root identifies the whole segment
        let segment_merkle_tree =
            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        let segment_header = SegmentHeader {
            index: self.segment_index.into(),
            root: segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        // Fill piece headers with segment-level data and each piece's inclusion proof
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.header = PieceHeader {
                    shard_index: self.shard_index.into(),
                    local_segment_index: segment_header.index,
                    segment_root: segment_header.root,
                    record_proof: RecordProof::from(record_proof),
                    ..piece.header
                };
            });

        // Advance chained state for the next segment
        self.segment_index += LocalSegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // The next segment will begin with this segment's header
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}