//! Block archiver (`ab_archiving/archiver.rs`).
1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5    PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9    RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
/// Adapter that lets SCALE encoding write a [`Segment`] directly into the flattened record bytes
/// of an [`ArchivedHistorySegment`], spreading the byte stream across consecutive pieces.
struct ArchivedHistorySegmentOutput<'a> {
    /// Destination segment whose piece records receive the encoded bytes
    segment: &'a mut ArchivedHistorySegment,
    /// Total number of record bytes written so far across all pieces
    offset: usize,
}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30    #[inline]
31    fn write(&mut self, mut bytes: &[u8]) {
32        while !bytes.is_empty() {
33            let piece = self
34                .segment
35                .get_mut(self.offset / Record::SIZE)
36                .expect("Encoding never exceeds the segment size; qed");
37            let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38            let bytes_to_write = output.len().min(bytes.len());
39            output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40            self.offset += bytes_to_write;
41            bytes = &bytes[bytes_to_write..];
42        }
43    }
44}
45
/// The segment represents a collection of items stored in archival history
///
/// Its custom [`Encode`]/[`Decode`] implementations below write the items back-to-back with no
/// length prefix, so decoding relies on the input reporting its remaining length.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Segment items
    pub items: Vec<SegmentItem>,
}
52
53impl Encode for Segment {
54    #[inline(always)]
55    fn size_hint(&self) -> usize {
56        RecordedHistorySegment::SIZE
57    }
58
59    #[inline]
60    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
61        for item in &self.items {
62            item.encode_to(dest);
63        }
64    }
65}
66
67impl Decode for Segment {
68    #[inline]
69    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
70        let mut items = Vec::new();
71        loop {
72            match input.remaining_len()? {
73                Some(0) => {
74                    break;
75                }
76                Some(_) => {
77                    // Processing continues below
78                }
79                None => {
80                    return Err(
81                        "Source doesn't report remaining length, decoding not possible".into(),
82                    );
83                }
84            }
85
86            match SegmentItem::decode(input) {
87                Ok(item) => {
88                    items.push(item);
89                }
90                Err(error) => {
91                    return Err(error.chain("Could not decode `Segment::items`"));
92                }
93            }
94        }
95
96        Ok(Self { items })
97    }
98}
99
/// Similar to `Vec<u8>`, but when encoded with SCALE codec uses fixed size length encoding (as
/// little-endian `u32`)
///
/// Constructors must ensure the contents never exceed `u32::MAX` bytes; the `Encode`
/// implementation relies on this invariant.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);
104
105impl Deref for BlockBytes {
106    type Target = [u8];
107
108    #[inline(always)]
109    fn deref(&self) -> &Self::Target {
110        &self.0
111    }
112}
113
114impl From<BlockBytes> for Vec<u8> {
115    #[inline(always)]
116    fn from(value: BlockBytes) -> Self {
117        value.0
118    }
119}
120
121impl Encode for BlockBytes {
122    #[inline(always)]
123    fn size_hint(&self) -> usize {
124        size_of::<u32>() + self.0.len()
125    }
126
127    #[inline]
128    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
129        let length = u32::try_from(self.0.len())
130            .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
131
132        length.encode_to(dest);
133        dest.write(&self.0);
134    }
135}
136
137impl Decode for BlockBytes {
138    #[inline]
139    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
140        let length = u32::decode(input)?;
141        if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
142            return Err("Segment item size is impossibly large".into());
143        }
144        // TODO: It is inefficient to zero it, but there is no API for it right now and actually
145        //  implementation in `parity-scale-codec` itself is unsound:
146        //  https://github.com/paritytech/parity-scale-codec/pull/605#discussion_r2076151291
147        let mut bytes = vec![0; length as usize];
148        input.read(&mut bytes)?;
149        Ok(Self(bytes))
150    }
151}
152
impl BlockBytes {
    /// Shortens the contents to `size` bytes; no-op if `size` is not smaller than the current
    /// length (same semantics as [`Vec::truncate`])
    #[inline(always)]
    fn truncate(&mut self, size: usize) {
        self.0.truncate(size)
    }
}
159
/// Kinds of items that are contained within a segment
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Special dummy enum variant only used as an implementation detail for padding purposes;
    /// never present in the archiver's internal buffer
    #[codec(index = 0)]
    Padding,
    /// Contains a full block inside
    #[codec(index = 1)]
    Block {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Contains the beginning of the block inside, the remainder will be found in the following
    /// segments
    #[codec(index = 2)]
    BlockStart {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Continuation of the partial block spilled over into the next segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Segment header of the parent
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}
201
/// Newly archived segment as a combination of a segment header and corresponding archived history
/// segment containing pieces
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header
    pub segment_header: SegmentHeader,
    /// Segment of archived history containing pieces (produced in shared form for efficient
    /// memory reuse, see [`Archiver::add_block`] notes)
    pub pieces: ArchivedHistorySegment,
}
211
/// The outcome of adding a block to the archiver.
///
/// Returned by [`Archiver::add_block`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// The new segments archived after adding the block.
    /// There can be zero or more segments created after each block.
    pub archived_segments: Vec<NewArchivedSegment>,

    // TODO: The API for global objects needs to change and be associated with segments rather than
    //  blocks
    /// The new object mappings for those segments.
    /// There can be zero or more mappings created after each block.
    pub global_objects: Vec<GlobalObject>,
}
225
/// Archiver instantiation error
///
/// Returned by [`Archiver::with_initial_state`].
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// Invalid last archived block, its size is the same as the encoded block
    /// (so it should have been completely archived, not partially archived)
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// Invalid block, its size is smaller than the already archived number of bytes
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Full block size
        block_bytes: u32,
        /// Already archived portion of the block
        archived_block_bytes: u32,
    },
}
245
/// Block archiver.
///
/// It takes new confirmed (at `K` depth) blocks and concatenates them into a buffer, buffer is
/// sliced into segments of [`RecordedHistorySegment::SIZE`] size, segments are sliced into source
/// records of [`Record::SIZE`], records are erasure coded, committed to, then roots with proofs are
/// appended and records become pieces that are returned alongside the corresponding segment header.
///
/// ## Panics
/// Panics when operating on blocks, whose length doesn't fit into u32 (should never be the case in
/// blockchain context anyway).
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Shard this archiver archives for; written into the header of every produced piece
    shard_index: ShardIndex,
    /// Buffer containing blocks and other buffered items that are pending to be included into the
    /// next segment
    buffer: VecDeque<SegmentItem>,
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
    /// An index of the current segment
    segment_index: LocalSegmentIndex,
    /// Hash of the segment header of the previous segment
    prev_segment_header_hash: Blake3Hash,
    /// Last archived block
    last_archived_block: Option<LastArchivedBlock>,
}
271
272impl Archiver {
273    /// Create a new instance
274    pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
275        Self {
276            shard_index,
277            buffer: VecDeque::default(),
278            erasure_coding,
279            segment_index: LocalSegmentIndex::ZERO,
280            prev_segment_header_hash: Blake3Hash::default(),
281            last_archived_block: None,
282        }
283    }
284
    /// Create a new instance of the archiver with the initial state in case of restart.
    ///
    /// `block` corresponds to `last_archived_block` and will be processed according to its state.
    ///
    /// # Errors
    /// Returns an error if `encoded_block` is inconsistent with the partial archiving state in
    /// `segment_header`: smaller than the number of already archived bytes, or exactly equal to it
    /// (in which case the block should have been recorded as completely archived instead).
    pub fn with_initial_state(
        shard_index: ShardIndex,
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut block_objects: Vec<BlockObject>,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(shard_index, erasure_coding);

        // Resume right after the provided segment
        archiver.segment_index = segment_header.index.as_inner() + LocalSegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = Some(segment_header.last_archived_block);

        // The first thing in the buffer should be segment header
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        // If the last archived block was only partially archived, re-buffer its unarchived tail
        if let Some(archived_block_bytes) = archiver
            .last_archived_block
            .expect("Just inserted; qed")
            .partial_archived()
        {
            let archived_block_bytes = archived_block_bytes.get();
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Take part of the encoded block that wasn't archived yet and push to the
                    // buffer as a block continuation
                    // Keep only mappings that fall in the unarchived tail, rebasing their offsets
                    // to the start of the continuation
                    block_objects.retain_mut(|block_object: &mut BlockObject| {
                        if block_object.offset >= archived_block_bytes {
                            block_object.offset -= archived_block_bytes;
                            true
                        } else {
                            false
                        }
                    });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: BlockBytes(
                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        ),
                        block_objects,
                    });
                }
            }
        }

        Ok(archiver)
    }
350
351    /// Get the last archived block if there was any
352    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
353        self.last_archived_block
354            .map(|last_archived_block| last_archived_block.number())
355    }
356
357    /// Adds a new block to the internal buffer, potentially producing pieces, segment headers, and
358    /// object mappings.
359    ///
360    /// NOTE:
361    /// * pieces inside [`NewArchivedSegment`] are shared initially for efficient memory reuse when
362    ///   working with pieces
363    /// * pieces do not have a segment position and a segment proof filled in because it is not yet
364    ///   known at the time of local segment creation
365    ///
366    /// Returns `None` if the block is empty or larger than [`u32::MAX`].
367    pub fn add_block(
368        &mut self,
369        bytes: Vec<u8>,
370        block_objects: Vec<BlockObject>,
371    ) -> Option<ArchiveBlockOutcome> {
372        if !(1..u32::MAX as usize).contains(&bytes.len()) {
373            return None;
374        }
375
376        // Append new block to the buffer
377        self.buffer.push_back(SegmentItem::Block {
378            bytes: BlockBytes(bytes),
379            block_objects,
380        });
381
382        let mut archived_segments = Vec::new();
383        let mut object_mapping = Vec::new();
384
385        // Add completed segments and their mappings for this block
386        while let Some(mut segment) = self.produce_segment() {
387            // Produce any segment mappings that haven't already been produced.
388            object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
389            archived_segments.push(self.produce_archived_segment(segment));
390        }
391
392        // Produce any next segment buffer mappings that haven't already been produced.
393        object_mapping.extend(self.produce_next_segment_mappings());
394
395        Some(ArchiveBlockOutcome {
396            archived_segments,
397            global_objects: object_mapping,
398        })
399    }
400
    /// Try to slice buffer contents into segments if there is enough data, producing one segment at
    /// a time.
    ///
    /// Returns `None` (leaving the buffer untouched) if the buffered data doesn't fill a whole
    /// segment yet. On success, items that spill over the segment boundary are split, with the
    /// remainder pushed back to the front of the buffer as a `BlockContinuation`.
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        // Work on a local copy so that `self.last_archived_block` is only updated once a full
        // segment is guaranteed (the early `return None` below must not change archiver state)
        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // TODO: It is possible to simplify this whole loop to `if` in case "in progress" segment
        //  with precomputed size is stored somewhere already
        // 6 bytes is just large enough to encode a segment item (1 byte for enum variant, 4 bytes
        // for length and 1 for the actual data, while segment header item is never the last one)
        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Push all the items back into the buffer, we don't have enough data yet
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if there is an excess of data that should be spilled over into the next segment
            let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            // Increase the archived block number and assume the whole block was
                            // archived (spill over checked below)
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            // Genesis block
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // The block doesn't fit: archive its head here and push the tail back
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Mappings past the split point move to the continuation, with offsets
                            // rebased to its start
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        // The rest of the block fits: it is now fully archived
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Still doesn't fit: split again and push the remainder back
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Mappings past the split point move to the continuation, with offsets
                            // rebased to its start
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                    exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    // We are not interested in segment header here
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        // A full segment was produced, commit the updated archiving progress
        self.last_archived_block = last_archived_block;

        Some(segment)
    }
584
    /// Produce object mappings for the buffered items for the next segment. Then remove the
    /// mappings in those items.
    ///
    /// Must only be called after all complete segments for a block have been produced. Before
    /// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        // Offsets are computed as if the buffered items already sat at the start of a fresh segment
        Self::produce_object_mappings(self.buffer.iter_mut())
    }
593
    /// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those
    /// items.
    ///
    /// This method can be called on a `Segment`’s items, or on the `Archiver`'s internal buffer.
    fn produce_object_mappings<'a>(
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let mut corrected_object_mapping = Vec::new();
        // Start past the encoding overhead of the (empty) `Segment` wrapper itself
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockStart {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockContinuation {
                    bytes: _,
                    block_objects,
                } => {
                    // Draining ensures each mapping is emitted at most once, even if this is
                    // called again on the same items later
                    for block_object in block_objects.drain(..) {
                        // `+1` corresponds to `SegmentItem::X {}` enum variant encoding
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + u32::encoded_fixed_size().expect("Fixed size; qed")
                            + block_object.offset as usize;
                        // Translate the segment-wide offset into (piece, offset-within-record)
                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_position: PiecePosition::from(
                                (offset_in_segment / Record::SIZE) as u8,
                            ),
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Ignore, no object mappings here
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }
650
    /// Take a segment as an input, apply necessary transformations, and produce an archived
    /// segment: encode into source records, extend with parity records via erasure coding, compute
    /// record/segment Merkle roots, fill in piece headers and the segment header.
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            // SCALE-encode the segment directly into the source records of the pieces
            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // Segment is quite big and no longer necessary
            drop(segment);

            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            // Erasure-code source records into the parity records in place
            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| &shard.record),
                    parity_shards.iter_mut().map(|shard| &mut shard.record),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        // Collect hashes to roots from all records
        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            // Here we build a tree of record chunks, with the first half being source chunks as
            // they are originally and the second half being parity chunks. While we build three
            // trees here (for source chunks, parity chunks and combined for the whole record), it
            // could have been a single tree, and it would end up with the same root. Building them
            // separately requires less RAM and allows capturing parity chunks root more easily.
            let iter = source_pieces.map(|piece| {
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    self.erasure_coding
                        .extend(piece.record.iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root = *piece.record.source_chunks_root();
                    let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                // The record root commits to both source and parity chunk roots
                let record_root = BalancedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        // Tree over all record roots; its root commits to the whole segment
        let segment_merkle_tree =
            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        // Now produce segment header
        let segment_header = SegmentHeader {
            index: self.segment_index.into(),
            root: segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        // Create proof for every record and write it to corresponding piece.
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.header = PieceHeader {
                    shard_index: self.shard_index.into(),
                    local_segment_index: segment_header.index,
                    segment_root: segment_header.root,
                    record_proof: RecordProof::from(record_proof),
                    ..piece.header
                };
            });

        // Update state
        self.segment_index += LocalSegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // Add segment header to the beginning of the buffer to be the first thing included in the
        // next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
766}