// ab_client_database/lib.rs
1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
//! The database is typically contained in a single file (though in principle it could be contained
//! in multiple files if necessary). Before the database can be used, it needs to be formatted with
//! a specific size (it is possible to increase the size afterward). It is expected (but depends on
//! the storage backend) that the whole file size is pre-allocated on disk
//! and no writes will fail due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    const_convert,
50    const_trait_impl,
51    default_field_values,
52    get_mut_unchecked,
53    iter_collect_into,
54    maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::page_group::block::super_segment_headers::StorageItemBlockSuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70    BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71    ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72    PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81    LocalSegmentIndex, SegmentHeader, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
/// Unique identifier for a database
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);

impl Deref for DatabaseId {
    type Target = [u8; 32];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AsRef<[u8]> for DatabaseId {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl DatabaseId {
    /// Creates a database id from raw bytes
    #[inline(always)]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}
128
/// Trivial hasher that uses the first 8 bytes of the input directly as the hash value.
///
/// NOTE(review): this does no mixing and ignores everything past the first 8 bytes — it relies on
/// keys (block roots) already being uniformly distributed; confirm before reusing elsewhere.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Interpret the first 8 bytes as a little-endian `u64`; inputs shorter than 8 bytes
        // leave the state untouched
        if let Some(chunk) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*chunk);
        }
    }
}
147
/// Result of building the genesis block (see
/// [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
155
/// Options for [`ClientDatabase`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Genesis block builder is responsible for creating the genesis block and corresponding
    /// state for bootstrapping purposes.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
215
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
241
/// Error for [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
314
315/// Error for [`ClientDatabase::format()`]
316#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318    /// Storage backend has canceled read request
319    #[error("Storage backend has canceled read request")]
320    ReadRequestCancelled,
321    /// Storage backend read error
322    #[error("Storage backend read error: {error}")]
323    ReadError {
324        /// Low-level error
325        error: io::Error,
326    },
327    /// Failed to generate database id
328    #[error("Failed to generate database id")]
329    FailedToGenerateDatabaseId {
330        /// Low-level error
331        #[from]
332        error: SysError,
333    },
334    /// Database is already formatted yet
335    #[error("Database is already formatted yet")]
336    AlreadyFormatted,
337    /// Storage backend has canceled a writing request
338    #[error("Storage backend has canceled a writing request")]
339    WriteRequestCancelled,
340    /// Storage item write error
341    #[error("Storage item write error")]
342    StorageItemWriteError {
343        /// Low-level error
344        #[from]
345        error: io::Error,
346    },
347}
348
/// A fork tip: the newest block of a fork that has no descendants
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip block
    number: BlockNumber,
    /// Block root of the tip block
    root: BlockRoot,
}
354
/// View of a full block: either still held in memory, or persisted with only the header retained
/// in memory
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is fully in memory
    InMemory(&'a Block),
    /// Block was persisted; the header stays in memory together with the location where the
    /// block was written
    Persisted {
        header: &'a Block::Header,
        write_location: WriteLocation,
    },
}
365
/// Details extracted from a beacon chain block body
#[derive(Debug)]
struct BeaconChainBlockDetails {
    // Shard segment roots, only present for beacon chain blocks
    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
}

impl BeaconChainBlockDetails {
    /// Collects shard segment roots from a beacon chain block body.
    ///
    /// For every intermediate shard block in the body, this gathers the segment roots the shard
    /// produced itself (`own_segments`) as well as the segment roots of its leaf shards
    /// (`leaf_shards_segments()`). Segment indices are reconstructed by counting up from
    /// `first_local_segment_index` while zipping with the listed segment roots.
    fn from_body(body: &BeaconChainBody<'_>) -> Self {
        let shard_segment_roots = body
            .intermediate_shard_blocks()
            .iter()
            .flat_map(|intermediate_shard_block_info| {
                // Segment roots produced by the intermediate shard itself
                let own_segments = intermediate_shard_block_info
                    .own_segments
                    .into_iter()
                    .flat_map({
                        let shard_index = intermediate_shard_block_info.header.prefix.shard_index;

                        move |own_segments| {
                            (own_segments.first_local_segment_index..)
                                .zip(own_segments.segment_roots)
                                .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                    shard_index,
                                    segment_index,
                                    segment_root,
                                })
                        }
                    });
                // Segment roots produced by the leaf shards beneath this intermediate shard
                let child_shard_segment_roots = intermediate_shard_block_info
                    .leaf_shards_segments()
                    .flat_map(move |(shard_index, own_segments)| {
                        (own_segments.first_local_segment_index..)
                            .zip(own_segments.segment_roots)
                            .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                shard_index,
                                segment_index,
                                segment_root,
                            })
                    });

                own_segments.chain(child_shard_segment_roots)
            })
            .collect();

        Self {
            shard_segment_roots,
        }
    }
}
415
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later,
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::block_confirmation_depth`]), it
/// becomes irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory {
        block: Block,
        block_details: BlockDetails,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
    },
    /// Block was persisted (likely on disk)
    Persisted {
        header: Block::Header,
        block_details: BlockDetails,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    PersistedConfirmed {
        header: Block::Header,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
}
451
impl<Block> ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block header, regardless of which storage stage the block is in
    #[inline(always)]
    fn header(&self) -> &Block::Header {
        match self {
            Self::InMemory { block, .. } => block.header(),
            Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
        }
    }

    /// Full block view: an in-memory reference, or a header plus the location where the full
    /// block was written
    #[inline(always)]
    fn full_block(&self) -> FullBlock<'_, Block> {
        match self {
            Self::InMemory { block, .. } => FullBlock::InMemory(block),
            Self::Persisted {
                header,
                write_location,
                ..
            }
            | Self::PersistedConfirmed {
                header,
                write_location,
                ..
            } => FullBlock::Persisted {
                header,
                write_location: *write_location,
            },
        }
    }

    /// Block details, or `None` once the block is confirmed (confirmed blocks no longer carry
    /// details)
    #[inline(always)]
    fn block_details(&self) -> Option<&BlockDetails> {
        match self {
            Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
                Some(block_details)
            }
            Self::PersistedConfirmed { .. } => None,
        }
    }

    /// Beacon chain specific details, only present for beacon chain blocks
    #[inline(always)]
    fn beacon_chain_block_details(&self) -> &Option<BeaconChainBlockDetails> {
        match self {
            Self::InMemory {
                beacon_chain_block_details,
                ..
            }
            | Self::Persisted {
                beacon_chain_block_details,
                ..
            }
            | Self::PersistedConfirmed {
                beacon_chain_block_details,
                ..
            } => beacon_chain_block_details,
        }
    }
}
512
/// In-memory data about blocks, forks and fork tips
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are in the order from most recently
    /// updated towards the front to least recently updated at the back.
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
    /// block numbers.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is
    /// organized in such a way that the first entry at every block number corresponds to the
    /// canonical version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
544
/// Cache of all known segment headers
#[derive(Debug)]
struct SegmentHeadersCache {
    /// Segment headers stored densely, indexed by local segment index
    segment_headers_cache: Vec<SegmentHeader>,
}
549
550impl SegmentHeadersCache {
551    #[inline(always)]
552    fn last_segment_header(&self) -> Option<SegmentHeader> {
553        self.segment_headers_cache.last().cloned()
554    }
555
556    #[inline(always)]
557    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558        self.segment_headers_cache
559            .last()
560            .map(|segment_header| segment_header.segment_index.as_inner())
561    }
562
563    #[inline(always)]
564    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565        self.segment_headers_cache
566            .get(u64::from(local_segment_index) as usize)
567            .copied()
568    }
569
570    /// Returns actually added segments (some might have been skipped)
571    fn add_segment_headers(
572        &mut self,
573        mut segment_headers: Vec<SegmentHeader>,
574    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575        self.segment_headers_cache.reserve(segment_headers.len());
576
577        let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579        if let Some(last_segment_index) = maybe_last_local_segment_index {
580            // Skip already stored segment headers
581            segment_headers.retain(|segment_header| {
582                segment_header.segment_index.as_inner() > last_segment_index
583            });
584        }
585
586        // Check all input segment headers to see which ones are not stored yet and verifying that
587        // segment indices are monotonically increasing
588        for segment_header in segment_headers.iter().copied() {
589            let local_segment_index = segment_header.local_segment_index();
590            match maybe_last_local_segment_index {
591                Some(last_local_segment_index) => {
592                    if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
593                        return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
594                            local_segment_index,
595                            last_local_segment_index,
596                        });
597                    }
598
599                    self.segment_headers_cache.push(segment_header);
600                    maybe_last_local_segment_index.replace(local_segment_index);
601                }
602                None => {
603                    if local_segment_index != LocalSegmentIndex::ZERO {
604                        return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
605                            local_segment_index,
606                        });
607                    }
608
609                    self.segment_headers_cache.push(segment_header);
610                    maybe_last_local_segment_index.replace(local_segment_index);
611                }
612            }
613        }
614
615        Ok(segment_headers)
616    }
617}
618
/// Cache of all known super segment headers
#[derive(Debug)]
struct SuperSegmentHeadersCache {
    /// Super segment headers stored densely, indexed by super segment index
    super_segment_headers_cache: Vec<SuperSegmentHeader>,
}
623
624impl SuperSegmentHeadersCache {
625    #[inline(always)]
626    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
627        self.super_segment_headers_cache.last().cloned()
628    }
629
630    #[inline]
631    fn previous_super_segment_header(
632        &self,
633        target_block_number: BlockNumber,
634    ) -> Option<SuperSegmentHeader> {
635        let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
636        let index = match self.super_segment_headers_cache.binary_search_by_key(
637            &block_number,
638            |super_segment_header| {
639                super_segment_header
640                    .target_beacon_chain_block_number
641                    .as_inner()
642            },
643        ) {
644            Ok(found_index) => found_index,
645            Err(insert_index) => insert_index.checked_sub(1)?,
646        };
647
648        self.super_segment_headers_cache.get(index).copied()
649    }
650
651    #[inline(always)]
652    fn get_super_segment_header(
653        &self,
654        local_segment_index: SuperSegmentIndex,
655    ) -> Option<SuperSegmentHeader> {
656        self.super_segment_headers_cache
657            .get(u64::from(local_segment_index) as usize)
658            .copied()
659    }
660
661    /// Returns actually added super segments (some might have been skipped)
662    fn add_super_segment_headers(
663        &mut self,
664        mut super_segment_headers: Vec<SuperSegmentHeader>,
665    ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
666        self.super_segment_headers_cache
667            .reserve(super_segment_headers.len());
668
669        let mut maybe_last_super_segment_index = self
670            .super_segment_headers_cache
671            .last()
672            .map(|header| header.index.as_inner());
673
674        if let Some(last_super_segment_index) = maybe_last_super_segment_index {
675            // Skip already stored super segment headers
676            super_segment_headers.retain(|super_segment_header| {
677                super_segment_header.index.as_inner() > last_super_segment_index
678            });
679        }
680
681        // Check all input super segment headers to see which ones are not stored yet and verifying
682        // that super segment indices are monotonically increasing
683        for super_segment_header in super_segment_headers.iter().copied() {
684            let super_segment_index = super_segment_header.index.as_inner();
685            match maybe_last_super_segment_index {
686                Some(last_super_segment_index) => {
687                    if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
688                        return Err(
689                            PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
690                                super_segment_index,
691                                last_super_segment_index,
692                            },
693                        );
694                    }
695
696                    self.super_segment_headers_cache.push(super_segment_header);
697                    maybe_last_super_segment_index.replace(super_segment_index);
698                }
699                None => {
700                    if super_segment_index != SuperSegmentIndex::ZERO {
701                        return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
702                            super_segment_index,
703                        });
704                    }
705
706                    self.super_segment_headers_cache.push(super_segment_header);
707                    maybe_last_super_segment_index.replace(super_segment_index);
708                }
709            }
710        }
711
712        Ok(super_segment_headers)
713    }
714}
715
// TODO: Hide implementation details
/// Complete database state shared behind [`Inner::state`]
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory blocks, forks and fork tips
    data: StateData<Block>,
    /// Cache of all known segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Cache of all known super segment headers
    super_segment_headers_cache: SuperSegmentHeadersCache,
    /// Adapter around the storage backend, guarded by its own async lock
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
727
728impl<Block, StorageBackend> State<Block, StorageBackend>
729where
730    Block: GenericOwnedBlock,
731{
732    #[inline(always)]
733    fn best_tip(&self) -> &ForkTip {
734        self.data
735            .fork_tips
736            .front()
737            .expect("The best block is always present; qed")
738    }
739
740    #[inline(always)]
741    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
742        self.data
743            .blocks
744            .front()
745            .expect("The best block is always present; qed")
746            .first()
747            .expect("The best block is always present; qed")
748    }
749}
750
/// A block scheduled to be persisted, together with its position within [`StateData::blocks`]
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block offset (position in [`StateData::blocks`], ephemeral)
    block_offset: usize,
    /// Fork offset (position among forks at the same block height)
    fork_offset: usize,
    /// The block itself
    block: &'a Block,
    /// Details of the block
    block_details: &'a BlockDetails,
}
761
/// Result of persisting a block: its position within [`StateData::blocks`] and where it was
/// written
#[derive(Debug)]
struct PersistedBlock {
    /// Block offset (position in [`StateData::blocks`], ephemeral)
    block_offset: usize,
    /// Fork offset (position among forks at the same block height)
    fork_offset: usize,
    /// Location where the block was written
    write_location: WriteLocation,
}
768
/// Subset of [`ClientDatabaseOptions`] retained for runtime use
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::block_confirmation_depth`]
    block_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}
776
/// Shared inner part of [`ClientDatabase`]
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state guarded by an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Runtime options
    options: ClientDatabaseInnerOptions,
}
785
/// Client database
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared inner state; cloning the database only clones this handle
    inner: Arc<Inner<Block, StorageBackend>>,
}
794
795impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
796where
797    Block: GenericOwnedBlock,
798{
799    fn clone(&self) -> Self {
800        Self {
801            inner: self.inner.clone(),
802        }
803    }
804}
805
// NOTE: Currently a no-op placeholder. Note that `ClientDatabase` is `Clone`, so this runs for
// every dropped handle, not only the last one — any future flush logic must account for that.
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
814
815impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
816where
817    Block: GenericOwnedBlock,
818    StorageBackend: ClientDatabaseStorageBackend,
819{
820    #[inline]
821    fn best_root(&self) -> BlockRoot {
822        // Blocking read lock is fine because where a write lock is only taken for a short time and
823        // most locks are read locks
824        self.inner.state.read_blocking().best_tip().root
825    }
826
827    #[inline]
828    fn best_header(&self) -> Block::Header {
829        // Blocking read lock is fine because where a write lock is only taken for a short time and
830        // most locks are read locks
831        self.inner
832            .state
833            .read_blocking()
834            .best_block()
835            .header()
836            .clone()
837    }
838
839    #[inline]
840    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
841        // Blocking read lock is fine because where a write lock is only taken for a short time and
842        // most locks are read locks
843        let state = self.inner.state.read_blocking();
844        let best_block = state.best_block();
845        (
846            best_block.header().clone(),
847            best_block
848                .block_details()
849                .expect("Always present for the best block; qed")
850                .clone(),
851        )
852    }
853
854    // TODO: Add fast path when `descendant_block_root` is the best block
855    #[inline]
856    fn ancestor_header(
857        &self,
858        ancestor_block_number: BlockNumber,
859        descendant_block_root: &BlockRoot,
860    ) -> Option<Block::Header> {
861        // Blocking read lock is fine because where a write lock is only taken for a short time and
862        // most locks are read locks
863        let state = self.inner.state.read_blocking();
864        let best_number = state.best_tip().number;
865
866        let ancestor_block_offset =
867            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
868        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
869
870        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
871        if ancestor_block_number > descendant_block_number {
872            return None;
873        }
874        let descendant_block_offset =
875            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
876
877        // Range of blocks where the first item is expected to contain a descendant
878        let mut blocks_range_iter = state
879            .data
880            .blocks
881            .iter()
882            .enumerate()
883            .skip(descendant_block_offset);
884
885        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
886        let descendant_header = descendant_block_candidates
887            .iter()
888            .find(|block| &*block.header().header().root() == descendant_block_root)?
889            .header()
890            .header();
891
892        // If there are no forks at this level, then this is the canonical chain and ancestor
893        // block number we're looking for is the first block at the corresponding block number.
894        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
895        // the one we care about.
896        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
897            return ancestor_block_candidates
898                .iter()
899                .next()
900                .map(|block| block.header().clone());
901        }
902
903        let mut parent_block_root = &descendant_header.prefix.parent_root;
904
905        // Iterate over the blocks following descendant until ancestor is reached
906        for (block_offset, parent_candidates) in blocks_range_iter {
907            let parent_header = parent_candidates
908                .iter()
909                .find(|header| &*header.header().header().root() == parent_block_root)?
910                .header();
911
912            // When header offset matches, we found the header
913            if block_offset == ancestor_block_offset {
914                return Some(parent_header.clone());
915            }
916
917            parent_block_root = &parent_header.header().prefix.parent_root;
918        }
919
920        None
921    }
922
923    #[inline]
924    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
925        // Blocking read lock is fine because where a write lock is only taken for a short time and
926        // most locks are read locks
927        let state = self.inner.state.read_blocking();
928        let best_number = state.best_tip().number;
929
930        let block_number = *state.data.block_roots.get(block_root)?;
931        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
932        let block_candidates = state.data.blocks.get(block_offset)?;
933
934        block_candidates.iter().find_map(|block| {
935            let header = block.header();
936
937            if &*header.header().root() == block_root {
938                Some(header.clone())
939            } else {
940                None
941            }
942        })
943    }
944
945    #[inline]
946    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
947        // Blocking read lock is fine because where a write lock is only taken for a short time and
948        // most locks are read locks
949        let state = self.inner.state.read_blocking();
950        let best_number = state.best_tip().number;
951
952        let block_number = *state.data.block_roots.get(block_root)?;
953        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
954        let block_candidates = state.data.blocks.get(block_offset)?;
955
956        block_candidates.iter().find_map(|block| {
957            let header = block.header();
958            let block_details = block.block_details().cloned()?;
959
960            if &*header.header().root() == block_root {
961                Some((header.clone(), block_details))
962            } else {
963                None
964            }
965        })
966    }
967
968    #[inline]
969    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
970        let state = self.inner.state.read().await;
971        let best_number = state.best_tip().number;
972
973        let block_number = *state
974            .data
975            .block_roots
976            .get(block_root)
977            .ok_or(ReadBlockError::UnknownBlockRoot)?;
978        let block_offset = u64::from(
979            best_number
980                .checked_sub(block_number)
981                .expect("Known block roots always have valid block offset; qed"),
982        ) as usize;
983        let block_candidates = state
984            .data
985            .blocks
986            .get(block_offset)
987            .expect("Valid block offsets always have block entries; qed");
988
989        for block_candidate in block_candidates {
990            let header = block_candidate.header();
991
992            if &*header.header().root() == block_root {
993                return match block_candidate.full_block() {
994                    FullBlock::InMemory(block) => Ok(block.clone()),
995                    FullBlock::Persisted {
996                        header,
997                        write_location,
998                    } => {
999                        let storage_backend_adapter = state.storage_backend_adapter.read().await;
1000
1001                        let storage_item = storage_backend_adapter
1002                            .read_storage_item::<StorageItemBlock>(write_location)
1003                            .await?;
1004
1005                        let storage_item_block = match storage_item {
1006                            StorageItemBlock::Block(storage_item_block) => storage_item_block,
1007                            StorageItemBlock::SegmentHeaders(_) => {
1008                                return Err(ReadBlockError::StorageItemReadError {
1009                                    error: io::Error::other(
1010                                        "Unexpected storage item: `SegmentHeaders`",
1011                                    ),
1012                                });
1013                            }
1014                            StorageItemBlock::SuperSegmentHeaders(_) => {
1015                                return Err(ReadBlockError::StorageItemReadError {
1016                                    error: io::Error::other(
1017                                        "Unexpected storage item: `SuperSegmentHeaders`",
1018                                    ),
1019                                });
1020                            }
1021                        };
1022
1023                        let StorageItemBlockBlock {
1024                            header: _,
1025                            body,
1026                            mmr_with_block: _,
1027                            system_contract_states: _,
1028                        } = storage_item_block;
1029
1030                        Block::from_buffers(header.buffer().clone(), body)
1031                            .ok_or(ReadBlockError::FailedToDecode)
1032                    }
1033                };
1034            }
1035        }
1036
1037        unreachable!("Known block root always has block candidate associated with it; qed")
1038    }
1039
1040    #[inline]
1041    fn last_segment_header(&self) -> Option<SegmentHeader> {
1042        // Blocking read lock is fine because where a write lock is only taken for a short time and
1043        // most locks are read locks
1044        let state = self.inner.state.read_blocking();
1045        state.segment_headers_cache.last_segment_header()
1046    }
1047
1048    #[inline]
1049    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1050        // Blocking read lock is fine because where a write lock is only taken for a short time and
1051        // most locks are read locks
1052        let state = self.inner.state.read_blocking();
1053
1054        state
1055            .segment_headers_cache
1056            .get_segment_header(segment_index)
1057    }
1058
1059    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1060        // Blocking read lock is fine because where a write lock is only taken for a short time and
1061        // most locks are read locks
1062        let state = self.inner.state.read_blocking();
1063
1064        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1065        else {
1066            // Not initialized
1067            return Vec::new();
1068        };
1069
1070        // Special case for the initial segment (for beacon chain genesis block)
1071        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1072            && block_number == BlockNumber::ONE
1073        {
1074            // If there is a segment index present, and we store monotonically increasing segment
1075            // headers, then the first header exists
1076            return vec![
1077                state
1078                    .segment_headers_cache
1079                    .get_segment_header(LocalSegmentIndex::ZERO)
1080                    .expect("Segment headers are stored in monotonically increasing order; qed"),
1081            ];
1082        }
1083
1084        if last_local_segment_index == LocalSegmentIndex::ZERO {
1085            // Genesis segment already included in block #1
1086            return Vec::new();
1087        }
1088
1089        let mut current_local_segment_index = last_local_segment_index;
1090        loop {
1091            // If the current segment index present, and we store monotonically increasing segment
1092            // headers, then the current segment header exists as well.
1093            let current_segment_header = state
1094                .segment_headers_cache
1095                .get_segment_header(current_local_segment_index)
1096                .expect("Segment headers are stored in monotonically increasing order; qed");
1097
1098            // The block immediately after the archived segment adding the confirmation depth
1099            let target_block_number = current_segment_header.last_archived_block.number()
1100                + BlockNumber::ONE
1101                + self.inner.options.block_confirmation_depth;
1102            if target_block_number == block_number {
1103                let mut headers_for_block = vec![current_segment_header];
1104
1105                // Check block spanning multiple segments
1106                let last_archived_block_number = current_segment_header.last_archived_block.number;
1107                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1108
1109                while let Some(segment_header) = state
1110                    .segment_headers_cache
1111                    .get_segment_header(local_segment_index)
1112                {
1113                    if segment_header.last_archived_block.number == last_archived_block_number {
1114                        headers_for_block.insert(0, segment_header);
1115                        local_segment_index -= LocalSegmentIndex::ONE;
1116                    } else {
1117                        break;
1118                    }
1119                }
1120
1121                return headers_for_block;
1122            }
1123
1124            // iterate segments further
1125            if target_block_number > block_number {
1126                // no need to check the initial segment
1127                if current_local_segment_index > LocalSegmentIndex::ONE {
1128                    current_local_segment_index -= LocalSegmentIndex::ONE
1129                } else {
1130                    break;
1131                }
1132            } else {
1133                // No segment headers required
1134                return Vec::new();
1135            }
1136        }
1137
1138        // No segment headers required
1139        Vec::new()
1140    }
1141}
1142
1143impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1144where
1145    Block: GenericOwnedBlock,
1146    StorageBackend: ClientDatabaseStorageBackend,
1147{
1148    async fn persist_block(
1149        &self,
1150        block: Block,
1151        block_details: BlockDetails,
1152    ) -> Result<(), PersistBlockError> {
1153        let mut state = self.inner.state.write().await;
1154        let best_number = state.best_tip().number;
1155
1156        let header = block.header().header();
1157
1158        let block_number = header.prefix.number;
1159
1160        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1161            // Special case when syncing on top of the fresh database
1162            Self::insert_first_block(&mut state.data, block, block_details);
1163
1164            return Ok(());
1165        }
1166
1167        if block_number == best_number + BlockNumber::ONE {
1168            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1169        }
1170
1171        let block_offset = u64::from(
1172            best_number
1173                .checked_sub(block_number)
1174                .ok_or(PersistBlockError::MissingParent)?,
1175        ) as usize;
1176
1177        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1178            return Err(PersistBlockError::OutsideAcceptableRange);
1179        }
1180
1181        let state = &mut *state;
1182
1183        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1184            error!(
1185                %block_number,
1186                %block_offset,
1187                "Failed to store block fork, header offset is missing despite being within \
1188                acceptable range"
1189            );
1190
1191            PersistBlockError::OutsideAcceptableRange
1192        })?;
1193
1194        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1195            // Block's parent is no longer a fork tip, remove it
1196            if fork_tip.root == header.prefix.parent_root {
1197                state.data.fork_tips.remove(index);
1198                break;
1199            }
1200        }
1201
1202        let block_root = *header.root();
1203        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1204        // the best block
1205        state.data.fork_tips.insert(
1206            1,
1207            ForkTip {
1208                number: block_number,
1209                root: block_root,
1210            },
1211        );
1212        state.data.block_roots.insert(block_root, block_number);
1213        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1214            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1215        block_forks.push(ClientDatabaseBlock::InMemory {
1216            block,
1217            block_details,
1218            beacon_chain_block_details,
1219        });
1220
1221        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1222
1223        Ok(())
1224    }
1225
1226    async fn persist_segment_headers(
1227        &self,
1228        segment_headers: Vec<SegmentHeader>,
1229    ) -> Result<(), PersistSegmentHeadersError> {
1230        let mut state = self.inner.state.write().await;
1231
1232        let added_segment_headers = state
1233            .segment_headers_cache
1234            .add_segment_headers(segment_headers)?;
1235
1236        if added_segment_headers.is_empty() {
1237            return Ok(());
1238        }
1239
1240        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1241        // headers modifications
1242        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1243        //  are satisfied. If not, blocking read locks in other places will cause issues.
1244        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1245
1246        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1247
1248        storage_backend_adapter
1249            .write_storage_item(StorageItemBlock::SegmentHeaders(
1250                StorageItemBlockSegmentHeaders {
1251                    segment_headers: added_segment_headers,
1252                },
1253            ))
1254            .await?;
1255
1256        Ok(())
1257    }
1258}
1259
impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
where
    StorageBackend: ClientDatabaseStorageBackend,
{
    /// Shard segment roots extracted from the beacon chain block at `block_number`.
    ///
    /// Uses the first (canonical) candidate at the block's offset.
    fn shard_segment_roots(
        &self,
        block_number: BlockNumber,
    ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // Offset is the distance from the best block (front of the `blocks` deque)
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
        ) as usize;

        let block = state
            .data
            .blocks
            .get(block_offset)
            .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
            .first()
            .expect("There is always at least one block candidate; qed");

        // Cheap O(1) clone of the shared slice, no data copy
        Ok(StdArc::clone(
            &block
                .beacon_chain_block_details()
                .as_ref()
                .expect("Always present in the beacon chain block; qed")
                .shard_segment_roots,
        ))
    }

    #[inline]
    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        state
            .super_segment_headers_cache
            .last_super_segment_header()
    }

    #[inline]
    fn previous_super_segment_header(
        &self,
        target_block_number: BlockNumber,
    ) -> Option<SuperSegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();

        state
            .super_segment_headers_cache
            .previous_super_segment_header(target_block_number)
    }

    #[inline]
    fn get_super_segment_header(
        &self,
        super_segment_index: SuperSegmentIndex,
    ) -> Option<SuperSegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();

        state
            .super_segment_headers_cache
            .get_super_segment_header(super_segment_index)
    }
}
1334
impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
where
    StorageBackend: ClientDatabaseStorageBackend,
{
    /// Add a single super segment header to the cache and persist it if it was newly added.
    ///
    /// Returns `true` if the header was added (and persisted), `false` if it was already known.
    // NOTE(review): this is a near-duplicate of `persist_super_segment_headers` below, differing
    // only in the return value — consider delegating to the batch version
    async fn persist_super_segment_header(
        &self,
        super_segment_header: SuperSegmentHeader,
    ) -> Result<bool, PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(vec![super_segment_header])?;

        if added_super_segment_headers.is_empty() {
            // Already known, nothing to persist
            return Ok(false);
        }

        // Convert write lock into upgradable read lock to allow reads, while preventing super
        // segment headers modifications
        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
        //  are satisfied. If not, blocking read locks in other places will cause issues.
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        storage_backend_adapter
            .write_storage_item(StorageItemBlock::SuperSegmentHeaders(
                StorageItemBlockSuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(true)
    }

    /// Add a batch of super segment headers to the cache and persist the newly added ones.
    async fn persist_super_segment_headers(
        &self,
        super_segment_headers: Vec<SuperSegmentHeader>,
    ) -> Result<(), PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(super_segment_headers)?;

        if added_super_segment_headers.is_empty() {
            // All headers already known, nothing to persist
            return Ok(());
        }

        // Convert write lock into upgradable read lock to allow reads, while preventing super
        // segment headers modifications
        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
        //  are satisfied. If not, blocking read locks in other places will cause issues.
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        storage_backend_adapter
            .write_storage_item(StorageItemBlock::SuperSegmentHeaders(
                StorageItemBlockSuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(())
    }
}
1405
1406impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1407where
1408    Block: GenericOwnedBlock,
1409    StorageBackend: ClientDatabaseStorageBackend,
1410{
1411    /// Open the existing database.
1412    ///
1413    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
1414    pub async fn open<GBB>(
1415        options: ClientDatabaseOptions<GBB, StorageBackend>,
1416    ) -> Result<Self, ClientDatabaseError>
1417    where
1418        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1419    {
1420        let ClientDatabaseOptions {
1421            write_buffer_size,
1422            block_confirmation_depth,
1423            soft_confirmation_depth,
1424            max_fork_tips,
1425            max_fork_tip_distance,
1426            genesis_block_builder,
1427            storage_backend,
1428        } = options;
1429        if soft_confirmation_depth >= block_confirmation_depth {
1430            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1431        }
1432
1433        if max_fork_tip_distance > block_confirmation_depth {
1434            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1435        }
1436
1437        let mut state_data = StateData {
1438            fork_tips: VecDeque::new(),
1439            block_roots: HashMap::default(),
1440            blocks: VecDeque::new(),
1441        };
1442        let mut segment_headers_cache = SegmentHeadersCache {
1443            segment_headers_cache: Vec::new(),
1444        };
1445        let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1446            super_segment_headers_cache: Vec::new(),
1447        };
1448
1449        let options = ClientDatabaseInnerOptions {
1450            block_confirmation_depth,
1451            soft_confirmation_depth,
1452            max_fork_tips,
1453            max_fork_tip_distance,
1454        };
1455
1456        let storage_item_handlers = StorageItemHandlers {
1457            permanent: |_arg| {
1458                // TODO
1459                Ok(())
1460            },
1461            block: |arg| {
1462                let StorageItemHandlerArg {
1463                    storage_item,
1464                    page_offset,
1465                    num_pages,
1466                } = arg;
1467                let storage_item_block = match storage_item {
1468                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
1469                    StorageItemBlock::SegmentHeaders(segment_headers) => {
1470                        let num_segment_headers = segment_headers.segment_headers.len();
1471                        return match segment_headers_cache
1472                            .add_segment_headers(segment_headers.segment_headers)
1473                        {
1474                            Ok(_) => Ok(()),
1475                            Err(error) => {
1476                                error!(
1477                                    %page_offset,
1478                                    %num_segment_headers,
1479                                    %error,
1480                                    "Failed to add segment headers from storage item"
1481                                );
1482
1483                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1484                            }
1485                        };
1486                    }
1487                    StorageItemBlock::SuperSegmentHeaders(super_segment_headers) => {
1488                        let num_super_segment_headers =
1489                            super_segment_headers.super_segment_headers.len();
1490                        return match super_segment_headers_cache
1491                            .add_super_segment_headers(super_segment_headers.super_segment_headers)
1492                        {
1493                            Ok(_) => Ok(()),
1494                            Err(error) => {
1495                                error!(
1496                                    %page_offset,
1497                                    %num_super_segment_headers,
1498                                    %error,
1499                                    "Failed to add segment headers from storage item"
1500                                );
1501
1502                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1503                            }
1504                        };
1505                    }
1506                };
1507
1508                // TODO: It would be nice to not allocate body here since we'll not use it here
1509                //  anyway
1510                let StorageItemBlockBlock {
1511                    header,
1512                    body,
1513                    mmr_with_block,
1514                    system_contract_states,
1515                } = storage_item_block;
1516
1517                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1518                    error!(%page_offset, "Failed to decode block header from bytes");
1519
1520                    ClientDatabaseError::InvalidBlock { page_offset }
1521                })?;
1522                let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1523                    error!(%page_offset, "Failed to decode block body from bytes");
1524
1525                    ClientDatabaseError::InvalidBlock { page_offset }
1526                })?;
1527
1528                let block_root = *header.header().root();
1529                let block_number = header.header().prefix.number;
1530
1531                state_data.block_roots.insert(block_root, block_number);
1532
1533                let maybe_best_number = state_data
1534                    .blocks
1535                    .front()
1536                    .and_then(|block_forks| block_forks.first())
1537                    .map(|best_block| {
1538                        // Type inference is not working here for some reason
1539                        let header: &Block::Header = best_block.header();
1540
1541                        header.header().prefix.number
1542                    });
1543
1544                let block_offset = if let Some(best_number) = maybe_best_number {
1545                    if block_number <= best_number {
1546                        u64::from(best_number - block_number) as usize
1547                    } else {
1548                        // The new best block must follow the previous best block
1549                        if block_number - best_number != BlockNumber::ONE {
1550                            error!(
1551                                %page_offset,
1552                                %best_number,
1553                                %block_number,
1554                                "Invalid new best block number, it must be only one block \
1555                                higher than the best block"
1556                            );
1557
1558                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
1559                        }
1560
1561                        state_data.blocks.push_front(SmallVec::new());
1562                        // Will insert a new block at the front
1563                        0
1564                    }
1565                } else {
1566                    state_data.blocks.push_front(SmallVec::new());
1567                    // Will insert a new block at the front
1568                    0
1569                };
1570
1571                let block_forks = match state_data.blocks.get_mut(block_offset) {
1572                    Some(block_forks) => block_forks,
1573                    None => {
1574                        // Ignore the older block, other blocks at its height were already pruned
1575                        // anyway
1576
1577                        return Ok(());
1578                    }
1579                };
1580
1581                // Push a new block to the end of the list, we'll fix it up later
1582                let beacon_chain_block_details =
1583                    <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1584                        .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1585                block_forks.push(ClientDatabaseBlock::Persisted {
1586                    header,
1587                    block_details: BlockDetails {
1588                        mmr_with_block,
1589                        system_contract_states,
1590                    },
1591                    beacon_chain_block_details,
1592                    write_location: WriteLocation {
1593                        page_offset,
1594                        num_pages,
1595                    },
1596                });
1597
1598                // If a new block was inserted, confirm a new canonical block to prune extra
1599                // in-memory information
1600                if block_offset == 0 && block_forks.len() == 1 {
1601                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
1602                }
1603
1604                Ok(())
1605            },
1606        };
1607
1608        let storage_backend_adapter =
1609            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1610                .await?;
1611
1612        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1613            // The best block is last in the list here because that is how it was inserted while
1614            // reading from the database
1615            block_forks.last()
1616        }) {
1617            // Type inference is not working here for some reason
1618            let header: &Block::Header = best_block.header();
1619            let header = header.header();
1620            let block_number = header.prefix.number;
1621            let block_root = *header.root();
1622
1623            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1624                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1625            }
1626
1627            // Store the best block as the first and only fork tip
1628            state_data.fork_tips.push_front(ForkTip {
1629                number: block_number,
1630                root: block_root,
1631            });
1632        } else {
1633            let GenesisBlockBuilderResult {
1634                block,
1635                system_contract_states,
1636            } = genesis_block_builder();
1637
1638            // If the database is empty, initialize everything with the genesis block
1639            let header = block.header().header();
1640            let block_number = header.prefix.number;
1641            let block_root = *header.root();
1642
1643            state_data.fork_tips.push_front(ForkTip {
1644                number: block_number,
1645                root: block_root,
1646            });
1647            state_data.block_roots.insert(block_root, block_number);
1648            let beacon_chain_block_details =
1649                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1650                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1651            state_data
1652                .blocks
1653                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1654                    block,
1655                    block_details: BlockDetails {
1656                        system_contract_states,
1657                        mmr_with_block: Arc::new({
1658                            let mut mmr = BlockMerkleMountainRange::new();
1659                            mmr.add_leaf(&block_root);
1660                            mmr
1661                        })
1662                    },
1663                    beacon_chain_block_details,
1664                }]);
1665        }
1666
1667        let state = State {
1668            data: state_data,
1669            segment_headers_cache,
1670            super_segment_headers_cache,
1671            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1672        };
1673
1674        let inner = Inner {
1675            state: AsyncRwLock::new(state),
1676            options,
1677        };
1678
1679        Ok(Self {
1680            inner: Arc::new(inner),
1681        })
1682    }
1683
    /// Format a new database
    ///
    /// This is a thin wrapper that delegates all work to `StorageBackendAdapter::format`.
    ///
    /// # Errors
    ///
    /// Returns [`ClientDatabaseFormatError`] if the storage backend adapter fails to format the
    /// underlying storage.
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1691
1692    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1693        // If the database is empty, initialize everything with the genesis block
1694        let header = block.header().header();
1695        let block_number = header.prefix.number;
1696        let block_root = *header.root();
1697
1698        state.fork_tips.clear();
1699        state.fork_tips.push_front(ForkTip {
1700            number: block_number,
1701            root: block_root,
1702        });
1703        state.block_roots.clear();
1704        state.block_roots.insert(block_root, block_number);
1705        state.blocks.clear();
1706        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1707            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1708        state
1709            .blocks
1710            .push_front(smallvec![ClientDatabaseBlock::InMemory {
1711                block,
1712                block_details,
1713                beacon_chain_block_details,
1714            }]);
1715    }
1716
1717    async fn insert_new_best_block(
1718        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1719        inner: &Inner<Block, StorageBackend>,
1720        block: Block,
1721        block_details: BlockDetails,
1722    ) -> Result<(), PersistBlockError> {
1723        let header = block.header().header();
1724        let block_number = header.prefix.number;
1725        let block_root = *header.root();
1726        let parent_root = header.prefix.parent_root;
1727
1728        // Adjust the relative order of forks to ensure the first index always corresponds to
1729        // ancestors of the new best block
1730        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1731            return Err(PersistBlockError::MissingParent);
1732        }
1733
1734        // Store new block in the state
1735        {
1736            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1737                // Block's parent is no longer a fork tip, remove it
1738                if fork_tip.root == parent_root {
1739                    state.data.fork_tips.remove(index);
1740                    break;
1741                }
1742            }
1743
1744            state.data.fork_tips.push_front(ForkTip {
1745                number: block_number,
1746                root: block_root,
1747            });
1748            state.data.block_roots.insert(block_root, block_number);
1749            let beacon_chain_block_details =
1750                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1751                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1752            state
1753                .data
1754                .blocks
1755                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1756                    block,
1757                    block_details: block_details.clone(),
1758                    beacon_chain_block_details,
1759                }]);
1760        }
1761
1762        let options = &inner.options;
1763
1764        Self::confirm_canonical_block(block_number, &mut state.data, options);
1765        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1766
1767        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1768        // block modifications
1769        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1770        //  are satisfied. If not, blocking read locks in other places will cause issues.
1771        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1772
1773        let mut blocks_to_persist = Vec::new();
1774        for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1775            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1776                break;
1777            };
1778
1779            let len_before = blocks_to_persist.len();
1780            fork_blocks
1781                .iter()
1782                .enumerate()
1783                .filter_map(|(fork_offset, client_database_block)| {
1784                    match client_database_block {
1785                        ClientDatabaseBlock::InMemory {
1786                            block,
1787                            block_details,
1788                            beacon_chain_block_details: _,
1789                        } => Some(BlockToPersist {
1790                            block_offset,
1791                            fork_offset,
1792                            block,
1793                            block_details,
1794                        }),
1795                        ClientDatabaseBlock::Persisted { .. }
1796                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1797                            // Already persisted
1798                            None
1799                        }
1800                    }
1801                })
1802                .collect_into(&mut blocks_to_persist);
1803
1804            if blocks_to_persist.len() == len_before {
1805                break;
1806            }
1807        }
1808
1809        // Persist blocks from older to newer
1810        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1811        {
1812            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1813
1814            for block_to_persist in blocks_to_persist.into_iter().rev() {
1815                let BlockToPersist {
1816                    block_offset,
1817                    fork_offset,
1818                    block,
1819                    block_details,
1820                } = block_to_persist;
1821
1822                let write_location = storage_backend_adapter
1823                    .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1824                        header: block.header().buffer().clone(),
1825                        body: block.body().buffer().clone(),
1826                        mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1827                        system_contract_states: StdArc::clone(
1828                            &block_details.system_contract_states,
1829                        ),
1830                    }))
1831                    .await?;
1832
1833                persisted_blocks.push(PersistedBlock {
1834                    block_offset,
1835                    fork_offset,
1836                    write_location,
1837                });
1838            }
1839        }
1840
1841        // Convert blocks to persisted
1842        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1843        for persisted_block in persisted_blocks {
1844            let PersistedBlock {
1845                block_offset,
1846                fork_offset,
1847                write_location,
1848            } = persisted_block;
1849
1850            let block = state
1851                .data
1852                .blocks
1853                .get_mut(block_offset)
1854                .expect("Still holding the same lock since last check; qed")
1855                .get_mut(fork_offset)
1856                .expect("Still holding the same lock since last check; qed");
1857
1858            replace_with_or_abort(block, |block| {
1859                if let ClientDatabaseBlock::InMemory {
1860                    block,
1861                    block_details,
1862                    beacon_chain_block_details,
1863                } = block
1864                {
1865                    let (header, _body) = block.split();
1866
1867                    ClientDatabaseBlock::Persisted {
1868                        header,
1869                        block_details,
1870                        beacon_chain_block_details,
1871                        write_location,
1872                    }
1873                } else {
1874                    unreachable!("Still holding the same lock since last check; qed");
1875                }
1876            });
1877        }
1878
1879        // TODO: Prune blocks that are no longer necessary
1880        // TODO: Prune unused page groups here or elsewhere?
1881
1882        Ok(())
1883    }
1884
1885    /// Adjust the relative order of forks to ensure the first index always corresponds to
1886    /// `parent_block_root` and its ancestors.
1887    ///
1888    /// Returns `true` on success and `false` if one of the parents was not found.
1889    #[must_use]
1890    fn adjust_ancestor_block_forks(
1891        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1892        mut parent_block_root: BlockRoot,
1893    ) -> bool {
1894        let mut ancestor_blocks = blocks.iter_mut();
1895
1896        loop {
1897            if ancestor_blocks.len() == 1 {
1898                // Nothing left to adjust with a single fork
1899                break;
1900            }
1901
1902            let Some(parent_blocks) = ancestor_blocks.next() else {
1903                // No more parent headers present
1904                break;
1905            };
1906
1907            let Some(fork_offset_parent_block_root) =
1908                parent_blocks
1909                    .iter()
1910                    .enumerate()
1911                    .find_map(|(fork_offset, fork_block)| {
1912                        let fork_header = fork_block.header().header();
1913                        if *fork_header.root() == parent_block_root {
1914                            Some((fork_offset, fork_header.prefix.parent_root))
1915                        } else {
1916                            None
1917                        }
1918                    })
1919            else {
1920                return false;
1921            };
1922
1923            let fork_offset;
1924            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1925
1926            parent_blocks.swap(0, fork_offset);
1927        }
1928
1929        true
1930    }
1931
1932    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1933    ///
1934    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1935    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1936    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1937    /// they end up being necessary later.
1938    fn prune_outdated_fork_tips(
1939        best_number: BlockNumber,
1940        state: &mut StateData<Block>,
1941        options: &ClientDatabaseInnerOptions,
1942    ) {
1943        let state = &mut *state;
1944
1945        // These forks are just candidates because they will not be pruned if the reference count is
1946        // not 1, indicating they are still in use by something
1947        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1948
1949        // Prune forks that are too far away from the best block
1950        state.fork_tips.retain(|fork_tip| {
1951            if best_number - fork_tip.number > options.max_fork_tip_distance {
1952                candidate_forks_to_remove.push(*fork_tip);
1953                false
1954            } else {
1955                true
1956            }
1957        });
1958        // Prune forks that exceed the maximum number of forks
1959        if state.fork_tips.len() > options.max_fork_tips.get() {
1960            state
1961                .fork_tips
1962                .drain(options.max_fork_tips.get()..)
1963                .collect_into(&mut candidate_forks_to_remove);
1964        }
1965
1966        // Prune all possible candidates
1967        candidate_forks_to_remove
1968            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1969        // Return those that were not pruned back to the list of tips
1970        state.fork_tips.extend(candidate_forks_to_remove);
1971    }
1972
1973    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
1974    /// the list of fork tips
1975    #[must_use]
1976    fn prune_outdated_fork(
1977        best_number: BlockNumber,
1978        fork_tip: &ForkTip,
1979        state: &mut StateData<Block>,
1980    ) -> bool {
1981        let block_offset = u64::from(best_number - fork_tip.number) as usize;
1982
1983        // Prune fork top and all its ancestors that are not used
1984        let mut block_root_to_prune = fork_tip.root;
1985        let mut pruned_tip = false;
1986        for block_offset in block_offset.. {
1987            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1988                if !pruned_tip {
1989                    error!(
1990                        %best_number,
1991                        ?fork_tip,
1992                        block_offset,
1993                        "Block offset was not present in the database, this is an implementation \
1994                        bug #1"
1995                    );
1996                }
1997                // No forks left to prune
1998                break;
1999            };
2000
2001            if fork_blocks.len() == 1 {
2002                if !pruned_tip {
2003                    error!(
2004                        %best_number,
2005                        ?fork_tip,
2006                        block_offset,
2007                        "Block offset was not present in the database, this is an implementation \
2008                        bug #2"
2009                    );
2010                }
2011
2012                // No forks left to prune
2013                break;
2014            }
2015
2016            let Some((fork_offset, block)) = fork_blocks
2017                .iter()
2018                .enumerate()
2019                // Skip ancestor of the best block, it is certainly not a fork to be pruned
2020                .skip(1)
2021                .find(|(_fork_offset, block)| {
2022                    *block.header().header().root() == block_root_to_prune
2023                })
2024            else {
2025                if !pruned_tip {
2026                    error!(
2027                        %best_number,
2028                        ?fork_tip,
2029                        block_offset,
2030                        "Block offset was not present in the database, this is an implementation \
2031                        bug #3"
2032                    );
2033                }
2034
2035                // Nothing left to prune
2036                break;
2037            };
2038
2039            // More than one instance means something somewhere is using or depends on this block
2040            if block.header().ref_count() > 1 {
2041                break;
2042            }
2043
2044            // Blocks that are already persisted
2045            match block {
2046                ClientDatabaseBlock::InMemory { .. } => {
2047                    // Prune
2048                }
2049                ClientDatabaseBlock::Persisted { .. }
2050                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2051                    // Already on disk, keep it in memory for later, but prune the tip
2052                    pruned_tip = true;
2053                    break;
2054                }
2055            }
2056
2057            state.block_roots.get_mut(&block_root_to_prune);
2058            block_root_to_prune = block.header().header().prefix.parent_root;
2059            fork_blocks.swap_remove(fork_offset);
2060
2061            pruned_tip = true;
2062        }
2063
2064        pruned_tip
2065    }
2066
    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
    /// their descendants
    fn confirm_canonical_block(
        best_number: BlockNumber,
        state_data: &mut StateData<Block>,
        options: &ClientDatabaseInnerOptions,
    ) {
        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
        // MMRs, which might be needed for reorgs at the lowest possible depth.
        let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;

        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
            // Nothing to confirm yet
            return;
        };

        // Mark the canonical block as confirmed. The canonical block is the first fork at this
        // depth (see `Self::adjust_ancestor_block_forks`).
        {
            let Some(canonical_block) = fork_blocks.first_mut() else {
                error!(
                    %best_number,
                    block_offset,
                    "Have not found a canonical block to confirm, this is an implementation bug"
                );
                return;
            };

            // Only a `Persisted` block can be upgraded to `PersistedConfirmed`; the other variants
            // indicate an implementation bug and are logged and left unchanged. Note that
            // `block_details` (including the MMR) is discarded on confirmation.
            replace_with_or_abort(canonical_block, |block| match block {
                ClientDatabaseBlock::InMemory { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be in memory, this is an implementation bug"
                    );
                    block
                }
                ClientDatabaseBlock::Persisted {
                    header,
                    block_details: _,
                    beacon_chain_block_details,
                    write_location,
                } => ClientDatabaseBlock::PersistedConfirmed {
                    header,
                    beacon_chain_block_details,
                    write_location,
                },
                ClientDatabaseBlock::PersistedConfirmed { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be confirmed yet, this is an \
                        implementation bug"
                    );
                    block
                }
            });
        }

        // Prune the rest of the blocks and their descendants. Index 0 (the canonical block) is
        // kept; everything else at this depth becomes the initial set of roots to prune.
        let mut block_roots_to_prune = fork_blocks
            .drain(1..)
            .map(|block| *block.header().header().root())
            .collect::<Vec<_>>();
        // Walk towards the tip (lower offsets are newer blocks), one depth level per iteration
        let mut current_block_offset = block_offset;
        while !block_roots_to_prune.is_empty() {
            // Prune fork tips (if any)
            state_data
                .fork_tips
                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));

            // Prune removed block roots
            for block_root in &block_roots_to_prune {
                state_data.block_roots.remove(block_root);
            }

            // Block offset for direct descendants
            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
                current_block_offset = next_block_offset;
            } else {
                // Reached the tip
                break;
            }

            let fork_blocks = state_data
                .blocks
                .get_mut(current_block_offset)
                .expect("Lower block offset always exists; qed");

            // Collect descendants of pruned blocks to prune them next: a block is removed if its
            // parent root is among the roots pruned at the previous (older) depth
            block_roots_to_prune = fork_blocks
                .drain_filter(|block| {
                    let header = block.header().header();

                    block_roots_to_prune.contains(&header.prefix.parent_root)
                })
                .map(|block| *block.header().header().root())
                .collect();
        }
    }
2169}