1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5 PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8 ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9 RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
/// SCALE `Output` adapter that writes an encoded byte stream directly into the
/// records of an [`ArchivedHistorySegment`], spilling across piece boundaries.
struct ArchivedHistorySegmentOutput<'a> {
    /// Destination segment whose pieces' records receive the encoded bytes
    segment: &'a mut ArchivedHistorySegment,
    /// Total number of record bytes written so far across the whole segment
    offset: usize,
}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30 #[inline]
31 fn write(&mut self, mut bytes: &[u8]) {
32 while !bytes.is_empty() {
33 let piece = self
34 .segment
35 .get_mut(self.offset / Record::SIZE)
36 .expect("Encoding never exceeds the segment size; qed");
37 let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38 let bytes_to_write = output.len().min(bytes.len());
39 output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40 self.offset += bytes_to_write;
41 bytes = &bytes[bytes_to_write..];
42 }
43 }
44}
45
/// Segment of archived history represented as an ordered list of segment items.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Items contained in this segment, encoded back to back without a count prefix
    pub items: Vec<SegmentItem>,
}
52
53impl Encode for Segment {
54 #[inline(always)]
55 fn size_hint(&self) -> usize {
56 RecordedHistorySegment::SIZE
57 }
58
59 #[inline]
60 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
61 for item in &self.items {
62 item.encode_to(dest);
63 }
64 }
65}
66
67impl Decode for Segment {
68 #[inline]
69 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
70 let mut items = Vec::new();
71 loop {
72 match input.remaining_len()? {
73 Some(0) => {
74 break;
75 }
76 Some(_) => {
77 }
79 None => {
80 return Err(
81 "Source doesn't report remaining length, decoding not possible".into(),
82 );
83 }
84 }
85
86 match SegmentItem::decode(input) {
87 Ok(item) => {
88 items.push(item);
89 }
90 Err(error) => {
91 return Err(error.chain("Could not decode `Segment::items`"));
92 }
93 }
94 }
95
96 Ok(Self { items })
97 }
98}
99
/// Block bytes wrapper encoded with a fixed-width `u32` length prefix.
///
/// Construction paths keep the byte length within `u32` (relied upon by `Encode`).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);
104
105impl Deref for BlockBytes {
106 type Target = [u8];
107
108 #[inline(always)]
109 fn deref(&self) -> &Self::Target {
110 &self.0
111 }
112}
113
114impl From<BlockBytes> for Vec<u8> {
115 #[inline(always)]
116 fn from(value: BlockBytes) -> Self {
117 value.0
118 }
119}
120
121impl Encode for BlockBytes {
122 #[inline(always)]
123 fn size_hint(&self) -> usize {
124 size_of::<u32>() + self.0.len()
125 }
126
127 #[inline]
128 fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
129 let length = u32::try_from(self.0.len())
130 .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
131
132 length.encode_to(dest);
133 dest.write(&self.0);
134 }
135}
136
137impl Decode for BlockBytes {
138 #[inline]
139 fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
140 let length = u32::decode(input)?;
141 if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
142 return Err("Segment item size is impossibly large".into());
143 }
144 let mut bytes = vec![0; length as usize];
148 input.read(&mut bytes)?;
149 Ok(Self(bytes))
150 }
151}
152
impl BlockBytes {
    /// Shortens the bytes to at most `size` bytes (no-op if already shorter)
    #[inline(always)]
    fn truncate(&mut self, size: usize) {
        self.0.truncate(size)
    }
}
159
/// Kinds of items that can be contained within a segment.
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Padding item; archiving code treats it as unreachable in live buffers
    #[codec(index = 0)]
    Padding,
    /// Contains a full block inside
    #[codec(index = 1)]
    Block {
        /// Full block bytes
        bytes: BlockBytes,
        /// Objects within this block; skipped during encoding, so absent after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Contains the beginning of a block that continues in a following segment
    #[codec(index = 2)]
    BlockStart {
        /// Initial portion of the block bytes
        bytes: BlockBytes,
        /// Objects within this portion; skipped during encoding, so absent after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Contains the continuation of a block started in an earlier segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Continuation portion of the block bytes
        bytes: BlockBytes,
        /// Objects within this portion; skipped during encoding, so absent after decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Segment header of the parent segment
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}
201
/// Newly archived segment: the segment header together with all of its pieces.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header of the produced segment
    pub segment_header: SegmentHeader,
    /// All pieces of the archived segment (source and parity)
    pub pieces: ArchivedHistorySegment,
}
211
/// Outcome of adding a single block to the archiver.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// Segments that became complete as a result of adding this block (may be empty)
    pub archived_segments: Vec<NewArchivedSegment>,

    /// Object mappings produced while archiving, including for data still buffered
    /// for the next segment
    pub global_objects: Vec<GlobalObject>,
}
223
/// Archiver instantiation error.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// The provided block is exactly the already-archived size, meaning it was fully
    /// archived and there is nothing left to continue from
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// The provided block is smaller than the portion that was already archived
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Full size of the provided encoded block
        block_bytes: u32,
        /// Size of the portion of the block that was already archived
        archived_block_bytes: u32,
    },
}
243
/// Block archiver for a single shard.
///
/// Buffers incoming blocks as segment items, slices them into segments, erasure-codes
/// each segment into pieces, and chains segment headers via their hashes.
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Shard index this archiver operates on (stamped into piece headers)
    shard_index: ShardIndex,
    /// Items not yet combined into a complete segment
    buffer: VecDeque<SegmentItem>,
    /// Erasure coding instance used to extend source records with parity
    erasure_coding: ErasureCoding,
    /// Index of the segment that will be produced next
    segment_index: LocalSegmentIndex,
    /// Hash of the most recently produced segment header (default before the first one)
    prev_segment_header_hash: Blake3Hash,
    /// Last archived block, if any block was (partially) archived yet
    last_archived_block: Option<LastArchivedBlock>,
}
269
270impl Archiver {
271 pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
273 Self {
274 shard_index,
275 buffer: VecDeque::default(),
276 erasure_coding,
277 segment_index: LocalSegmentIndex::ZERO,
278 prev_segment_header_hash: Blake3Hash::default(),
279 last_archived_block: None,
280 }
281 }
282
    /// Creates an archiver initialized from a previously produced segment header,
    /// optionally resuming archiving of a partially archived block.
    ///
    /// `encoded_block` must be the full encoding of the block referenced by
    /// `segment_header.last_archived_block`, and `block_objects` its object mappings
    /// (offsets relative to the start of the block).
    ///
    /// # Errors
    /// Returns an error when `encoded_block` is smaller than, or exactly equal to,
    /// the already archived portion recorded in the segment header.
    pub fn with_initial_state(
        shard_index: ShardIndex,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut block_objects: Vec<BlockObject>,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(shard_index, erasure_coding);

        archiver.segment_index = segment_header.local_segment_index() + LocalSegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = Some(segment_header.last_archived_block);

        // The next segment always starts with a reference to the parent segment
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver
            .last_archived_block
            .expect("Just inserted; qed")
            .partial_archived()
        {
            let archived_block_bytes = archived_block_bytes.get();
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    // Fully archived already; nothing remains to continue from
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Keep only objects past the archived portion and rebase their
                    // offsets onto the unarchived remainder of the block
                    block_objects.retain_mut(|block_object: &mut BlockObject| {
                        if block_object.offset >= archived_block_bytes {
                            block_object.offset -= archived_block_bytes;
                            true
                        } else {
                            false
                        }
                    });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: BlockBytes(
                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        ),
                        block_objects,
                    });
                }
            }
        }

        Ok(archiver)
    }
348
349 pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
351 self.last_archived_block
352 .map(|last_archived_block| last_archived_block.number())
353 }
354
355 pub fn add_block(
366 &mut self,
367 bytes: Vec<u8>,
368 block_objects: Vec<BlockObject>,
369 ) -> Option<ArchiveBlockOutcome> {
370 if !(1..u32::MAX as usize).contains(&bytes.len()) {
371 return None;
372 }
373
374 self.buffer.push_back(SegmentItem::Block {
376 bytes: BlockBytes(bytes),
377 block_objects,
378 });
379
380 let mut archived_segments = Vec::new();
381 let mut object_mapping = Vec::new();
382
383 while let Some(mut segment) = self.produce_segment() {
385 object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
387 archived_segments.push(self.produce_archived_segment(segment));
388 }
389
390 object_mapping.extend(self.produce_next_segment_mappings());
392
393 Some(ArchiveBlockOutcome {
394 archived_segments,
395 global_objects: object_mapping,
396 })
397 }
398
    /// Tries to slice buffered items into exactly one full segment.
    ///
    /// Returns `None` (restoring the buffer) when there is not enough buffered data;
    /// otherwise returns a segment whose encoding exactly fills
    /// `RecordedHistorySegment::SIZE`, splitting the last block into a
    /// `BlockStart`/`BlockContinuation` pair when it spills over.
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        // Tracked locally and only committed to `self` once a segment is produced
        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // Stop once fewer than 6 bytes of capacity remain — too little for another item
        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Not enough data for a full segment: restore items (in order)
                    // and bail out without committing any state
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // How many bytes of this item don't fit into the segment
            let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    // A new block enters the segment: advance (or initialize) the
                    // last archived block and mark it complete for now
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            // Very first archived block
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Split: first part fills this segment exactly, the rest is
                        // pushed back as a continuation for the next segment
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Move objects of the spilled-over part into the
                            // continuation, rebasing their offsets onto it
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        // Continuation fits entirely: the block is now fully archived
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Continuation itself spills over — split again
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Move objects of the spilled-over part into the next
                            // continuation, rebasing their offsets onto it
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Accumulate archived length across all previous parts
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        // Segment produced — commit the updated last archived block
        self.last_archived_block = last_archived_block;

        Some(segment)
    }
582
    /// Produces object mappings for items currently buffered for future segments,
    /// draining their per-block mappings in the process.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.buffer.iter_mut())
    }
591
    /// Translates block-relative object mappings into piece-relative global mappings.
    ///
    /// Drains `block_objects` from each item, so each mapping is emitted exactly once.
    /// Items are assumed to start at the beginning of an encoded segment.
    fn produce_object_mappings<'a>(
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let mut corrected_object_mapping = Vec::new();
        // Running byte offset of the current item within the encoded segment
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockStart {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockContinuation {
                    bytes: _,
                    block_objects,
                } => {
                    for block_object in block_objects.drain(..) {
                        // Item start + 1 byte enum variant + fixed `u32` length prefix
                        // + object's offset within the block bytes
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + u32::encoded_fixed_size().expect("Fixed size; qed")
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_position: PiecePosition::from(
                                (offset_in_segment / Record::SIZE) as u8,
                            ),
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment headers carry no mappable objects
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }
648
    /// Turns a complete segment into an archived segment: encodes it into source
    /// pieces, extends them with parity via erasure coding, computes per-record and
    /// segment Merkle roots, fills piece headers, and produces the segment header.
    ///
    /// Also advances `segment_index`, updates `prev_segment_header_hash`, and queues
    /// the new segment header as the first item of the next segment.
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            // Encode the segment directly into the source records
            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // Segment contents are no longer needed once encoded
            drop(segment);

            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            // Extend source records with parity records
            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| &shard.record),
                    parity_shards.iter_mut().map(|shard| &mut shard.record),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        let record_roots = {
            // Sequential or rayon-parallel iteration depending on the feature flag
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            let iter = source_pieces.map(|piece| {
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    // Erasure-code the record's chunks to derive parity chunks
                    self.erasure_coding
                        .extend(piece.record.iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root = *piece.record.source_chunks_root();
                    let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                // Record root commits to both source and parity chunk roots
                let record_root = BalancedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        // Merkle tree over all record roots of the segment
        let segment_merkle_tree =
            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        let segment_header = SegmentHeader {
            segment_index: self.segment_index.into(),
            segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        // Fill each piece header with its inclusion proof and segment metadata
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.header = PieceHeader {
                    shard_index: self.shard_index.into(),
                    local_segment_index: segment_header.segment_index,
                    segment_root: segment_header.segment_root,
                    record_proof: RecordProof::from(record_proof),
                    ..piece.header
                };
            });

        // Advance archiver state for the next segment
        self.segment_index += LocalSegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // The next segment will start with this segment's header
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
764}