use crate::objects::{BlockObject, GlobalObject};
use ab_core_primitives::block::BlockNumber;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::Record;
use ab_core_primitives::segments::{
    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
    SegmentHeader, SegmentIndex, SegmentRoot,
};
use ab_erasure_coding::ErasureCoding;
use ab_merkle_tree::balanced_hashed::BalancedHashedMerkleTree;
use alloc::collections::VecDeque;
use alloc::vec;
use alloc::vec::Vec;
use core::cmp::Ordering;
use core::num::NonZeroU32;
use core::ops::Deref;
use parity_scale_codec::{Decode, Encode, Input, Output};
#[cfg(feature = "parallel")]
use rayon::prelude::*;

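/// SCALE `Output` adapter that writes encoded segment bytes directly into the source records of
/// an [`ArchivedHistorySegment`], spilling over from one piece into the next as needed.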
struct ArchivedHistorySegmentOutput<'a> {
    segment: &'a mut ArchivedHistorySegment,
    offset: usize,
}

impl Output for ArchivedHistorySegmentOutput<'_> {
    #[inline]
    fn write(&mut self, mut bytes: &[u8]) {
        while !bytes.is_empty() {
            let piece = self
                .segment
                .get_mut(self.offset / Record::SIZE)
                .expect("Encoding never exceeds the segment size; qed");
            let output = &mut piece.record_mut().as_flattened_mut()[self.offset % Record::SIZE..];
            let bytes_to_write = output.len().min(bytes.len());
            output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
            self.offset += bytes_to_write;
            bytes = &bytes[bytes_to_write..];
        }
    }
}

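/// A segment of recorded history: an ordered collection of [`SegmentItem`]s whose encoding never
/// exceeds [`RecordedHistorySegment::SIZE`] bytes.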
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Segment items
    pub items: Vec<SegmentItem>,
}

impl Encode for Segment {
    #[inline(always)]
    fn size_hint(&self) -> usize {
        RecordedHistorySegment::SIZE
    }

    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        for item in &self.items {
            item.encode_to(dest);
        }
    }
}

impl Decode for Segment {
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let mut items = Vec::new();
        loop {
            match input.remaining_len()? {
                Some(0) => {
                    break;
                }
                Some(_) => {
                    // Some data is still left, continue decoding items below
                }
                None => {
                    return Err(
                        "Source doesn't report remaining length, decoding not possible".into(),
                    );
                }
            }

            match SegmentItem::decode(input) {
                Ok(item) => {
                    items.push(item);
                }
                Err(error) => {
                    return Err(error.chain("Could not decode `Segment::items`"));
                }
            }
        }

        Ok(Self { items })
    }
}

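/// Wrapper for block bytes stored in a [`SegmentItem`].
///
/// Constructors guarantee that the contained length always fits into `u32`.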
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);

impl Deref for BlockBytes {
    type Target = [u8];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl From<BlockBytes> for Vec<u8> {
    #[inline(always)]
    fn from(value: BlockBytes) -> Self {
        value.0
    }
}

impl Encode for BlockBytes {
    #[inline(always)]
    fn size_hint(&self) -> usize {
        size_of::<u32>() + self.0.len()
    }

    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        let length = u32::try_from(self.0.len())
            .expect("All constructors guarantee the size doesn't exceed `u32`; qed");

        length.encode_to(dest);
        dest.write(&self.0);
    }
}

impl Decode for BlockBytes {
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let length = u32::decode(input)?;
        if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
            return Err("Segment item size is impossibly large".into());
        }
        let mut bytes = vec![0; length as usize];
        // `Input::read` fills the whole buffer or returns an error, so a short read is impossible
        input.read(&mut bytes)?;
        Ok(Self(bytes))
    }
}

impl BlockBytes {
    #[inline(always)]
    fn truncate(&mut self, size: usize) {
        self.0.truncate(size)
    }
}

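/// Kinds of items that can be contained within a segment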
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
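    /// Special dummy enum variant only used as an implementation detail for padding purposes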
    #[codec(index = 0)]
    Padding,
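    /// Contains the full block inside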
    #[codec(index = 1)]
    Block {
        bytes: BlockBytes,
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
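    /// Contains the beginning of a block that did not fit into the current segment and continues
    /// in the next segment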
    #[codec(index = 2)]
    BlockStart {
        bytes: BlockBytes,
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
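    /// Continuation of a block whose beginning was archived in one of the previous segments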
    #[codec(index = 3)]
    BlockContinuation {
        bytes: BlockBytes,
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
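    /// Segment header of the parent (previous) segment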
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}

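/// Newly archived segment as a combination of the segment header and the corresponding archived
/// history segment containing all of its pieces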
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
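    /// Segment header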
    pub segment_header: SegmentHeader,
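    /// Pieces (source and parity) of the archived history segment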
    pub pieces: ArchivedHistorySegment,
}

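/// The outcome of adding a block to the archiver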
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
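    /// The segments that were produced while adding this block; can be empty if there is not
    /// enough buffered data for a full segment yet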
    pub archived_segments: Vec<NewArchivedSegment>,

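    /// The object mappings produced while adding this block, including mappings for objects that
    /// will only be part of the next segment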
    pub global_objects: Vec<GlobalObject>,
}

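/// Errors that can occur when creating an [`Archiver`] from a previously archived state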
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
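    /// Invalid last archived block: the number of partially archived bytes equals the size of
    /// the encoded block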
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
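    /// Invalid block: it is smaller than the part of it that was already archived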
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
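        /// Size of the encoded block in bytes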
        block_bytes: u32,
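        /// Number of bytes of this block that were already archived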
        archived_block_bytes: u32,
    },
}

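/// Block archiver that slices the encoded history of the chain into archived segments.
///
/// Added blocks are concatenated into an internal buffer, the buffer is sliced into segments of
/// [`RecordedHistorySegment::SIZE`] bytes, segments are erasure coded, records are committed to
/// with [`BalancedHashedMerkleTree`], and proofs are attached, turning records into pieces that
/// are returned alongside the corresponding segment header.
///
/// A minimal usage sketch; the `erasure_coding`, `encoded_block`, and `block_objects` values are
/// assumed to be provided by the caller and are not defined here:
///
/// ```ignore
/// let mut archiver = Archiver::new(erasure_coding);
///
/// let outcome = archiver
///     .add_block(encoded_block, block_objects)
///     .expect("Block is non-empty and its length fits into `u32`");
/// for archived_segment in outcome.archived_segments {
///     // Persist `archived_segment.segment_header` and distribute `archived_segment.pieces`
/// }
/// ```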
#[derive(Debug, Clone)]
pub struct Archiver {
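    /// Buffer of items that have not yet been sliced into a segment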
    buffer: VecDeque<SegmentItem>,
    erasure_coding: ErasureCoding,
    segment_index: SegmentIndex,
    prev_segment_header_hash: Blake3Hash,
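    /// The last block that was archived (fully or partially), if any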
    last_archived_block: Option<LastArchivedBlock>,
}

impl Archiver {
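    /// Create a new archiver instance with the provided erasure coding implementation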
    pub fn new(erasure_coding: ErasureCoding) -> Self {
        Self {
            buffer: VecDeque::default(),
            erasure_coding,
            segment_index: SegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: None,
        }
    }

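    /// Create a new instance of the archiver with initial state in case of restart.
    ///
    /// `segment_header` is the header of the last produced segment, `encoded_block` is the block
    /// referenced by `segment_header.last_archived_block`, and `block_objects` are the object
    /// mappings of that block; the part of the block that was not yet archived is re-queued.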
    pub fn with_initial_state(
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut block_objects: Vec<BlockObject>,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(erasure_coding);

        archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = Some(segment_header.last_archived_block);

        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver
            .last_archived_block
            .expect("Just inserted; qed")
            .partial_archived()
        {
            let archived_block_bytes = archived_block_bytes.get();
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    block_objects.retain_mut(|block_object: &mut BlockObject| {
                        if block_object.offset >= archived_block_bytes {
                            block_object.offset -= archived_block_bytes;
                            true
                        } else {
                            false
                        }
                    });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: BlockBytes(
                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        ),
                        block_objects,
                    });
                }
            }
        }

        Ok(archiver)
    }

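    /// Get the block number of the last archived block, if there is one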
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        self.last_archived_block
            .map(|last_archived_block| last_archived_block.number())
    }

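    /// Adds new block to the internal buffer, potentially producing archived segments and object
    /// mappings in the process.
    ///
    /// Returns `None` if the block is empty or its length doesn't fit into `u32`.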
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        block_objects: Vec<BlockObject>,
    ) -> Option<ArchiveBlockOutcome> {
        if !(1..u32::MAX as usize).contains(&bytes.len()) {
            return None;
        }

        self.buffer.push_back(SegmentItem::Block {
            bytes: BlockBytes(bytes),
            block_objects,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        while let Some(mut segment) = self.produce_segment() {
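            // Produce mappings for objects contained in this segment before it is turned into
            // pieces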
            object_mapping.extend(Self::produce_object_mappings(
                self.segment_index,
                segment.items.iter_mut(),
            ));
            archived_segments.push(self.produce_archived_segment(segment));
        }

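        // Also produce mappings for objects that are already buffered for the next segment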
        object_mapping.extend(self.produce_next_segment_mappings());

        Some(ArchiveBlockOutcome {
            archived_segments,
            global_objects: object_mapping,
        })
    }

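    /// Try to slice the buffer into a complete segment.
    ///
    /// Returns `None` (and puts the items back into the buffer) if there is not enough data for a
    /// full segment yet.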
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
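            // Take the next item from the buffer; if the buffer runs out before the segment is
            // full, put everything back and wait for more blocks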
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            let spill_over = segment_size
                .checked_sub(RecordedHistorySegment::SIZE)
                .unwrap_or_default();

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        self.last_archived_block = last_archived_block;

        Some(segment)
    }

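    /// Produce object mappings for the items that are already buffered for the next segment.
    ///
    /// Since [`Self::produce_object_mappings`] drains the objects it maps, calling this on every
    /// added block does not produce duplicate mappings.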
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
    }

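    /// Produce object mappings for `items` of the segment with the provided `segment_index`,
    /// translating offsets within blocks into piece indexes and offsets within pieces.
    ///
    /// Drains `block_objects` of the items it processes, so it must only be called once per item.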
    fn produce_object_mappings<'a>(
        segment_index: SegmentIndex,
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let source_piece_indexes =
            &segment_index.segment_piece_indexes()[..RecordedHistorySegment::NUM_RAW_RECORDS];

        let mut corrected_object_mapping = Vec::new();
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockStart {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockContinuation {
                    bytes: _,
                    block_objects,
                } => {
                    for block_object in block_objects.drain(..) {
                        // Offset within the segment: the item's base offset, plus one byte for
                        // the item's enum variant, plus the `u32` length prefix of the block
                        // bytes, plus the object's offset within the block bytes
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + u32::encoded_fixed_size().expect("Fixed size; qed")
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_index: source_piece_indexes[offset_in_segment / Record::SIZE],
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Segment headers have no objects to map
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

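    /// Take a segment and turn it into a [`NewArchivedSegment`]: encode it into source records,
    /// extend them with parity records via erasure coding, compute record and segment roots,
    /// attach Merkle proofs to every piece, and produce the segment header.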
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            // Encode the segment directly into the source records of the pieces
            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // The segment is no longer needed
            drop(segment);

            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            // Extend the source records with parity records using erasure coding
            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| shard.record()),
                    parity_shards.iter_mut().map(|shard| shard.record_mut()),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            // For every piece: erasure-code its record chunks into parity chunks, compute Merkle
            // roots over both, and combine them into the record root stored in the piece
            let iter = source_pieces.map(|piece| {
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    self.erasure_coding
                        .extend(piece.record().iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root =
                        BalancedHashedMerkleTree::compute_root_only(piece.record());
                    let parity_chunks_root =
                        BalancedHashedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                let record_root = BalancedHashedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                piece.root_mut().copy_from_slice(&record_root);
                piece
                    .parity_chunks_root_mut()
                    .copy_from_slice(&parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        let segment_merkle_tree =
            BalancedHashedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        // Attach a Merkle proof from the segment tree to every piece
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.proof_mut().copy_from_slice(&record_proof);
            });

        let segment_header = SegmentHeader {
            segment_index: self.segment_index.into(),
            segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        self.segment_index += SegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // The header of this segment is the first item of the next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}