ab_archiving/archiver.rs

use crate::objects::{BlockObject, GlobalObject};
use ab_core_primitives::block::BlockNumber;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::Record;
use ab_core_primitives::segments::{
    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment,
    SegmentHeader, SegmentIndex, SegmentRoot,
};
use ab_erasure_coding::ErasureCoding;
use ab_merkle_tree::balanced_hashed::BalancedHashedMerkleTree;
use alloc::collections::VecDeque;
use alloc::vec;
use alloc::vec::Vec;
use core::cmp::Ordering;
use core::num::NonZeroU32;
use core::ops::Deref;
use parity_scale_codec::{Decode, Encode, Input, Output};
#[cfg(feature = "parallel")]
use rayon::prelude::*;

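/// SCALE `Output` adapter that writes encoded bytes sequentially into the records of an archived
/// history segment, spilling over from one piece's record into the next as each record fills up.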
struct ArchivedHistorySegmentOutput<'a> {
    segment: &'a mut ArchivedHistorySegment,
    offset: usize,
}

impl Output for ArchivedHistorySegmentOutput<'_> {
    #[inline]
    fn write(&mut self, mut bytes: &[u8]) {
        while !bytes.is_empty() {
            let piece = self
                .segment
                .get_mut(self.offset / Record::SIZE)
                .expect("Encoding never exceeds the segment size; qed");
            let output = &mut piece.record_mut().as_flattened_mut()[self.offset % Record::SIZE..];
            let bytes_to_write = output.len().min(bytes.len());
            output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
            self.offset += bytes_to_write;
            bytes = &bytes[bytes_to_write..];
        }
    }
}

/// Segment represents a collection of items stored in the archival history of the Subspace
/// blockchain
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Segment {
    /// Segment items
    pub items: Vec<SegmentItem>,
}

impl Encode for Segment {
    #[inline(always)]
    fn size_hint(&self) -> usize {
        RecordedHistorySegment::SIZE
    }

    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        for item in &self.items {
            item.encode_to(dest);
        }
    }
}

impl Decode for Segment {
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let mut items = Vec::new();
        loop {
            match input.remaining_len()? {
                Some(0) => {
                    break;
                }
                Some(_) => {
                    // Processing continues below
                }
                None => {
                    return Err(
                        "Source doesn't report remaining length, decoding not possible".into(),
                    );
                }
            }

            match SegmentItem::decode(input) {
                Ok(item) => {
                    items.push(item);
                }
                Err(error) => {
                    return Err(error.chain("Could not decode `Segment::items`"));
                }
            }
        }

        Ok(Self { items })
    }
}

/// Similar to `Vec<u8>`, but when encoded with the SCALE codec it uses fixed-size length encoding
/// (a little-endian `u32`)
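///
/// # Example
///
/// A sketch of the wire format: a 4-byte little-endian length prefix followed by the raw bytes
/// (`BlockBytes` is only constructed inside this crate, so the example is illustrative):
///
/// ```ignore
/// let encoded = BlockBytes(vec![0xaa, 0xbb]).encode();
/// assert_eq!(encoded, vec![0x02, 0x00, 0x00, 0x00, 0xaa, 0xbb]);
/// ```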
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockBytes(Vec<u8>);

impl Deref for BlockBytes {
    type Target = [u8];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl From<BlockBytes> for Vec<u8> {
    #[inline(always)]
    fn from(value: BlockBytes) -> Self {
        value.0
    }
}

impl Encode for BlockBytes {
    #[inline(always)]
    fn size_hint(&self) -> usize {
        size_of::<u32>() + self.0.len()
    }

    #[inline]
    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
        let length = u32::try_from(self.0.len())
            .expect("All constructors guarantee the size doesn't exceed `u32`; qed");

        length.encode_to(dest);
        dest.write(&self.0);
    }
}

impl Decode for BlockBytes {
    #[inline]
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        let length = u32::decode(input)?;
        if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
            return Err("Segment item size is impossibly large".into());
        }
        // TODO: It is inefficient to zero it, but there is no API for it right now and actually
        //  implementation in `parity-scale-codec` itself is unsound:
        //  https://github.com/paritytech/parity-scale-codec/pull/605#discussion_r2076151291
        let mut bytes = vec![0; length as usize];
        input.read(&mut bytes)?;
        Ok(Self(bytes))
    }
}

impl BlockBytes {
    #[inline(always)]
    fn truncate(&mut self, size: usize) {
        self.0.truncate(size)
    }
}

/// Kinds of items that are contained within a segment
#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
pub enum SegmentItem {
    /// Special dummy enum variant only used as an implementation detail for padding purposes
    #[codec(index = 0)]
    Padding,
    /// Contains a full block inside
    #[codec(index = 1)]
    Block {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Contains the beginning of the block inside, the remainder will be found in subsequent
    /// segments
    #[codec(index = 2)]
    BlockStart {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Continuation of the partial block spilled over into the next segment
    #[codec(index = 3)]
    BlockContinuation {
        /// Block bytes
        bytes: BlockBytes,
        /// This is a convenience implementation detail and will not be available on decoding
        #[doc(hidden)]
        #[codec(skip)]
        block_objects: Vec<BlockObject>,
    },
    /// Segment header of the parent
    #[codec(index = 4)]
    ParentSegmentHeader(SegmentHeader),
}

/// Newly archived segment as a combination of segment header and corresponding archived history
/// segment containing pieces
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct NewArchivedSegment {
    /// Segment header
    pub segment_header: SegmentHeader,
    /// Segment of archived history containing pieces
    pub pieces: ArchivedHistorySegment,
}

/// The outcome of adding a block to the archiver.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ArchiveBlockOutcome {
    /// The new segments archived after adding the block.
    /// There can be zero or more segments created after each block.
    pub archived_segments: Vec<NewArchivedSegment>,

    /// The new object mappings for those segments.
    /// There can be zero or more mappings created after each block.
    pub global_objects: Vec<GlobalObject>,
}

/// Archiver instantiation error
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
pub enum ArchiverInstantiationError {
    /// Invalid last archived block, its size is the same as the encoded block
    /// (so it should have been completely archived, not partially archived)
    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
    InvalidLastArchivedBlock(u32),
    /// Invalid block, its size is smaller than the already archived number of bytes
    #[error(
        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
        {archived_block_bytes} bytes"
    )]
    InvalidBlockSmallSize {
        /// Full block size
        block_bytes: u32,
        /// Already archived portion of the block
        archived_block_bytes: u32,
    },
}

/// Block archiver for the Subspace blockchain.
///
/// It takes new confirmed (at `K` depth) blocks and concatenates them into a buffer; the buffer
/// is sliced into segments of [`RecordedHistorySegment::SIZE`], segments are sliced into source
/// records of [`Record::SIZE`], records are erasure coded and committed to, then roots with
/// proofs are appended and records become pieces that are returned alongside the corresponding
/// segment header.
///
/// ## Panics
/// Panics when operating on blocks whose length doesn't fit into `u32` (which should never happen
/// in a blockchain context anyway).
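///
/// # Example
///
/// A minimal usage sketch; how `ErasureCoding` is constructed is assumed here and may differ:
///
/// ```ignore
/// let mut archiver = Archiver::new(erasure_coding);
///
/// // Feed confirmed blocks in order; zero or more segments are produced per block.
/// let outcome = archiver
///     .add_block(block_bytes, block_objects)
///     .expect("Block is non-empty and its length fits below `u32::MAX`");
/// for new_segment in &outcome.archived_segments {
///     // Persist `new_segment.segment_header` and distribute `new_segment.pieces`.
/// }
/// for mapping in &outcome.global_objects {
///     // Index object `mapping.hash` at `mapping.piece_index` / `mapping.offset`.
/// }
/// ```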
#[derive(Debug, Clone)]
pub struct Archiver {
    /// Buffer containing blocks and other buffered items that are pending to be included into the
    /// next segment
    buffer: VecDeque<SegmentItem>,
    /// Erasure coding data structure
    erasure_coding: ErasureCoding,
    /// An index of the current segment
    segment_index: SegmentIndex,
    /// Hash of the segment header of the previous segment
    prev_segment_header_hash: Blake3Hash,
    /// Last archived block
    last_archived_block: Option<LastArchivedBlock>,
}

impl Archiver {
    /// Create a new instance
    pub fn new(erasure_coding: ErasureCoding) -> Self {
        Self {
            buffer: VecDeque::default(),
            erasure_coding,
            segment_index: SegmentIndex::ZERO,
            prev_segment_header_hash: Blake3Hash::default(),
            last_archived_block: None,
        }
    }

    /// Create a new instance of the archiver with initial state in case of restart.
    ///
    /// `encoded_block` corresponds to `segment_header.last_archived_block` and will be processed
    /// according to its state.
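    ///
    /// # Example
    ///
    /// A resume sketch; where the persisted values come from is left as an assumption:
    ///
    /// ```ignore
    /// let archiver = Archiver::with_initial_state(
    ///     erasure_coding,
    ///     last_segment_header,
    ///     &encoded_last_archived_block,
    ///     last_block_objects,
    /// )?;
    /// ```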
    pub fn with_initial_state(
        erasure_coding: ErasureCoding,
        segment_header: SegmentHeader,
        encoded_block: &[u8],
        mut block_objects: Vec<BlockObject>,
    ) -> Result<Self, ArchiverInstantiationError> {
        let mut archiver = Self::new(erasure_coding);

        archiver.segment_index = segment_header.segment_index() + SegmentIndex::ONE;
        archiver.prev_segment_header_hash = segment_header.hash();
        archiver.last_archived_block = Some(segment_header.last_archived_block);

        // The first thing in the buffer should be the segment header
        archiver
            .buffer
            .push_back(SegmentItem::ParentSegmentHeader(segment_header));

        if let Some(archived_block_bytes) = archiver
            .last_archived_block
            .expect("Just inserted; qed")
            .partial_archived()
        {
            let archived_block_bytes = archived_block_bytes.get();
            let encoded_block_bytes = u32::try_from(encoded_block.len())
                .expect("Blocks length is never bigger than u32; qed");

            match encoded_block_bytes.cmp(&archived_block_bytes) {
                Ordering::Less => {
                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
                        block_bytes: encoded_block_bytes,
                        archived_block_bytes,
                    });
                }
                Ordering::Equal => {
                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
                        encoded_block_bytes,
                    ));
                }
                Ordering::Greater => {
                    // Take part of the encoded block that wasn't archived yet and push to the
                    // buffer as a block continuation
                    block_objects.retain_mut(|block_object: &mut BlockObject| {
                        if block_object.offset >= archived_block_bytes {
                            block_object.offset -= archived_block_bytes;
                            true
                        } else {
                            false
                        }
                    });
                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
                        bytes: BlockBytes(
                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
                        ),
                        block_objects,
                    });
                }
            }
        }

        Ok(archiver)
    }

    /// Get the number of the last archived block, if any
    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
        self.last_archived_block
            .map(|last_archived_block| last_archived_block.number())
    }

    /// Adds a new block to the internal buffer, potentially producing pieces, segment headers,
    /// and object mappings.
    ///
    /// Returns `None` if the block is empty or if its size is `u32::MAX` bytes or larger.
    pub fn add_block(
        &mut self,
        bytes: Vec<u8>,
        block_objects: Vec<BlockObject>,
    ) -> Option<ArchiveBlockOutcome> {
        if !(1..u32::MAX as usize).contains(&bytes.len()) {
            return None;
        }

        // Append the new block to the buffer
        self.buffer.push_back(SegmentItem::Block {
            bytes: BlockBytes(bytes),
            block_objects,
        });

        let mut archived_segments = Vec::new();
        let mut object_mapping = Vec::new();

        // Add completed segments and their mappings for this block.
        while let Some(mut segment) = self.produce_segment() {
            // Produce any segment mappings that haven't already been produced.
            object_mapping.extend(Self::produce_object_mappings(
                self.segment_index,
                segment.items.iter_mut(),
            ));
            archived_segments.push(self.produce_archived_segment(segment));
        }

        // Produce any next-segment buffer mappings that haven't already been produced.
        object_mapping.extend(self.produce_next_segment_mappings());

        Some(ArchiveBlockOutcome {
            archived_segments,
            global_objects: object_mapping,
        })
    }

    /// Try to slice buffer contents into segments if there is enough data, producing one segment
    /// at a time
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // TODO: It is possible to simplify this whole loop to `if` in case "in progress" segment
        //  with precomputed size is stored somewhere already
        // 6 bytes is the smallest segment item that can carry data: 1 byte for the enum variant,
        // 4 bytes for the length prefix and at least 1 byte of actual data (a segment header item
        // is never the last one in a segment)
        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Push all of the items back into the buffer, we don't have enough data yet
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if there is an excess of data that should be spilled over into the next
            // segment
            let spill_over = segment_size
                .checked_sub(RecordedHistorySegment::SIZE)
                .unwrap_or_default();
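
            // For example, if the segment has 100 encoded bytes of capacity left and the popped
            // block item encodes to 150 bytes (5 bytes of item overhead + 145 block bytes), then
            // `spill_over == 50`: the last 50 block bytes go back into the buffer as a
            // continuation, and the remaining 95 block bytes (plus the 5 bytes of overhead) fill
            // the segment exactly.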

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            // Increase the archived block number and assume the whole block was
                            // archived (spill over checked below)
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            // Genesis block
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                    exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    // We are not interested in the segment header here
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        self.last_archived_block = last_archived_block;

        Some(segment)
    }

    /// Produce object mappings for the buffered items for the next segment, then remove those
    /// mappings from the buffered items.
    ///
    /// Must only be called after all complete segments for a block have been produced. Before
    /// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
    }

    /// Produce object mappings for `items` in `segment_index`, then remove the mappings from
    /// those items.
    ///
    /// This method can be called on a `Segment`'s items, or on the `Archiver`'s internal buffer.
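    ///
    /// A worked example of the offset arithmetic below: an empty `Segment` encodes to zero bytes,
    /// so for an object at offset 10 inside the first `Block` item of a fresh segment,
    /// `offset_in_segment = 0 + 1 + 4 + 10 = 15` (enum variant byte + `u32` length prefix +
    /// offset within the block), which falls into source piece `15 / Record::SIZE` at piece
    /// offset `15 % Record::SIZE`.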
    fn produce_object_mappings<'a>(
        segment_index: SegmentIndex,
        items: impl Iterator<Item = &'a mut SegmentItem>,
    ) -> Vec<GlobalObject> {
        let source_piece_indexes =
            &segment_index.segment_piece_indexes()[..RecordedHistorySegment::NUM_RAW_RECORDS];

        let mut corrected_object_mapping = Vec::new();
        let mut base_offset_in_segment = Segment::default().encoded_size();
        for segment_item in items {
            match segment_item {
                SegmentItem::Padding => {
                    unreachable!(
                        "Segment during archiving never contains SegmentItem::Padding; qed"
                    );
                }
                SegmentItem::Block {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockStart {
                    bytes: _,
                    block_objects,
                }
                | SegmentItem::BlockContinuation {
                    bytes: _,
                    block_objects,
                } => {
                    for block_object in block_objects.drain(..) {
                        // `+1` corresponds to `SegmentItem::X {}` enum variant encoding
                        let offset_in_segment = base_offset_in_segment
                            + 1
                            + u32::encoded_fixed_size().expect("Fixed size; qed")
                            + block_object.offset as usize;
                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
                            .try_into()
                            .expect("Offset within piece should always fit in 32-bit integer; qed");
                        corrected_object_mapping.push(GlobalObject {
                            hash: block_object.hash,
                            piece_index: source_piece_indexes[offset_in_segment / Record::SIZE],
                            offset: raw_piece_offset,
                        });
                    }
                }
                SegmentItem::ParentSegmentHeader(_) => {
                    // Ignore, no object mappings here
                }
            }

            base_offset_in_segment += segment_item.encoded_size();
        }

        corrected_object_mapping
    }

    /// Take a segment as input, apply the necessary transformations and produce an archived
    /// segment
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // Segment is quite big and no longer necessary
            drop(segment);

            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| shard.record()),
                    parity_shards.iter_mut().map(|shard| shard.record_mut()),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        // Collect the roots of all records
        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            // Here we build a tree of record chunks, with the first half being source chunks as
            // they are originally and the second half being parity chunks. While we build three
            // trees here (for source chunks, for parity chunks and a combined one for the whole
            // record), it could have been a single tree, and it would end up with the same root.
            // Building them separately requires less RAM and makes it easier to capture the
            // parity chunks root.
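            //
            // Sketch of the per-record commitment produced below:
            //
            //                     record_root
            //                    /           \
            //     source_chunks_root     parity_chunks_root
            //     (root over source      (root over parity
            //      chunks)                chunks)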
            let iter = source_pieces.map(|piece| {
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    self.erasure_coding
                        .extend(piece.record().iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root =
                        BalancedHashedMerkleTree::compute_root_only(piece.record());
                    let parity_chunks_root =
                        BalancedHashedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                let record_root = BalancedHashedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                piece.root_mut().copy_from_slice(&record_root);
                piece
                    .parity_chunks_root_mut()
                    .copy_from_slice(&parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        let segment_merkle_tree =
            BalancedHashedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        // Create a proof for every record and write it to the corresponding piece.
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.proof_mut().copy_from_slice(&record_proof);
            });

        // Now produce the segment header
        let segment_header = SegmentHeader {
            segment_index: self.segment_index.into(),
            segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        // Update state
        self.segment_index += SegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // Add the segment header to the beginning of the buffer so it is the first thing included
        // in the next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
}