Skip to main content

ab_archiving/
archiver.rs

1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5    PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9    RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
24struct ArchivedHistorySegmentOutput<'a> {
25    segment: &'a mut ArchivedHistorySegment,
26    offset: usize,
27}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30    #[inline]
31    fn write(&mut self, mut bytes: &[u8]) {
32        while !bytes.is_empty() {
33            let piece = self
34                .segment
35                .get_mut(self.offset / Record::SIZE)
36                .expect("Encoding never exceeds the segment size; qed");
37            let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38            let bytes_to_write = output.len().min(bytes.len());
39            output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40            self.offset += bytes_to_write;
41            bytes = &bytes[bytes_to_write..];
42        }
43    }
44}
45
46/// The segment represents a collection of items stored in archival history
47#[derive(Debug, Default, Clone, Eq, PartialEq)]
48pub struct Segment {
49    /// Segment items
50    pub items: Vec<SegmentItem>,
51}
52
53impl Encode for Segment {
54    #[inline(always)]
55    fn size_hint(&self) -> usize {
56        RecordedHistorySegment::SIZE
57    }
58
59    #[inline]
60    fn encode_to<O>(&self, dest: &mut O)
61    where
62        O: Output + ?Sized,
63    {
64        for item in &self.items {
65            item.encode_to(dest);
66        }
67    }
68}
69
70impl Decode for Segment {
71    #[inline]
72    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
73    where
74        I: Input,
75    {
76        let mut items = Vec::new();
77        loop {
78            match input.remaining_len()? {
79                Some(0) => {
80                    break;
81                }
82                Some(_) => {
83                    // Processing continues below
84                }
85                None => {
86                    return Err(
87                        "Source doesn't report remaining length, decoding not possible".into(),
88                    );
89                }
90            }
91
92            match SegmentItem::decode(input) {
93                Ok(item) => {
94                    items.push(item);
95                }
96                Err(error) => {
97                    return Err(error.chain("Could not decode `Segment::items`"));
98                }
99            }
100        }
101
102        Ok(Self { items })
103    }
104}
105
106/// Similar to `Vec<u8>`, but when encoded with SCALE codec uses fixed size length encoding (as
107/// little-endian `u32`)
108#[derive(Debug, Clone, Eq, PartialEq)]
109pub struct BlockBytes(Vec<u8>);
110
111impl Deref for BlockBytes {
112    type Target = [u8];
113
114    #[inline(always)]
115    fn deref(&self) -> &Self::Target {
116        &self.0
117    }
118}
119
120impl From<BlockBytes> for Vec<u8> {
121    #[inline(always)]
122    fn from(value: BlockBytes) -> Self {
123        value.0
124    }
125}
126
127impl Encode for BlockBytes {
128    #[inline(always)]
129    fn size_hint(&self) -> usize {
130        size_of::<u32>() + self.0.len()
131    }
132
133    #[inline]
134    fn encode_to<O>(&self, dest: &mut O)
135    where
136        O: Output + ?Sized,
137    {
138        let length = u32::try_from(self.0.len())
139            .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
140
141        length.encode_to(dest);
142        dest.write(&self.0);
143    }
144}
145
146impl Decode for BlockBytes {
147    #[inline]
148    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
149    where
150        I: Input,
151    {
152        let length = u32::decode(input)?;
153        if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
154            return Err("Segment item size is impossibly large".into());
155        }
156        // TODO: It is inefficient to zero it, but there is no API for it right now and actually
157        //  implementation in `parity-scale-codec` itself is unsound:
158        //  https://github.com/paritytech/parity-scale-codec/pull/605#discussion_r2076151291
159        let mut bytes = vec![0; length as usize];
160        input.read(&mut bytes)?;
161        Ok(Self(bytes))
162    }
163}
164
165impl BlockBytes {
166    #[inline(always)]
167    fn truncate(&mut self, size: usize) {
168        self.0.truncate(size);
169    }
170}
171
172/// Kinds of items that are contained within a segment
173#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
174pub enum SegmentItem {
175    /// Special dummy enum variant only used as an implementation detail for padding purposes
176    #[codec(index = 0)]
177    Padding,
178    /// Contains a full block inside
179    #[codec(index = 1)]
180    Block {
181        /// Block bytes
182        bytes: BlockBytes,
183        /// This is a convenience implementation detail and will not be available on decoding
184        #[doc(hidden)]
185        #[codec(skip)]
186        block_objects: Vec<BlockObject>,
187    },
188    /// Contains the beginning of the block inside, the remainder will be found in the following
189    /// segments
190    #[codec(index = 2)]
191    BlockStart {
192        /// Block bytes
193        bytes: BlockBytes,
194        /// This is a convenience implementation detail and will not be available on decoding
195        #[doc(hidden)]
196        #[codec(skip)]
197        block_objects: Vec<BlockObject>,
198    },
199    /// Continuation of the partial block spilled over into the next segment
200    #[codec(index = 3)]
201    BlockContinuation {
202        /// Block bytes
203        bytes: BlockBytes,
204        /// This is a convenience implementation detail and will not be available on decoding
205        #[doc(hidden)]
206        #[codec(skip)]
207        block_objects: Vec<BlockObject>,
208    },
209    /// Segment header of the parent
210    #[codec(index = 4)]
211    ParentSegmentHeader(SegmentHeader),
212}
213
214/// Newly archived segment as a combination of a segment header and corresponding archived history
215/// segment containing pieces
216#[derive(Debug, Clone, Eq, PartialEq)]
217pub struct NewArchivedSegment {
218    /// Segment header
219    pub segment_header: SegmentHeader,
220    /// Segment of archived history containing pieces
221    pub pieces: ArchivedHistorySegment,
222}
223
224/// The outcome of adding a block to the archiver.
225#[derive(Debug, Clone, Eq, PartialEq)]
226pub struct ArchiveBlockOutcome {
227    /// The new segments archived after adding the block.
228    /// There can be zero or more segments created after each block.
229    pub archived_segments: Vec<NewArchivedSegment>,
230
231    // TODO: The API for global objects needs to change and be associated with segments rather than
232    //  blocks
233    /// The new object mappings for those segments.
234    /// There can be zero or more mappings created after each block.
235    pub global_objects: Vec<GlobalObject>,
236}
237
238/// Archiver instantiation error
239#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
240pub enum ArchiverInstantiationError {
241    /// Invalid last archived block, its size is the same as the encoded block
242    /// (so it should have been completely archived, not partially archived)
243    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
244    InvalidLastArchivedBlock(u32),
245    /// Invalid block, its size is smaller than the already archived number of bytes
246    #[error(
247        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
248        {archived_block_bytes} bytes"
249    )]
250    InvalidBlockSmallSize {
251        /// Full block size
252        block_bytes: u32,
253        /// Already archived portion of the block
254        archived_block_bytes: u32,
255    },
256}
257
258/// Block archiver.
259///
260/// It takes new confirmed (at `K` depth) blocks and concatenates them into a buffer, buffer is
261/// sliced into segments of [`RecordedHistorySegment::SIZE`] size, segments are sliced into source
262/// records of [`Record::SIZE`], records are erasure coded, committed to, then roots with proofs are
263/// appended and records become pieces that are returned alongside the corresponding segment header.
264///
265/// ## Panics
266/// Panics when operating on blocks, whose length doesn't fit into u32 (should never be the case in
267/// blockchain context anyway).
268#[derive(Debug, Clone)]
269pub struct Archiver {
270    shard_index: ShardIndex,
271    /// Buffer containing blocks and other buffered items that are pending to be included into the
272    /// next segment
273    buffer: VecDeque<SegmentItem>,
274    /// Erasure coding data structure
275    erasure_coding: ErasureCoding,
276    /// An index of the current segment
277    segment_index: LocalSegmentIndex,
278    /// Hash of the segment header of the previous segment
279    prev_segment_header_hash: Blake3Hash,
280    /// Last archived block
281    last_archived_block: Option<LastArchivedBlock>,
282}
283
284impl Archiver {
285    /// Create a new instance
286    pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
287        Self {
288            shard_index,
289            buffer: VecDeque::default(),
290            erasure_coding,
291            segment_index: LocalSegmentIndex::ZERO,
292            prev_segment_header_hash: Blake3Hash::default(),
293            last_archived_block: None,
294        }
295    }
296
297    /// Create a new instance of the archiver with the initial state in case of restart.
298    ///
299    /// `block` corresponds to `last_archived_block` and will be processed according to its state.
300    pub fn with_initial_state(
301        shard_index: ShardIndex,
302        erasure_coding: ErasureCoding,
303        segment_header: SegmentHeader,
304        encoded_block: &[u8],
305        mut block_objects: Vec<BlockObject>,
306    ) -> Result<Self, ArchiverInstantiationError> {
307        let mut archiver = Self::new(shard_index, erasure_coding);
308
309        archiver.segment_index = segment_header.index.as_inner() + LocalSegmentIndex::ONE;
310        archiver.prev_segment_header_hash = segment_header.hash();
311        archiver.last_archived_block = Some(segment_header.last_archived_block);
312
313        // The first thing in the buffer should be segment header
314        archiver
315            .buffer
316            .push_back(SegmentItem::ParentSegmentHeader(segment_header));
317
318        if let Some(archived_block_bytes) = archiver
319            .last_archived_block
320            .expect("Just inserted; qed")
321            .partial_archived()
322        {
323            let archived_block_bytes = archived_block_bytes.get();
324            let encoded_block_bytes = u32::try_from(encoded_block.len())
325                .expect("Blocks length is never bigger than u32; qed");
326
327            match encoded_block_bytes.cmp(&archived_block_bytes) {
328                Ordering::Less => {
329                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
330                        block_bytes: encoded_block_bytes,
331                        archived_block_bytes,
332                    });
333                }
334                Ordering::Equal => {
335                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
336                        encoded_block_bytes,
337                    ));
338                }
339                Ordering::Greater => {
340                    // Take part of the encoded block that wasn't archived yet and push to the
341                    // buffer as a block continuation
342                    block_objects.retain_mut(|block_object: &mut BlockObject| {
343                        if block_object.offset >= archived_block_bytes {
344                            block_object.offset -= archived_block_bytes;
345                            true
346                        } else {
347                            false
348                        }
349                    });
350                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
351                        bytes: BlockBytes(
352                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
353                        ),
354                        block_objects,
355                    });
356                }
357            }
358        }
359
360        Ok(archiver)
361    }
362
363    /// Get the last archived block if there was any
364    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
365        self.last_archived_block
366            .map(|last_archived_block| last_archived_block.number())
367    }
368
369    /// Adds a new block to the internal buffer, potentially producing pieces, segment headers, and
370    /// object mappings.
371    ///
372    /// NOTE:
373    /// * pieces inside [`NewArchivedSegment`] are shared initially for efficient memory reuse when
374    ///   working with pieces
375    /// * pieces do not have a segment position and a segment proof filled in because it is not yet
376    ///   known at the time of local segment creation
377    ///
378    /// Returns `None` if the block is empty or larger than [`u32::MAX`].
379    pub fn add_block(
380        &mut self,
381        bytes: Vec<u8>,
382        block_objects: Vec<BlockObject>,
383    ) -> Option<ArchiveBlockOutcome> {
384        if !(1..u32::MAX as usize).contains(&bytes.len()) {
385            return None;
386        }
387
388        // Append new block to the buffer
389        self.buffer.push_back(SegmentItem::Block {
390            bytes: BlockBytes(bytes),
391            block_objects,
392        });
393
394        let mut archived_segments = Vec::new();
395        let mut object_mapping = Vec::new();
396
397        // Add completed segments and their mappings for this block
398        while let Some(mut segment) = self.produce_segment() {
399            // Produce any segment mappings that haven't already been produced.
400            object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
401            archived_segments.push(self.produce_archived_segment(segment));
402        }
403
404        // Produce any next segment buffer mappings that haven't already been produced.
405        object_mapping.extend(self.produce_next_segment_mappings());
406
407        Some(ArchiveBlockOutcome {
408            archived_segments,
409            global_objects: object_mapping,
410        })
411    }
412
413    /// Try to slice buffer contents into segments if there is enough data, producing one segment at
414    /// a time
415    fn produce_segment(&mut self) -> Option<Segment> {
416        let mut segment = Segment {
417            items: Vec::with_capacity(self.buffer.len()),
418        };
419
420        let mut last_archived_block = self.last_archived_block;
421
422        let mut segment_size = segment.encoded_size();
423
424        // TODO: It is possible to simplify this whole loop to `if` in case "in progress" segment
425        //  with precomputed size is stored somewhere already
426        // 6 bytes is just large enough to encode a segment item (1 byte for enum variant, 4 bytes
427        // for length and 1 for the actual data, while segment header item is never the last one)
428        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
429            let Some(segment_item) = self.buffer.pop_front() else {
430                // Push all the items back into the buffer, we don't have enough data yet
431                for segment_item in segment.items.into_iter().rev() {
432                    self.buffer.push_front(segment_item);
433                }
434
435                return None;
436            };
437
438            let segment_item_encoded_size = segment_item.encoded_size();
439            segment_size += segment_item_encoded_size;
440
441            // Check if there is an excess of data that should be spilled over into the next segment
442            let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);
443
444            let segment_item = match segment_item {
445                SegmentItem::Padding => {
446                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
447                }
448                SegmentItem::Block {
449                    mut bytes,
450                    mut block_objects,
451                } => {
452                    let last_archived_block =
453                        if let Some(last_archived_block) = &mut last_archived_block {
454                            // Increase the archived block number and assume the whole block was
455                            // archived (spill over checked below)
456                            last_archived_block
457                                .number
458                                .replace(last_archived_block.number() + BlockNumber::ONE);
459                            last_archived_block.set_complete();
460                            last_archived_block
461                        } else {
462                            // Genesis block
463                            last_archived_block.insert(LastArchivedBlock {
464                                number: BlockNumber::ZERO.into(),
465                                archived_progress: ArchivedBlockProgress::new_complete(),
466                            })
467                        };
468
469                    if spill_over == 0 {
470                        SegmentItem::Block {
471                            bytes,
472                            block_objects,
473                        }
474                    } else {
475                        let split_point = bytes.len() - spill_over;
476
477                        {
478                            let continuation_bytes = bytes[split_point..].to_vec();
479                            let continuation_block_objects = block_objects
480                                .extract_if(.., |block_object: &mut BlockObject| {
481                                    if block_object.offset >= split_point as u32 {
482                                        block_object.offset -= split_point as u32;
483                                        true
484                                    } else {
485                                        false
486                                    }
487                                })
488                                .collect();
489
490                            // Push a continuation element back into the buffer where the removed
491                            // segment item was
492                            self.buffer.push_front(SegmentItem::BlockContinuation {
493                                bytes: BlockBytes(continuation_bytes),
494                                block_objects: continuation_block_objects,
495                            });
496                        }
497
498                        bytes.truncate(split_point);
499                        // Update last archived block to include partial archiving info
500                        let archived_bytes = u32::try_from(split_point)
501                            .ok()
502                            .and_then(NonZeroU32::new)
503                            .expect(
504                                "`::add_block()` method ensures block is not empty and doesn't \
505                                exceed `u32::MAX`; qed",
506                            );
507                        last_archived_block.set_partial_archived(archived_bytes);
508
509                        SegmentItem::BlockStart {
510                            bytes,
511                            block_objects,
512                        }
513                    }
514                }
515                SegmentItem::BlockStart { .. } => {
516                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
517                }
518                SegmentItem::BlockContinuation {
519                    mut bytes,
520                    mut block_objects,
521                } => {
522                    let last_archived_block = last_archived_block.as_mut().expect(
523                        "Block continuation implies that there are some bytes archived \
524                        already; qed",
525                    );
526
527                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
528                        "Block continuation implies that there are some bytes archived \
529                        already; qed",
530                    );
531
532                    if spill_over == 0 {
533                        last_archived_block.set_complete();
534
535                        SegmentItem::BlockContinuation {
536                            bytes,
537                            block_objects,
538                        }
539                    } else {
540                        let split_point = bytes.len() - spill_over;
541
542                        {
543                            let continuation_bytes = bytes[split_point..].to_vec();
544                            let continuation_block_objects = block_objects
545                                .extract_if(.., |block_object: &mut BlockObject| {
546                                    if block_object.offset >= split_point as u32 {
547                                        block_object.offset -= split_point as u32;
548                                        true
549                                    } else {
550                                        false
551                                    }
552                                })
553                                .collect();
554                            // Push a continuation element back into the buffer where the removed
555                            // segment item was
556                            self.buffer.push_front(SegmentItem::BlockContinuation {
557                                bytes: BlockBytes(continuation_bytes),
558                                block_objects: continuation_block_objects,
559                            });
560                        }
561
562                        bytes.truncate(split_point);
563                        // Update last archived block to include partial archiving info
564                        let archived_bytes = previously_archived_bytes.get()
565                            + u32::try_from(split_point).expect(
566                                "`::add_block()` method ensures block length doesn't \
567                                    exceed `u32::MAX`; qed",
568                            );
569                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
570                            "Spillover means non-zero length of the block was archived; qed",
571                        );
572                        last_archived_block.set_partial_archived(archived_bytes);
573
574                        SegmentItem::BlockContinuation {
575                            bytes,
576                            block_objects,
577                        }
578                    }
579                }
580                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
581                    // We are not interested in segment header here
582                    SegmentItem::ParentSegmentHeader(parent_segment_header)
583                }
584            };
585
586            segment.items.push(segment_item);
587        }
588
589        self.last_archived_block = last_archived_block;
590
591        Some(segment)
592    }
593
594    /// Produce object mappings for the buffered items for the next segment. Then remove the
595    /// mappings in those items.
596    ///
597    /// Must only be called after all complete segments for a block have been produced. Before
598    /// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
599    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
600        Self::produce_object_mappings(self.buffer.iter_mut())
601    }
602
603    /// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those
604    /// items.
605    ///
606    /// This method can be called on a `Segment`’s items, or on the `Archiver`'s internal buffer.
607    fn produce_object_mappings<'a>(
608        items: impl Iterator<Item = &'a mut SegmentItem>,
609    ) -> Vec<GlobalObject> {
610        let mut corrected_object_mapping = Vec::new();
611        let mut base_offset_in_segment = Segment::default().encoded_size();
612        for segment_item in items {
613            match segment_item {
614                SegmentItem::Padding => {
615                    unreachable!(
616                        "Segment during archiving never contains SegmentItem::Padding; qed"
617                    );
618                }
619                SegmentItem::Block {
620                    bytes: _,
621                    block_objects,
622                }
623                | SegmentItem::BlockStart {
624                    bytes: _,
625                    block_objects,
626                }
627                | SegmentItem::BlockContinuation {
628                    bytes: _,
629                    block_objects,
630                } => {
631                    for block_object in block_objects.drain(..) {
632                        // `+1` corresponds to `SegmentItem::X {}` enum variant encoding
633                        let offset_in_segment = base_offset_in_segment
634                            + 1
635                            + u32::encoded_fixed_size().expect("Fixed size; qed")
636                            + block_object.offset as usize;
637                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
638                            .try_into()
639                            .expect("Offset within piece should always fit in 32-bit integer; qed");
640                        corrected_object_mapping.push(GlobalObject {
641                            hash: block_object.hash,
642                            piece_position: PiecePosition::from(
643                                (offset_in_segment / Record::SIZE) as u8,
644                            ),
645                            offset: raw_piece_offset,
646                        });
647                    }
648                }
649                SegmentItem::ParentSegmentHeader(_) => {
650                    // Ignore, no object mappings here
651                }
652            }
653
654            base_offset_in_segment += segment_item.encoded_size();
655        }
656
657        corrected_object_mapping
658    }
659
660    /// Take a segment as an input, apply necessary transformations, and produce an archived segment
661    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
662        let mut pieces = {
663            let mut pieces = ArchivedHistorySegment::default();
664
665            segment.encode_to(&mut ArchivedHistorySegmentOutput {
666                segment: &mut pieces,
667                offset: 0,
668            });
669            // Segment is quite big and no longer necessary
670            drop(segment);
671
672            let (source_shards, parity_shards) =
673                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);
674
675            self.erasure_coding
676                .extend(
677                    source_shards.iter().map(|shard| &shard.record),
678                    parity_shards.iter_mut().map(|shard| &mut shard.record),
679                )
680                .expect("Statically correct parameters; qed");
681
682            pieces
683        };
684
685        // Collect hashes to roots from all records
686        let record_roots = {
687            #[cfg(not(feature = "parallel"))]
688            let source_pieces = pieces.iter_mut();
689            #[cfg(feature = "parallel")]
690            let source_pieces = pieces.par_iter_mut();
691
692            // Here we build a tree of record chunks, with the first half being source chunks as
693            // they are originally and the second half being parity chunks. While we build tree
694            // threes here (for source chunks, parity chunks and combined for the whole record), it
695            // could have been a single tree, and it would end up with the same root. Building them
696            // separately requires less RAM and allows capturing parity chunks root more easily.
697            let iter = source_pieces.map(|piece| {
698                let [source_chunks_root, parity_chunks_root] = {
699                    let mut parity_chunks = Record::new_boxed();
700
701                    self.erasure_coding
702                        .extend(piece.record.iter(), parity_chunks.iter_mut())
703                        .expect(
704                            "Erasure coding instance is deliberately configured to support this \
705                            input; qed",
706                        );
707
708                    let source_chunks_root = *piece.record.source_chunks_root();
709                    let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);
710
711                    [source_chunks_root, parity_chunks_root]
712                };
713
714                let record_root = BalancedMerkleTree::compute_root_only(&[
715                    source_chunks_root,
716                    parity_chunks_root,
717                ]);
718
719                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);
720
721                record_root
722            });
723
724            iter.collect::<Vec<_>>()
725        };
726
727        let segment_merkle_tree =
728            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
729                record_roots
730                    .as_slice()
731                    .try_into()
732                    .expect("Statically guaranteed to have correct length; qed"),
733            );
734
735        let segment_root = SegmentRoot::from(segment_merkle_tree.root());
736
737        // Now produce segment header
738        let segment_header = SegmentHeader {
739            index: self.segment_index.into(),
740            root: segment_root,
741            prev_segment_header_hash: self.prev_segment_header_hash,
742            last_archived_block: self
743                .last_archived_block
744                .expect("Never empty by the time segment is produced; qed"),
745        };
746
747        // Create proof for every record and write it to corresponding piece.
748        pieces
749            .iter_mut()
750            .zip(segment_merkle_tree.all_proofs())
751            .for_each(|(piece, record_proof)| {
752                piece.header = PieceHeader {
753                    shard_index: self.shard_index.into(),
754                    local_segment_index: segment_header.index,
755                    segment_root: segment_header.root,
756                    record_proof: RecordProof::from(record_proof),
757                    ..piece.header
758                };
759            });
760
761        // Update state
762        self.segment_index += LocalSegmentIndex::ONE;
763        self.prev_segment_header_hash = segment_header.hash();
764
765        // Add segment header to the beginning of the buffer to be the first thing included in the
766        // next segment
767        self.buffer
768            .push_front(SegmentItem::ParentSegmentHeader(segment_header));
769
770        NewArchivedSegment {
771            segment_header,
772            pieces: pieces.to_shared(),
773        }
774    }
775}