// ab_archiving/archiver.rs

1use crate::objects::{BlockObject, GlobalObject};
2use ab_core_primitives::block::BlockNumber;
3use ab_core_primitives::hashes::Blake3Hash;
4use ab_core_primitives::pieces::{
5    PieceHeader, PiecePosition, Record, RecordChunksRoot, RecordProof,
6};
7use ab_core_primitives::segments::{
8    ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, LocalSegmentIndex,
9    RecordedHistorySegment, SegmentHeader, SegmentRoot,
10};
11use ab_core_primitives::shard::ShardIndex;
12use ab_erasure_coding::ErasureCoding;
13use ab_merkle_tree::balanced::BalancedMerkleTree;
14use alloc::collections::VecDeque;
15use alloc::vec;
16use alloc::vec::Vec;
17use core::cmp::Ordering;
18use core::num::NonZeroU32;
19use core::ops::Deref;
20use parity_scale_codec::{Decode, Encode, Input, Output};
21#[cfg(feature = "parallel")]
22use rayon::prelude::*;
23
24struct ArchivedHistorySegmentOutput<'a> {
25    segment: &'a mut ArchivedHistorySegment,
26    offset: usize,
27}
28
29impl Output for ArchivedHistorySegmentOutput<'_> {
30    #[inline]
31    fn write(&mut self, mut bytes: &[u8]) {
32        while !bytes.is_empty() {
33            let piece = self
34                .segment
35                .get_mut(self.offset / Record::SIZE)
36                .expect("Encoding never exceeds the segment size; qed");
37            let output = &mut piece.record.as_flattened_mut()[self.offset % Record::SIZE..];
38            let bytes_to_write = output.len().min(bytes.len());
39            output[..bytes_to_write].copy_from_slice(&bytes[..bytes_to_write]);
40            self.offset += bytes_to_write;
41            bytes = &bytes[bytes_to_write..];
42        }
43    }
44}
45
46/// The segment represents a collection of items stored in archival history
47#[derive(Debug, Default, Clone, Eq, PartialEq)]
48pub struct Segment {
49    /// Segment items
50    pub items: Vec<SegmentItem>,
51}
52
53impl Encode for Segment {
54    #[inline(always)]
55    fn size_hint(&self) -> usize {
56        RecordedHistorySegment::SIZE
57    }
58
59    #[inline]
60    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
61        for item in &self.items {
62            item.encode_to(dest);
63        }
64    }
65}
66
67impl Decode for Segment {
68    #[inline]
69    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
70        let mut items = Vec::new();
71        loop {
72            match input.remaining_len()? {
73                Some(0) => {
74                    break;
75                }
76                Some(_) => {
77                    // Processing continues below
78                }
79                None => {
80                    return Err(
81                        "Source doesn't report remaining length, decoding not possible".into(),
82                    );
83                }
84            }
85
86            match SegmentItem::decode(input) {
87                Ok(item) => {
88                    items.push(item);
89                }
90                Err(error) => {
91                    return Err(error.chain("Could not decode `Segment::items`"));
92                }
93            }
94        }
95
96        Ok(Self { items })
97    }
98}
99
100/// Similar to `Vec<u8>`, but when encoded with SCALE codec uses fixed size length encoding (as
101/// little-endian `u32`)
102#[derive(Debug, Clone, Eq, PartialEq)]
103pub struct BlockBytes(Vec<u8>);
104
105impl Deref for BlockBytes {
106    type Target = [u8];
107
108    #[inline(always)]
109    fn deref(&self) -> &Self::Target {
110        &self.0
111    }
112}
113
114impl From<BlockBytes> for Vec<u8> {
115    #[inline(always)]
116    fn from(value: BlockBytes) -> Self {
117        value.0
118    }
119}
120
121impl Encode for BlockBytes {
122    #[inline(always)]
123    fn size_hint(&self) -> usize {
124        size_of::<u32>() + self.0.len()
125    }
126
127    #[inline]
128    fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
129        let length = u32::try_from(self.0.len())
130            .expect("All constructors guarantee the size doesn't exceed `u32`; qed");
131
132        length.encode_to(dest);
133        dest.write(&self.0);
134    }
135}
136
137impl Decode for BlockBytes {
138    #[inline]
139    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
140        let length = u32::decode(input)?;
141        if length as usize > (RecordedHistorySegment::SIZE - size_of::<u32>()) {
142            return Err("Segment item size is impossibly large".into());
143        }
144        // TODO: It is inefficient to zero it, but there is no API for it right now and actually
145        //  implementation in `parity-scale-codec` itself is unsound:
146        //  https://github.com/paritytech/parity-scale-codec/pull/605#discussion_r2076151291
147        let mut bytes = vec![0; length as usize];
148        input.read(&mut bytes)?;
149        Ok(Self(bytes))
150    }
151}
152
153impl BlockBytes {
154    #[inline(always)]
155    fn truncate(&mut self, size: usize) {
156        self.0.truncate(size)
157    }
158}
159
160/// Kinds of items that are contained within a segment
161#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
162pub enum SegmentItem {
163    /// Special dummy enum variant only used as an implementation detail for padding purposes
164    #[codec(index = 0)]
165    Padding,
166    /// Contains a full block inside
167    #[codec(index = 1)]
168    Block {
169        /// Block bytes
170        bytes: BlockBytes,
171        /// This is a convenience implementation detail and will not be available on decoding
172        #[doc(hidden)]
173        #[codec(skip)]
174        block_objects: Vec<BlockObject>,
175    },
176    /// Contains the beginning of the block inside, the remainder will be found in the following
177    /// segments
178    #[codec(index = 2)]
179    BlockStart {
180        /// Block bytes
181        bytes: BlockBytes,
182        /// This is a convenience implementation detail and will not be available on decoding
183        #[doc(hidden)]
184        #[codec(skip)]
185        block_objects: Vec<BlockObject>,
186    },
187    /// Continuation of the partial block spilled over into the next segment
188    #[codec(index = 3)]
189    BlockContinuation {
190        /// Block bytes
191        bytes: BlockBytes,
192        /// This is a convenience implementation detail and will not be available on decoding
193        #[doc(hidden)]
194        #[codec(skip)]
195        block_objects: Vec<BlockObject>,
196    },
197    /// Segment header of the parent
198    #[codec(index = 4)]
199    ParentSegmentHeader(SegmentHeader),
200}
201
202/// Newly archived segment as a combination of a segment header and corresponding archived history
203/// segment containing pieces
204#[derive(Debug, Clone, Eq, PartialEq)]
205pub struct NewArchivedSegment {
206    /// Segment header
207    pub segment_header: SegmentHeader,
208    /// Segment of archived history containing pieces
209    pub pieces: ArchivedHistorySegment,
210}
211
212/// The outcome of adding a block to the archiver.
213#[derive(Debug, Clone, Eq, PartialEq)]
214pub struct ArchiveBlockOutcome {
215    /// The new segments archived after adding the block.
216    /// There can be zero or more segments created after each block.
217    pub archived_segments: Vec<NewArchivedSegment>,
218
219    /// The new object mappings for those segments.
220    /// There can be zero or more mappings created after each block.
221    pub global_objects: Vec<GlobalObject>,
222}
223
224/// Archiver instantiation error
225#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, thiserror::Error)]
226pub enum ArchiverInstantiationError {
227    /// Invalid last archived block, its size is the same as the encoded block
228    /// (so it should have been completely archived, not partially archived)
229    #[error("Invalid last archived block, its size {0} bytes is the same as the encoded block")]
230    InvalidLastArchivedBlock(u32),
231    /// Invalid block, its size is smaller than the already archived number of bytes
232    #[error(
233        "Invalid block, its size {block_bytes} bytes is smaller than the already archived block \
234        {archived_block_bytes} bytes"
235    )]
236    InvalidBlockSmallSize {
237        /// Full block size
238        block_bytes: u32,
239        /// Already archived portion of the block
240        archived_block_bytes: u32,
241    },
242}
243
244/// Block archiver.
245///
246/// It takes new confirmed (at `K` depth) blocks and concatenates them into a buffer, buffer is
247/// sliced into segments of [`RecordedHistorySegment::SIZE`] size, segments are sliced into source
248/// records of [`Record::SIZE`], records are erasure coded, committed to, then roots with proofs are
249/// appended and records become pieces that are returned alongside the corresponding segment header.
250///
251/// ## Panics
252/// Panics when operating on blocks, whose length doesn't fit into u32 (should never be the case in
253/// blockchain context anyway).
254#[derive(Debug, Clone)]
255pub struct Archiver {
256    shard_index: ShardIndex,
257    /// Buffer containing blocks and other buffered items that are pending to be included into the
258    /// next segment
259    buffer: VecDeque<SegmentItem>,
260    /// Erasure coding data structure
261    erasure_coding: ErasureCoding,
262    /// An index of the current segment
263    segment_index: LocalSegmentIndex,
264    /// Hash of the segment header of the previous segment
265    prev_segment_header_hash: Blake3Hash,
266    /// Last archived block
267    last_archived_block: Option<LastArchivedBlock>,
268}
269
270impl Archiver {
271    /// Create a new instance
272    pub fn new(shard_index: ShardIndex, erasure_coding: ErasureCoding) -> Self {
273        Self {
274            shard_index,
275            buffer: VecDeque::default(),
276            erasure_coding,
277            segment_index: LocalSegmentIndex::ZERO,
278            prev_segment_header_hash: Blake3Hash::default(),
279            last_archived_block: None,
280        }
281    }
282
283    /// Create a new instance of the archiver with the initial state in case of restart.
284    ///
285    /// `block` corresponds to `last_archived_block` and will be processed according to its state.
286    pub fn with_initial_state(
287        shard_index: ShardIndex,
288        erasure_coding: ErasureCoding,
289        segment_header: SegmentHeader,
290        encoded_block: &[u8],
291        mut block_objects: Vec<BlockObject>,
292    ) -> Result<Self, ArchiverInstantiationError> {
293        let mut archiver = Self::new(shard_index, erasure_coding);
294
295        archiver.segment_index = segment_header.local_segment_index() + LocalSegmentIndex::ONE;
296        archiver.prev_segment_header_hash = segment_header.hash();
297        archiver.last_archived_block = Some(segment_header.last_archived_block);
298
299        // The first thing in the buffer should be segment header
300        archiver
301            .buffer
302            .push_back(SegmentItem::ParentSegmentHeader(segment_header));
303
304        if let Some(archived_block_bytes) = archiver
305            .last_archived_block
306            .expect("Just inserted; qed")
307            .partial_archived()
308        {
309            let archived_block_bytes = archived_block_bytes.get();
310            let encoded_block_bytes = u32::try_from(encoded_block.len())
311                .expect("Blocks length is never bigger than u32; qed");
312
313            match encoded_block_bytes.cmp(&archived_block_bytes) {
314                Ordering::Less => {
315                    return Err(ArchiverInstantiationError::InvalidBlockSmallSize {
316                        block_bytes: encoded_block_bytes,
317                        archived_block_bytes,
318                    });
319                }
320                Ordering::Equal => {
321                    return Err(ArchiverInstantiationError::InvalidLastArchivedBlock(
322                        encoded_block_bytes,
323                    ));
324                }
325                Ordering::Greater => {
326                    // Take part of the encoded block that wasn't archived yet and push to the
327                    // buffer as a block continuation
328                    block_objects.retain_mut(|block_object: &mut BlockObject| {
329                        if block_object.offset >= archived_block_bytes {
330                            block_object.offset -= archived_block_bytes;
331                            true
332                        } else {
333                            false
334                        }
335                    });
336                    archiver.buffer.push_back(SegmentItem::BlockContinuation {
337                        bytes: BlockBytes(
338                            encoded_block[(archived_block_bytes as usize)..].to_vec(),
339                        ),
340                        block_objects,
341                    });
342                }
343            }
344        }
345
346        Ok(archiver)
347    }
348
349    /// Get the last archived block if there was any
350    pub fn last_archived_block_number(&self) -> Option<BlockNumber> {
351        self.last_archived_block
352            .map(|last_archived_block| last_archived_block.number())
353    }
354
355    /// Adds a new block to the internal buffer, potentially producing pieces, segment headers, and
356    /// object mappings.
357    ///
358    /// NOTE:
359    /// * pieces inside [`NewArchivedSegment`] are shared initially for efficient memory reuse when
360    ///   working with pieces
361    /// * pieces do not have a segment position and a segment proof filled in because it is not yet
362    ///   known at the time of local segment creation
363    ///
364    /// Returns `None` if the block is empty or larger than [`u32::MAX`].
365    pub fn add_block(
366        &mut self,
367        bytes: Vec<u8>,
368        block_objects: Vec<BlockObject>,
369    ) -> Option<ArchiveBlockOutcome> {
370        if !(1..u32::MAX as usize).contains(&bytes.len()) {
371            return None;
372        }
373
374        // Append new block to the buffer
375        self.buffer.push_back(SegmentItem::Block {
376            bytes: BlockBytes(bytes),
377            block_objects,
378        });
379
380        let mut archived_segments = Vec::new();
381        let mut object_mapping = Vec::new();
382
383        // Add completed segments and their mappings for this block
384        while let Some(mut segment) = self.produce_segment() {
385            // Produce any segment mappings that haven't already been produced.
386            object_mapping.extend(Self::produce_object_mappings(segment.items.iter_mut()));
387            archived_segments.push(self.produce_archived_segment(segment));
388        }
389
390        // Produce any next segment buffer mappings that haven't already been produced.
391        object_mapping.extend(self.produce_next_segment_mappings());
392
393        Some(ArchiveBlockOutcome {
394            archived_segments,
395            global_objects: object_mapping,
396        })
397    }
398
    /// Try to slice buffer contents into segments if there is enough data, producing one segment at
    /// a time
    fn produce_segment(&mut self) -> Option<Segment> {
        let mut segment = Segment {
            items: Vec::with_capacity(self.buffer.len()),
        };

        // Local copy of the last archived block; committed back to `self` only if a full
        // segment is actually produced (the early return below leaves `self` untouched)
        let mut last_archived_block = self.last_archived_block;

        let mut segment_size = segment.encoded_size();

        // TODO: It is possible to simplify this whole loop to `if` in case "in progress" segment
        //  with precomputed size is stored somewhere already
        // 6 bytes is just large enough to encode a segment item (1 byte for enum variant, 4 bytes
        // for length and 1 for the actual data, while segment header item is never the last one)
        while RecordedHistorySegment::SIZE.saturating_sub(segment_size) >= 6 {
            let segment_item = match self.buffer.pop_front() {
                Some(segment_item) => segment_item,
                None => {
                    // Push all the items back into the buffer, we don't have enough data yet
                    for segment_item in segment.items.into_iter().rev() {
                        self.buffer.push_front(segment_item);
                    }

                    return None;
                }
            };

            let segment_item_encoded_size = segment_item.encoded_size();
            segment_size += segment_item_encoded_size;

            // Check if there is an excess of data that should be spilled over into the next segment
            let spill_over = segment_size.saturating_sub(RecordedHistorySegment::SIZE);

            let segment_item = match segment_item {
                SegmentItem::Padding => {
                    unreachable!("Buffer never contains SegmentItem::Padding; qed");
                }
                SegmentItem::Block {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block =
                        if let Some(last_archived_block) = &mut last_archived_block {
                            // Increase the archived block number and assume the whole block was
                            // archived (spill over checked below)
                            last_archived_block
                                .number
                                .replace(last_archived_block.number() + BlockNumber::ONE);
                            last_archived_block.set_complete();
                            last_archived_block
                        } else {
                            // Genesis block
                            last_archived_block.insert(LastArchivedBlock {
                                number: BlockNumber::ZERO.into(),
                                archived_progress: ArchivedBlockProgress::new_complete(),
                            })
                        };

                    if spill_over == 0 {
                        SegmentItem::Block {
                            bytes,
                            block_objects,
                        }
                    } else {
                        // Everything after `split_point` belongs to the next segment
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            // Mappings pointing into the spilled-over tail move to the
                            // continuation item, rebased onto the split point
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();

                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = u32::try_from(split_point)
                            .ok()
                            .and_then(NonZeroU32::new)
                            .expect(
                                "`::add_block()` method ensures block is not empty and doesn't \
                                exceed `u32::MAX`; qed",
                            );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockStart {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::BlockStart { .. } => {
                    unreachable!("Buffer never contains SegmentItem::BlockStart; qed");
                }
                SegmentItem::BlockContinuation {
                    mut bytes,
                    mut block_objects,
                } => {
                    let last_archived_block = last_archived_block.as_mut().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    let previously_archived_bytes = last_archived_block.partial_archived().expect(
                        "Block continuation implies that there are some bytes archived \
                        already; qed",
                    );

                    if spill_over == 0 {
                        // The rest of the block fits into this segment, so it is now complete
                        last_archived_block.set_complete();

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    } else {
                        let split_point = bytes.len() - spill_over;

                        {
                            let continuation_bytes = bytes[split_point..].to_vec();
                            let continuation_block_objects = block_objects
                                .extract_if(.., |block_object: &mut BlockObject| {
                                    if block_object.offset >= split_point as u32 {
                                        block_object.offset -= split_point as u32;
                                        true
                                    } else {
                                        false
                                    }
                                })
                                .collect();
                            // Push a continuation element back into the buffer where the removed
                            // segment item was
                            self.buffer.push_front(SegmentItem::BlockContinuation {
                                bytes: BlockBytes(continuation_bytes),
                                block_objects: continuation_block_objects,
                            });
                        }

                        bytes.truncate(split_point);
                        // Update last archived block to include partial archiving info
                        let archived_bytes = previously_archived_bytes.get()
                            + u32::try_from(split_point).expect(
                                "`::add_block()` method ensures block length doesn't \
                                    exceed `u32::MAX`; qed",
                            );
                        let archived_bytes = NonZeroU32::new(archived_bytes).expect(
                            "Spillover means non-zero length of the block was archived; qed",
                        );
                        last_archived_block.set_partial_archived(archived_bytes);

                        SegmentItem::BlockContinuation {
                            bytes,
                            block_objects,
                        }
                    }
                }
                SegmentItem::ParentSegmentHeader(parent_segment_header) => {
                    // We are not interested in segment header here
                    SegmentItem::ParentSegmentHeader(parent_segment_header)
                }
            };

            segment.items.push(segment_item);
        }

        // Commit the updated archiving progress now that a full segment is guaranteed
        self.last_archived_block = last_archived_block;

        Some(segment)
    }
582
    /// Produce object mappings for the buffered items for the next segment. Then remove the
    /// mappings in those items.
    ///
    /// Must only be called after all complete segments for a block have been produced. Before
    /// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
    fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
        // Buffered items are laid out exactly as they will be at the start of the next segment,
        // so segment-relative offsets computed here remain valid
        Self::produce_object_mappings(self.buffer.iter_mut())
    }
591
592    /// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those
593    /// items.
594    ///
595    /// This method can be called on a `Segment`’s items, or on the `Archiver`'s internal buffer.
596    fn produce_object_mappings<'a>(
597        items: impl Iterator<Item = &'a mut SegmentItem>,
598    ) -> Vec<GlobalObject> {
599        let mut corrected_object_mapping = Vec::new();
600        let mut base_offset_in_segment = Segment::default().encoded_size();
601        for segment_item in items {
602            match segment_item {
603                SegmentItem::Padding => {
604                    unreachable!(
605                        "Segment during archiving never contains SegmentItem::Padding; qed"
606                    );
607                }
608                SegmentItem::Block {
609                    bytes: _,
610                    block_objects,
611                }
612                | SegmentItem::BlockStart {
613                    bytes: _,
614                    block_objects,
615                }
616                | SegmentItem::BlockContinuation {
617                    bytes: _,
618                    block_objects,
619                } => {
620                    for block_object in block_objects.drain(..) {
621                        // `+1` corresponds to `SegmentItem::X {}` enum variant encoding
622                        let offset_in_segment = base_offset_in_segment
623                            + 1
624                            + u32::encoded_fixed_size().expect("Fixed size; qed")
625                            + block_object.offset as usize;
626                        let raw_piece_offset = (offset_in_segment % Record::SIZE)
627                            .try_into()
628                            .expect("Offset within piece should always fit in 32-bit integer; qed");
629                        corrected_object_mapping.push(GlobalObject {
630                            hash: block_object.hash,
631                            piece_position: PiecePosition::from(
632                                (offset_in_segment / Record::SIZE) as u8,
633                            ),
634                            offset: raw_piece_offset,
635                        });
636                    }
637                }
638                SegmentItem::ParentSegmentHeader(_) => {
639                    // Ignore, no object mappings here
640                }
641            }
642
643            base_offset_in_segment += segment_item.encoded_size();
644        }
645
646        corrected_object_mapping
647    }
648
    /// Take a segment as an input, apply necessary transformations, and produce an archived segment
    fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment {
        let mut pieces = {
            let mut pieces = ArchivedHistorySegment::default();

            // Encode the segment directly into the records of the pieces, avoiding an
            // intermediate buffer
            segment.encode_to(&mut ArchivedHistorySegmentOutput {
                segment: &mut pieces,
                offset: 0,
            });
            // Segment is quite big and no longer necessary
            drop(segment);

            let (source_shards, parity_shards) =
                pieces.split_at_mut(RecordedHistorySegment::NUM_RAW_RECORDS);

            // Erasure-code source records into the parity records that follow them
            self.erasure_coding
                .extend(
                    source_shards.iter().map(|shard| &shard.record),
                    parity_shards.iter_mut().map(|shard| &mut shard.record),
                )
                .expect("Statically correct parameters; qed");

            pieces
        };

        // Collect hashes to roots from all records
        let record_roots = {
            #[cfg(not(feature = "parallel"))]
            let source_pieces = pieces.iter_mut();
            #[cfg(feature = "parallel")]
            let source_pieces = pieces.par_iter_mut();

            // Here we build a tree of record chunks, with the first half being source chunks as
            // they are originally and the second half being parity chunks. While we build three
            // trees here (for source chunks, parity chunks and combined for the whole record), it
            // could have been a single tree, and it would end up with the same root. Building them
            // separately requires less RAM and allows capturing parity chunks root more easily.
            let iter = source_pieces.map(|piece| {
                let [source_chunks_root, parity_chunks_root] = {
                    let mut parity_chunks = Record::new_boxed();

                    self.erasure_coding
                        .extend(piece.record.iter(), parity_chunks.iter_mut())
                        .expect(
                            "Erasure coding instance is deliberately configured to support this \
                            input; qed",
                        );

                    let source_chunks_root = *piece.record.source_chunks_root();
                    let parity_chunks_root = BalancedMerkleTree::compute_root_only(&parity_chunks);

                    [source_chunks_root, parity_chunks_root]
                };

                let record_root = BalancedMerkleTree::compute_root_only(&[
                    source_chunks_root,
                    parity_chunks_root,
                ]);

                // Parity chunks root is persisted in the piece header for later verification
                piece.header.parity_chunks_root = RecordChunksRoot::from(parity_chunks_root);

                record_root
            });

            iter.collect::<Vec<_>>()
        };

        // Merkle tree over all record roots; its root commits to the whole segment
        let segment_merkle_tree =
            BalancedMerkleTree::<{ ArchivedHistorySegment::NUM_PIECES }>::new_boxed(
                record_roots
                    .as_slice()
                    .try_into()
                    .expect("Statically guaranteed to have correct length; qed"),
            );

        let segment_root = SegmentRoot::from(segment_merkle_tree.root());

        // Now produce segment header
        let segment_header = SegmentHeader {
            segment_index: self.segment_index.into(),
            segment_root,
            prev_segment_header_hash: self.prev_segment_header_hash,
            last_archived_block: self
                .last_archived_block
                .expect("Never empty by the time segment is produced; qed"),
        };

        // Create proof for every record and write it to corresponding piece.
        pieces
            .iter_mut()
            .zip(segment_merkle_tree.all_proofs())
            .for_each(|(piece, record_proof)| {
                piece.header = PieceHeader {
                    shard_index: self.shard_index.into(),
                    local_segment_index: segment_header.segment_index,
                    segment_root: segment_header.segment_root,
                    record_proof: RecordProof::from(record_proof),
                    ..piece.header
                };
            });

        // Update state
        self.segment_index += LocalSegmentIndex::ONE;
        self.prev_segment_header_hash = segment_header.hash();

        // Add segment header to the beginning of the buffer to be the first thing included in the
        // next segment
        self.buffer
            .push_front(SegmentItem::ParentSegmentHeader(segment_header));

        NewArchivedSegment {
            segment_header,
            pieces: pieces.to_shared(),
        }
    }
764}