Skip to main content

ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
37//! The database is typically contained in a single file (though in principle could be contained in
38//! multiple if necessary). Before the database can be used, it needs to be formatted with a
39//! specific size (it is possible to increase the size afterward) before it can be used. It is
40//! expected (but depends on the storage backend) that the whole file size is pre-allocated on disk
41//! and no writes will fail due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    const_convert,
50    const_trait_impl,
51    default_field_values,
52    get_mut_unchecked,
53    iter_collect_into,
54    maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70    BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71    ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72    PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81    LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
101/// Unique identifier for a database
102#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
103#[repr(C)]
104pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107    type Target = [u8; 32];
108
109    #[inline(always)]
110    fn deref(&self) -> &Self::Target {
111        &self.0
112    }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116    #[inline(always)]
117    fn as_ref(&self) -> &[u8] {
118        &self.0
119    }
120}
121
122impl DatabaseId {
123    #[inline(always)]
124    pub const fn new(bytes: [u8; 32]) -> Self {
125        Self(bytes)
126    }
127}
128
129#[derive(Default)]
130struct BlockRootHasher(u64);
131
132impl Hasher for BlockRootHasher {
133    #[inline(always)]
134    fn finish(&self) -> u64 {
135        self.0
136    }
137
138    #[inline(always)]
139    fn write(&mut self, bytes: &[u8]) {
140        let Some(state) = bytes.as_chunks().0.first().copied().map(u64::from_le_bytes) else {
141            return;
142        };
143
144        self.0 = state;
145    }
146}
147
148#[derive(Debug)]
149pub struct GenesisBlockBuilderResult<Block> {
150    /// Genesis block
151    pub block: Block,
152    /// System contracts state in the genesis block
153    pub system_contract_states: StdArc<[ContractSlotState]>,
154}
155
156/// Options for [`ClientDatabase`]
157#[derive(Debug, Copy, Clone)]
158pub struct ClientDatabaseOptions<GBB, StorageBackend> {
159    /// Write buffer size.
160    ///
161    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
162    /// more RAM. Zero buffer size means all writes must be completed before returning from the
163    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
164    ///
165    /// The recommended value is 5.
166    pub write_buffer_size: usize = 5,
167    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
168    /// perspective.
169    ///
170    /// This parameter allows establishing a final canonical order of blocks and eliminating any
171    /// potential forks at a specified depth and beyond.
172    pub block_confirmation_depth: BlockNumber,
173    /// Soft confirmation depth for blocks.
174    ///
175    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
176    ///
177    /// This parameter determines how many blocks are retained in memory before being written to
178    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
179    /// memory before being soft-confirmed and written to disk for longer-term storage.
180    ///
181    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
182    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
183    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
184    ///
185    /// The recommended value is 3 blocks.
186    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
187    /// Defines how many fork tips should be maintained in total.
188    ///
189    /// As natural forks occur, there may be more than one tip in existence, with only one of them
190    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
191    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
192    /// be pruned automatically.
193    ///
194    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
195    ///
196    /// The recommended value is 3 blocks.
197    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
198    /// Max distance between fork tip and the best block.
199    ///
200    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
201    /// limit. This essentially means the tip was not extended for some time, and while it is
202    /// theoretically possible for the chain to continue from this tip, the probability is so small
203    /// that it is not worth storing it.
204    ///
205    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
206    ///
207    /// The recommended value is 5 blocks.
208    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
209    /// Genesis block builder is responsible to create genesis block and corresponding state for
210    /// bootstrapping purposes.
211    pub genesis_block_builder: GBB,
212    /// Storage backend to use for storing and retrieving storage items
213    pub storage_backend: StorageBackend,
214}
215
216/// Options for [`ClientDatabase`]
217#[derive(Debug, Copy, Clone)]
218pub struct ClientDatabaseFormatOptions {
219    /// The number of [`AlignedPage`]s in a single page group.
220    ///
221    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
222    ///
223    /// Each group always has a set of storage items with monotonically increasing sequence
224    /// numbers. The database only frees page groups for reuse when all storage items there are
225    /// no longer in use.
226    ///
227    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
228    /// concurrency during restart, but must not be too small that no storage item fits within a
229    /// page group anymore. A larger number allows finding the range of sequence numbers that are
230    /// already used and where potential write interruption happened on restart more efficiently,
231    /// but will use more RAM in the process.
232    ///
233    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
234    /// a smaller value might work too.
235    pub page_group_size: NonZeroU32,
236    /// By default, formatting will be aborted if the database appears to be already formatted.
237    ///
238    /// Setting this option to `true` skips the check and formats the database anyway.
239    pub force: bool,
240}
241
242#[derive(Debug, thiserror::Error)]
243pub enum ClientDatabaseError {
244    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
245    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
246    InvalidSoftConfirmationDepth,
247    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
248    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
249    InvalidMaxForkTipDistance,
250    /// Storage backend has canceled read request
251    #[error("Storage backend has canceled read request")]
252    ReadRequestCancelled,
253    /// Storage backend read error
254    #[error("Storage backend read error: {error}")]
255    ReadError {
256        /// Low-level error
257        error: io::Error,
258    },
259    /// Unsupported database version
260    #[error("Unsupported database version: {database_version}")]
261    UnsupportedDatabaseVersion {
262        /// Database version
263        database_version: u8,
264    },
265    /// Page group size is too small, must be at least two pages
266    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
267    PageGroupSizeTooSmall {
268        /// Page group size in pages
269        page_group_size: u32,
270    },
271    /// Unexpected sequence number
272    #[error(
273        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
274        {expected})"
275    )]
276    UnexpectedSequenceNumber {
277        /// Sequence number in the database
278        actual: u64,
279        /// Expected sequence number
280        expected: u64,
281        /// Page offset where storage item is found
282        page_offset: u32,
283    },
284    /// Unexpected storage item
285    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
286    UnexpectedStorageItem {
287        /// First storage item
288        storage_item: Box<dyn fmt::Debug + Send + Sync>,
289        /// Page offset where storage item is found
290        page_offset: u32,
291    },
292    /// Invalid block
293    #[error("Invalid block at offset {page_offset}")]
294    InvalidBlock {
295        /// Page offset where storage item is found
296        page_offset: u32,
297    },
298    /// Invalid segment headers
299    #[error("Invalid segment headers at offset {page_offset}")]
300    InvalidSegmentHeaders {
301        /// Page offset where storage item is found
302        page_offset: u32,
303    },
304    /// Failed to adjust ancestor block forks
305    #[error("Failed to adjust ancestor block forks")]
306    FailedToAdjustAncestorBlockForks,
307    /// Database is not formatted yet
308    #[error("Database is not formatted yet")]
309    Unformatted,
310    /// Non-permanent first page group
311    #[error("Non-permanent first page group")]
312    NonPermanentFirstPageGroup,
313}
314
315/// Error for [`ClientDatabase::format()`]
316#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318    /// Storage backend has canceled read request
319    #[error("Storage backend has canceled read request")]
320    ReadRequestCancelled,
321    /// Storage backend read error
322    #[error("Storage backend read error: {error}")]
323    ReadError {
324        /// Low-level error
325        error: io::Error,
326    },
327    /// Failed to generate database id
328    #[error("Failed to generate database id")]
329    FailedToGenerateDatabaseId {
330        /// Low-level error
331        #[from]
332        error: SysError,
333    },
334    /// Database is already formatted yet
335    #[error("Database is already formatted yet")]
336    AlreadyFormatted,
337    /// Storage backend has canceled a writing request
338    #[error("Storage backend has canceled a writing request")]
339    WriteRequestCancelled,
340    /// Storage item write error
341    #[error("Storage item write error")]
342    StorageItemWriteError {
343        /// Low-level error
344        #[from]
345        error: io::Error,
346    },
347}
348
349#[derive(Debug, Copy, Clone)]
350struct ForkTip {
351    number: BlockNumber,
352    root: BlockRoot,
353}
354
355enum FullBlock<'a, Block>
356where
357    Block: GenericOwnedBlock,
358{
359    InMemory(&'a Block),
360    Persisted {
361        header: &'a Block::Header,
362        write_location: WriteLocation,
363    },
364}
365
366#[derive(Debug)]
367struct BeaconChainBlockDetails {
368    // Shard segment roots, only present for beacon chain blocks
369    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
370}
371
372impl BeaconChainBlockDetails {
373    fn from_body(body: &BeaconChainBody<'_>) -> Self {
374        let shard_segment_roots = body
375            .intermediate_shard_blocks()
376            .iter()
377            .flat_map(|intermediate_shard_block_info| {
378                let own_segments = intermediate_shard_block_info
379                    .own_segments
380                    .into_iter()
381                    .flat_map({
382                        let shard_index = intermediate_shard_block_info.header.prefix.shard_index;
383
384                        move |own_segments| {
385                            (own_segments.first_local_segment_index..)
386                                .zip(own_segments.segment_roots)
387                                .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
388                                    shard_index,
389                                    segment_index,
390                                    segment_root,
391                                })
392                        }
393                    });
394                let child_shard_segment_roots = intermediate_shard_block_info
395                    .leaf_shards_segments()
396                    .flat_map(move |(shard_index, own_segments)| {
397                        (own_segments.first_local_segment_index..)
398                            .zip(own_segments.segment_roots)
399                            .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
400                                shard_index,
401                                segment_index,
402                                segment_root,
403                            })
404                    });
405
406                own_segments.chain(child_shard_segment_roots)
407            })
408            .collect();
409
410        Self {
411            shard_segment_roots,
412        }
413    }
414}
415
416/// Client database block contains details about the block state in the database.
417///
418/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
419/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later
420///  when it is "confirmed" fully (see [`ClientDatabaseOptions::soft_confirmation_depth`]), it
421/// becomes irreversible.
422#[derive(Debug)]
423enum ClientDatabaseBlock<Block>
424where
425    Block: GenericOwnedBlock,
426{
427    /// Block is stored in memory and wasn't persisted yet
428    InMemory {
429        block: Block,
430        block_details: BlockDetails,
431        /// Only present for beacon chain blocks
432        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
433    },
434    /// Block was persisted (likely on disk)
435    Persisted {
436        header: Block::Header,
437        block_details: BlockDetails,
438        /// Only present for beacon chain blocks
439        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
440        write_location: WriteLocation,
441    },
442    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
443    /// perspective
444    PersistedConfirmed {
445        header: Block::Header,
446        /// Only present for beacon chain blocks
447        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
448        write_location: WriteLocation,
449    },
450}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454    Block: GenericOwnedBlock,
455{
456    #[inline(always)]
457    fn header(&self) -> &Block::Header {
458        match self {
459            Self::InMemory { block, .. } => block.header(),
460            Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461        }
462    }
463
464    #[inline(always)]
465    fn full_block(&self) -> FullBlock<'_, Block> {
466        match self {
467            Self::InMemory { block, .. } => FullBlock::InMemory(block),
468            Self::Persisted {
469                header,
470                write_location,
471                ..
472            }
473            | Self::PersistedConfirmed {
474                header,
475                write_location,
476                ..
477            } => FullBlock::Persisted {
478                header,
479                write_location: *write_location,
480            },
481        }
482    }
483
484    #[inline(always)]
485    fn block_details(&self) -> Option<&BlockDetails> {
486        match self {
487            Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488                Some(block_details)
489            }
490            Self::PersistedConfirmed { .. } => None,
491        }
492    }
493
494    #[inline(always)]
495    fn beacon_chain_block_details(&self) -> Option<&BeaconChainBlockDetails> {
496        match self {
497            Self::InMemory {
498                beacon_chain_block_details,
499                ..
500            }
501            | Self::Persisted {
502                beacon_chain_block_details,
503                ..
504            }
505            | Self::PersistedConfirmed {
506                beacon_chain_block_details,
507                ..
508            } => beacon_chain_block_details.as_ref(),
509        }
510    }
511}
512
513#[derive(Debug)]
514struct StateData<Block>
515where
516    Block: GenericOwnedBlock,
517{
518    /// Tips of forks that have no descendants.
519    ///
520    /// The current best block is at the front, the rest are in the order from most recently
521    /// updated towards the front to least recently at the back.
522    fork_tips: VecDeque<ForkTip>,
523    /// Map from block root to block number.
524    ///
525    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
526    /// block numbers.
527    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
528    /// List of blocks with the newest at the front.
529    ///
530    /// The first element of the first entry corresponds to the best block.
531    ///
532    /// It is expected that in most block numbers there will be exactly one block, some two,
533    /// anything more than that will be very rare. The list of forks for a block number is
534    /// organized in such a way that the first entry at every block number corresponds to the
535    /// canonical version of the blockchain at any point in time.
536    ///
537    /// A position withing this data structure is called "block offset". This is an ephemeral value
538    /// and changes as new best blocks are added. Blocks at the same height are collectively called
539    /// "block forks" and the position of the block within the same block height is called
540    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
541    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
542    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
543}
544
545#[derive(Debug)]
546struct SegmentHeadersCache {
547    segment_headers_cache: Vec<SegmentHeader>,
548}
549
550impl SegmentHeadersCache {
551    #[inline(always)]
552    fn last_segment_header(&self) -> Option<SegmentHeader> {
553        self.segment_headers_cache.last().copied()
554    }
555
556    #[inline(always)]
557    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558        self.segment_headers_cache
559            .last()
560            .map(|segment_header| segment_header.index.as_inner())
561    }
562
563    #[inline(always)]
564    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565        self.segment_headers_cache
566            .get(u64::from(local_segment_index) as usize)
567            .copied()
568    }
569
570    /// Returns actually added segments (some might have been skipped)
571    fn add_segment_headers(
572        &mut self,
573        mut segment_headers: Vec<SegmentHeader>,
574    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575        self.segment_headers_cache.reserve(segment_headers.len());
576
577        let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579        if let Some(last_segment_index) = maybe_last_local_segment_index {
580            // Skip already stored segment headers
581            segment_headers
582                .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583        }
584
585        // Check all input segment headers to see which ones are not stored yet and verifying that
586        // segment indices are monotonically increasing
587        for segment_header in segment_headers.iter().copied() {
588            let local_segment_index = segment_header.index.as_inner();
589            if let Some(last_local_segment_index) = maybe_last_local_segment_index {
590                if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
591                    return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
592                        local_segment_index,
593                        last_local_segment_index,
594                    });
595                }
596
597                self.segment_headers_cache.push(segment_header);
598                maybe_last_local_segment_index.replace(local_segment_index);
599            } else {
600                if local_segment_index != LocalSegmentIndex::ZERO {
601                    return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
602                        local_segment_index,
603                    });
604                }
605
606                self.segment_headers_cache.push(segment_header);
607                maybe_last_local_segment_index.replace(local_segment_index);
608            }
609        }
610
611        Ok(segment_headers)
612    }
613}
614
615#[derive(Debug)]
616struct SuperSegmentHeadersCache {
617    super_segment_headers_cache: Vec<SuperSegmentHeader>,
618}
619
620impl SuperSegmentHeadersCache {
621    #[inline(always)]
622    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
623        self.super_segment_headers_cache.last().copied()
624    }
625
626    #[inline]
627    fn previous_super_segment_header(
628        &self,
629        target_block_number: BlockNumber,
630    ) -> Option<SuperSegmentHeader> {
631        let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
632        let index = match self.super_segment_headers_cache.binary_search_by_key(
633            &block_number,
634            |super_segment_header| {
635                super_segment_header
636                    .target_beacon_chain_block_number
637                    .as_inner()
638            },
639        ) {
640            Ok(found_index) => found_index,
641            Err(insert_index) => insert_index.checked_sub(1)?,
642        };
643
644        self.super_segment_headers_cache.get(index).copied()
645    }
646
647    #[inline(always)]
648    fn get_super_segment_header(
649        &self,
650        local_segment_index: SuperSegmentIndex,
651    ) -> Option<SuperSegmentHeader> {
652        self.super_segment_headers_cache
653            .get(u64::from(local_segment_index) as usize)
654            .copied()
655    }
656
657    #[inline(always)]
658    fn get_super_segment_header_for_segment_index(
659        &self,
660        segment_index: SegmentIndex,
661    ) -> Option<SuperSegmentHeader> {
662        let index = self
663            .super_segment_headers_cache
664            .binary_search_by_key(&segment_index, |super_segment_header| {
665                super_segment_header.max_segment_index.as_inner()
666            })
667            .unwrap_or_else(|insert_index| insert_index);
668
669        let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
670
671        let max_segment_index = super_segment_header.max_segment_index.as_inner();
672        let first_segment_index = max_segment_index
673            - SegmentIndex::from(u64::from(super_segment_header.num_segments))
674            + SegmentIndex::ONE;
675
676        (first_segment_index..=max_segment_index)
677            .contains(&segment_index)
678            .then_some(super_segment_header)
679    }
680
681    /// Returns actually added super segments (some might have been skipped)
682    fn add_super_segment_headers(
683        &mut self,
684        mut super_segment_headers: Vec<SuperSegmentHeader>,
685    ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
686        self.super_segment_headers_cache
687            .reserve(super_segment_headers.len());
688
689        let mut maybe_last_super_segment_index = self
690            .super_segment_headers_cache
691            .last()
692            .map(|header| header.index.as_inner());
693
694        if let Some(last_super_segment_index) = maybe_last_super_segment_index {
695            // Skip already stored super segment headers
696            super_segment_headers.retain(|super_segment_header| {
697                super_segment_header.index.as_inner() > last_super_segment_index
698            });
699        }
700
701        // Check all input super segment headers to see which ones are not stored yet and verifying
702        // that super segment indices are monotonically increasing
703        for super_segment_header in super_segment_headers.iter().copied() {
704            let super_segment_index = super_segment_header.index.as_inner();
705            if let Some(last_super_segment_index) = maybe_last_super_segment_index {
706                if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
707                    return Err(
708                        PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
709                            super_segment_index,
710                            last_super_segment_index,
711                        },
712                    );
713                }
714
715                self.super_segment_headers_cache.push(super_segment_header);
716                maybe_last_super_segment_index.replace(super_segment_index);
717            } else {
718                if super_segment_index != SuperSegmentIndex::ZERO {
719                    return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
720                        super_segment_index,
721                    });
722                }
723
724                self.super_segment_headers_cache.push(super_segment_header);
725                maybe_last_super_segment_index.replace(super_segment_index);
726            }
727        }
728
729        Ok(super_segment_headers)
730    }
731}
732
733// TODO: Hide implementation details
734#[derive(Debug)]
735struct State<Block, StorageBackend>
736where
737    Block: GenericOwnedBlock,
738{
739    data: StateData<Block>,
740    segment_headers_cache: SegmentHeadersCache,
741    super_segment_headers_cache: SuperSegmentHeadersCache,
742    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
743}
744
745impl<Block, StorageBackend> State<Block, StorageBackend>
746where
747    Block: GenericOwnedBlock,
748{
749    #[inline(always)]
750    fn best_tip(&self) -> &ForkTip {
751        self.data
752            .fork_tips
753            .front()
754            .expect("The best block is always present; qed")
755    }
756
757    #[inline(always)]
758    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
759        self.data
760            .blocks
761            .front()
762            .expect("The best block is always present; qed")
763            .first()
764            .expect("The best block is always present; qed")
765    }
766}
767
768#[derive(Debug)]
769struct BlockToPersist<'a, Block>
770where
771    Block: GenericOwnedBlock,
772{
773    block_offset: usize,
774    fork_offset: usize,
775    block: &'a Block,
776    block_details: &'a BlockDetails,
777}
778
779#[derive(Debug)]
780struct PersistedBlock {
781    block_offset: usize,
782    fork_offset: usize,
783    write_location: WriteLocation,
784}
785
786#[derive(Debug)]
787struct ClientDatabaseInnerOptions {
788    block_confirmation_depth: BlockNumber,
789    soft_confirmation_depth: BlockNumber,
790    max_fork_tips: NonZeroUsize,
791    max_fork_tip_distance: BlockNumber,
792}
793
794#[derive(Debug)]
795struct Inner<Block, StorageBackend>
796where
797    Block: GenericOwnedBlock,
798{
799    state: AsyncRwLock<State<Block, StorageBackend>>,
800    options: ClientDatabaseInnerOptions,
801}
802
803/// Client database
804#[derive(Debug)]
805pub struct ClientDatabase<Block, StorageBackend>
806where
807    Block: GenericOwnedBlock,
808{
809    inner: Arc<Inner<Block, StorageBackend>>,
810}
811
812impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
813where
814    Block: GenericOwnedBlock,
815{
816    fn clone(&self) -> Self {
817        Self {
818            inner: self.inner.clone(),
819        }
820    }
821}
822
823impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
824where
825    Block: GenericOwnedBlock,
826{
827    fn drop(&mut self) {
828        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
829    }
830}
831
832impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
833where
834    Block: GenericOwnedBlock,
835    StorageBackend: ClientDatabaseStorageBackend,
836{
837    #[inline]
838    fn best_root(&self) -> BlockRoot {
839        // Blocking read lock is fine because where a write lock is only taken for a short time and
840        // most locks are read locks
841        self.inner.state.read_blocking().best_tip().root
842    }
843
844    #[inline]
845    fn best_header(&self) -> Block::Header {
846        // Blocking read lock is fine because where a write lock is only taken for a short time and
847        // most locks are read locks
848        self.inner
849            .state
850            .read_blocking()
851            .best_block()
852            .header()
853            .clone()
854    }
855
856    #[inline]
857    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
858        // Blocking read lock is fine because where a write lock is only taken for a short time and
859        // most locks are read locks
860        let state = self.inner.state.read_blocking();
861        let best_block = state.best_block();
862        (
863            best_block.header().clone(),
864            best_block
865                .block_details()
866                .expect("Always present for the best block; qed")
867                .clone(),
868        )
869    }
870
871    // TODO: Add fast path when `descendant_block_root` is the best block
872    #[inline]
873    fn ancestor_header(
874        &self,
875        ancestor_block_number: BlockNumber,
876        descendant_block_root: &BlockRoot,
877    ) -> Option<Block::Header> {
878        // Blocking read lock is fine because where a write lock is only taken for a short time and
879        // most locks are read locks
880        let state = self.inner.state.read_blocking();
881        let best_number = state.best_tip().number;
882
883        let ancestor_block_offset =
884            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
885        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
886
887        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
888        if ancestor_block_number > descendant_block_number {
889            return None;
890        }
891        let descendant_block_offset =
892            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
893
894        // Range of blocks where the first item is expected to contain a descendant
895        let mut blocks_range_iter = state
896            .data
897            .blocks
898            .iter()
899            .enumerate()
900            .skip(descendant_block_offset);
901
902        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
903        let descendant_header = descendant_block_candidates
904            .iter()
905            .find(|block| &*block.header().header().root() == descendant_block_root)?
906            .header()
907            .header();
908
909        // If there are no forks at this level, then this is the canonical chain and ancestor
910        // block number we're looking for is the first block at the corresponding block number.
911        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
912        // the one we care about.
913        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
914            return ancestor_block_candidates
915                .iter()
916                .next()
917                .map(|block| block.header().clone());
918        }
919
920        let mut parent_block_root = &descendant_header.prefix.parent_root;
921
922        // Iterate over the blocks following descendant until ancestor is reached
923        for (block_offset, parent_candidates) in blocks_range_iter {
924            let parent_header = parent_candidates
925                .iter()
926                .find(|header| &*header.header().header().root() == parent_block_root)?
927                .header();
928
929            // When header offset matches, we found the header
930            if block_offset == ancestor_block_offset {
931                return Some(parent_header.clone());
932            }
933
934            parent_block_root = &parent_header.header().prefix.parent_root;
935        }
936
937        None
938    }
939
940    #[inline]
941    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
942        // Blocking read lock is fine because where a write lock is only taken for a short time and
943        // most locks are read locks
944        let state = self.inner.state.read_blocking();
945        let best_number = state.best_tip().number;
946
947        let block_number = *state.data.block_roots.get(block_root)?;
948        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
949        let block_candidates = state.data.blocks.get(block_offset)?;
950
951        block_candidates.iter().find_map(|block| {
952            let header = block.header();
953
954            if &*header.header().root() == block_root {
955                Some(header.clone())
956            } else {
957                None
958            }
959        })
960    }
961
962    #[inline]
963    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
964        // Blocking read lock is fine because where a write lock is only taken for a short time and
965        // most locks are read locks
966        let state = self.inner.state.read_blocking();
967        let best_number = state.best_tip().number;
968
969        let block_number = *state.data.block_roots.get(block_root)?;
970        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
971        let block_candidates = state.data.blocks.get(block_offset)?;
972
973        block_candidates.iter().find_map(|block| {
974            let header = block.header();
975            let block_details = block.block_details().cloned()?;
976
977            if &*header.header().root() == block_root {
978                Some((header.clone(), block_details))
979            } else {
980                None
981            }
982        })
983    }
984
985    #[inline]
986    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
987        let state = self.inner.state.read().await;
988        let best_number = state.best_tip().number;
989
990        let block_number = *state
991            .data
992            .block_roots
993            .get(block_root)
994            .ok_or(ReadBlockError::UnknownBlockRoot)?;
995        let block_offset = u64::from(
996            best_number
997                .checked_sub(block_number)
998                .expect("Known block roots always have valid block offset; qed"),
999        ) as usize;
1000        let block_candidates = state
1001            .data
1002            .blocks
1003            .get(block_offset)
1004            .expect("Valid block offsets always have block entries; qed");
1005
1006        for block_candidate in block_candidates {
1007            let header = block_candidate.header();
1008
1009            if &*header.header().root() == block_root {
1010                return match block_candidate.full_block() {
1011                    FullBlock::InMemory(block) => Ok(block.clone()),
1012                    FullBlock::Persisted {
1013                        header,
1014                        write_location,
1015                    } => {
1016                        let storage_backend_adapter = state.storage_backend_adapter.read().await;
1017
1018                        let storage_item = storage_backend_adapter
1019                            .read_storage_item::<StorageItemTemporary>(write_location)
1020                            .await?;
1021
1022                        let storage_item_block = match storage_item {
1023                            StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1024                            StorageItemTemporary::SegmentHeaders(_) => {
1025                                return Err(ReadBlockError::StorageItemReadError {
1026                                    error: io::Error::other(
1027                                        "Unexpected storage item: `SegmentHeaders`",
1028                                    ),
1029                                });
1030                            }
1031                            StorageItemTemporary::SuperSegmentHeaders(_) => {
1032                                return Err(ReadBlockError::StorageItemReadError {
1033                                    error: io::Error::other(
1034                                        "Unexpected storage item: `SuperSegmentHeaders`",
1035                                    ),
1036                                });
1037                            }
1038                        };
1039
1040                        let StorageItemTemporaryBlock {
1041                            header: _,
1042                            body,
1043                            mmr_with_block: _,
1044                            system_contract_states: _,
1045                        } = storage_item_block;
1046
1047                        Block::from_buffers(header.buffer().clone(), body)
1048                            .ok_or(ReadBlockError::FailedToDecode)
1049                    }
1050                };
1051            }
1052        }
1053
1054        unreachable!("Known block root always has block candidate associated with it; qed")
1055    }
1056
1057    #[inline]
1058    fn last_segment_header(&self) -> Option<SegmentHeader> {
1059        // Blocking read lock is fine because where a write lock is only taken for a short time and
1060        // most locks are read locks
1061        let state = self.inner.state.read_blocking();
1062        state.segment_headers_cache.last_segment_header()
1063    }
1064
1065    #[inline]
1066    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1067        // Blocking read lock is fine because where a write lock is only taken for a short time and
1068        // most locks are read locks
1069        let state = self.inner.state.read_blocking();
1070
1071        state
1072            .segment_headers_cache
1073            .get_segment_header(segment_index)
1074    }
1075
1076    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1077        // Blocking read lock is fine because where a write lock is only taken for a short time and
1078        // most locks are read locks
1079        let state = self.inner.state.read_blocking();
1080
1081        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1082        else {
1083            // Not initialized
1084            return Vec::new();
1085        };
1086
1087        // Special case for the initial segment (for beacon chain genesis block)
1088        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1089            && block_number == BlockNumber::ONE
1090        {
1091            // If there is a segment index present, and we store monotonically increasing segment
1092            // headers, then the first header exists
1093            return vec![
1094                state
1095                    .segment_headers_cache
1096                    .get_segment_header(LocalSegmentIndex::ZERO)
1097                    .expect("Segment headers are stored in monotonically increasing order; qed"),
1098            ];
1099        }
1100
1101        if last_local_segment_index == LocalSegmentIndex::ZERO {
1102            // Genesis segment already included in block #1
1103            return Vec::new();
1104        }
1105
1106        let mut current_local_segment_index = last_local_segment_index;
1107        loop {
1108            // If the current segment index present, and we store monotonically increasing segment
1109            // headers, then the current segment header exists as well.
1110            let current_segment_header = state
1111                .segment_headers_cache
1112                .get_segment_header(current_local_segment_index)
1113                .expect("Segment headers are stored in monotonically increasing order; qed");
1114
1115            // The block immediately after the archived segment adding the confirmation depth
1116            let target_block_number = current_segment_header.last_archived_block.number()
1117                + BlockNumber::ONE
1118                + self.inner.options.block_confirmation_depth;
1119            if target_block_number == block_number {
1120                let mut headers_for_block = vec![current_segment_header];
1121
1122                // Check block spanning multiple segments
1123                let last_archived_block_number = current_segment_header.last_archived_block.number;
1124                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1125
1126                while let Some(segment_header) = state
1127                    .segment_headers_cache
1128                    .get_segment_header(local_segment_index)
1129                {
1130                    if segment_header.last_archived_block.number == last_archived_block_number {
1131                        headers_for_block.insert(0, segment_header);
1132                        local_segment_index -= LocalSegmentIndex::ONE;
1133                    } else {
1134                        break;
1135                    }
1136                }
1137
1138                return headers_for_block;
1139            }
1140
1141            // iterate segments further
1142            if target_block_number > block_number {
1143                // no need to check the initial segment
1144                if current_local_segment_index > LocalSegmentIndex::ONE {
1145                    current_local_segment_index -= LocalSegmentIndex::ONE;
1146                } else {
1147                    break;
1148                }
1149            } else {
1150                // No segment headers required
1151                return Vec::new();
1152            }
1153        }
1154
1155        // No segment headers required
1156        Vec::new()
1157    }
1158}
1159
1160impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1161where
1162    Block: GenericOwnedBlock,
1163    StorageBackend: ClientDatabaseStorageBackend,
1164{
1165    async fn persist_block(
1166        &self,
1167        block: Block,
1168        block_details: BlockDetails,
1169    ) -> Result<(), PersistBlockError> {
1170        let mut state = self.inner.state.write().await;
1171        let best_number = state.best_tip().number;
1172
1173        let header = block.header().header();
1174
1175        let block_number = header.prefix.number;
1176
1177        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1178            // Special case when syncing on top of the fresh database
1179            Self::insert_first_block(&mut state.data, block, block_details);
1180
1181            return Ok(());
1182        }
1183
1184        if block_number == best_number + BlockNumber::ONE {
1185            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1186        }
1187
1188        let block_offset = u64::from(
1189            best_number
1190                .checked_sub(block_number)
1191                .ok_or(PersistBlockError::MissingParent)?,
1192        ) as usize;
1193
1194        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1195            return Err(PersistBlockError::OutsideAcceptableRange);
1196        }
1197
1198        let state = &mut *state;
1199
1200        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1201            error!(
1202                %block_number,
1203                %block_offset,
1204                "Failed to store block fork, header offset is missing despite being within \
1205                acceptable range"
1206            );
1207
1208            PersistBlockError::OutsideAcceptableRange
1209        })?;
1210
1211        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1212            // Block's parent is no longer a fork tip, remove it
1213            if fork_tip.root == header.prefix.parent_root {
1214                state.data.fork_tips.remove(index);
1215                break;
1216            }
1217        }
1218
1219        let block_root = *header.root();
1220        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1221        // the best block
1222        state.data.fork_tips.insert(
1223            1,
1224            ForkTip {
1225                number: block_number,
1226                root: block_root,
1227            },
1228        );
1229        state.data.block_roots.insert(block_root, block_number);
1230        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1231            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1232        block_forks.push(ClientDatabaseBlock::InMemory {
1233            block,
1234            block_details,
1235            beacon_chain_block_details,
1236        });
1237
1238        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1239
1240        Ok(())
1241    }
1242
1243    async fn persist_segment_headers(
1244        &self,
1245        segment_headers: Vec<SegmentHeader>,
1246    ) -> Result<(), PersistSegmentHeadersError> {
1247        let mut state = self.inner.state.write().await;
1248
1249        let added_segment_headers = state
1250            .segment_headers_cache
1251            .add_segment_headers(segment_headers)?;
1252
1253        if added_segment_headers.is_empty() {
1254            return Ok(());
1255        }
1256
1257        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1258        // headers modifications
1259        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1260        //  are satisfied. If not, blocking read locks in other places will cause issues.
1261        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1262
1263        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1264
1265        storage_backend_adapter
1266            .write_storage_item(StorageItemTemporary::SegmentHeaders(
1267                StorageItemTemporarySegmentHeaders {
1268                    segment_headers: added_segment_headers,
1269                },
1270            ))
1271            .await?;
1272
1273        Ok(())
1274    }
1275}
1276
1277impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1278where
1279    StorageBackend: ClientDatabaseStorageBackend,
1280{
1281    fn shard_segment_roots(
1282        &self,
1283        block_number: BlockNumber,
1284    ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1285        // Blocking read lock is fine because where a write lock is only taken for a short time and
1286        // most locks are read locks
1287        let state = self.inner.state.read_blocking();
1288        let best_number = state.best_tip().number;
1289
1290        let block_offset = u64::from(
1291            best_number
1292                .checked_sub(block_number)
1293                .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1294        ) as usize;
1295
1296        let block = state
1297            .data
1298            .blocks
1299            .get(block_offset)
1300            .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1301            .first()
1302            .expect("There is always at least one block candidate; qed");
1303
1304        Ok(StdArc::clone(
1305            &block
1306                .beacon_chain_block_details()
1307                .as_ref()
1308                .expect("Always present in the beacon chain block; qed")
1309                .shard_segment_roots,
1310        ))
1311    }
1312
1313    #[inline]
1314    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1315        // Blocking read lock is fine because where a write lock is only taken for a short time and
1316        // most locks are read locks
1317        let state = self.inner.state.read_blocking();
1318        state
1319            .super_segment_headers_cache
1320            .last_super_segment_header()
1321    }
1322
1323    #[inline]
1324    fn previous_super_segment_header(
1325        &self,
1326        block_number: BlockNumber,
1327    ) -> Option<SuperSegmentHeader> {
1328        // Blocking read lock is fine because where a write lock is only taken for a short time and
1329        // most locks are read locks
1330        let state = self.inner.state.read_blocking();
1331
1332        state
1333            .super_segment_headers_cache
1334            .previous_super_segment_header(block_number)
1335    }
1336
1337    #[inline]
1338    fn get_super_segment_header(
1339        &self,
1340        super_segment_index: SuperSegmentIndex,
1341    ) -> Option<SuperSegmentHeader> {
1342        // Blocking read lock is fine because where a write lock is only taken for a short time and
1343        // most locks are read locks
1344        let state = self.inner.state.read_blocking();
1345
1346        state
1347            .super_segment_headers_cache
1348            .get_super_segment_header(super_segment_index)
1349    }
1350
1351    fn get_super_segment_header_for_segment_index(
1352        &self,
1353        segment_index: SegmentIndex,
1354    ) -> Option<SuperSegmentHeader> {
1355        // Blocking read lock is fine because where a write lock is only taken for a short time and
1356        // most locks are read locks
1357        let state = self.inner.state.read_blocking();
1358
1359        state
1360            .super_segment_headers_cache
1361            .get_super_segment_header_for_segment_index(segment_index)
1362    }
1363}
1364
1365impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1366where
1367    StorageBackend: ClientDatabaseStorageBackend,
1368{
1369    async fn persist_super_segment_header(
1370        &self,
1371        super_segment_header: SuperSegmentHeader,
1372    ) -> Result<bool, PersistSuperSegmentHeadersError> {
1373        let mut state = self.inner.state.write().await;
1374
1375        let added_super_segment_headers = state
1376            .super_segment_headers_cache
1377            .add_super_segment_headers(vec![super_segment_header])?;
1378
1379        if added_super_segment_headers.is_empty() {
1380            return Ok(false);
1381        }
1382
1383        // Convert write lock into upgradable read lock to allow reads, while preventing super
1384        // segment headers modifications
1385        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1386        //  are satisfied. If not, blocking read locks in other places will cause issues.
1387        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1388
1389        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1390
1391        storage_backend_adapter
1392            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1393                StorageItemTemporarySuperSegmentHeaders {
1394                    super_segment_headers: added_super_segment_headers,
1395                },
1396            ))
1397            .await?;
1398
1399        Ok(true)
1400    }
1401
1402    async fn persist_super_segment_headers(
1403        &self,
1404        super_segment_headers: Vec<SuperSegmentHeader>,
1405    ) -> Result<(), PersistSuperSegmentHeadersError> {
1406        let mut state = self.inner.state.write().await;
1407
1408        let added_super_segment_headers = state
1409            .super_segment_headers_cache
1410            .add_super_segment_headers(super_segment_headers)?;
1411
1412        if added_super_segment_headers.is_empty() {
1413            return Ok(());
1414        }
1415
1416        // Convert write lock into upgradable read lock to allow reads, while preventing super
1417        // segment headers modifications
1418        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1419        //  are satisfied. If not, blocking read locks in other places will cause issues.
1420        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1421
1422        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1423
1424        storage_backend_adapter
1425            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1426                StorageItemTemporarySuperSegmentHeaders {
1427                    super_segment_headers: added_super_segment_headers,
1428                },
1429            ))
1430            .await?;
1431
1432        Ok(())
1433    }
1434}
1435
1436impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1437where
1438    Block: GenericOwnedBlock,
1439    StorageBackend: ClientDatabaseStorageBackend,
1440{
1441    /// Open the existing database.
1442    ///
1443    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
1444    pub async fn open<GBB>(
1445        options: ClientDatabaseOptions<GBB, StorageBackend>,
1446    ) -> Result<Self, ClientDatabaseError>
1447    where
1448        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1449    {
1450        let ClientDatabaseOptions {
1451            write_buffer_size,
1452            block_confirmation_depth,
1453            soft_confirmation_depth,
1454            max_fork_tips,
1455            max_fork_tip_distance,
1456            genesis_block_builder,
1457            storage_backend,
1458        } = options;
1459        if soft_confirmation_depth >= block_confirmation_depth {
1460            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1461        }
1462
1463        if max_fork_tip_distance > block_confirmation_depth {
1464            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1465        }
1466
1467        let mut state_data = StateData {
1468            fork_tips: VecDeque::new(),
1469            block_roots: HashMap::default(),
1470            blocks: VecDeque::new(),
1471        };
1472        let mut segment_headers_cache = SegmentHeadersCache {
1473            segment_headers_cache: Vec::new(),
1474        };
1475        let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1476            super_segment_headers_cache: Vec::new(),
1477        };
1478
1479        let options = ClientDatabaseInnerOptions {
1480            block_confirmation_depth,
1481            soft_confirmation_depth,
1482            max_fork_tips,
1483            max_fork_tip_distance,
1484        };
1485
1486        let storage_item_handlers = StorageItemHandlers {
1487            permanent: |_arg| {
1488                // TODO
1489                Ok(())
1490            },
1491            temporary: |arg| {
1492                let StorageItemHandlerArg {
1493                    storage_item,
1494                    page_offset,
1495                    num_pages,
1496                } = arg;
1497                let storage_item_block = match storage_item {
1498                    StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1499                    StorageItemTemporary::SegmentHeaders(segment_headers) => {
1500                        let num_segment_headers = segment_headers.segment_headers.len();
1501                        return match segment_headers_cache
1502                            .add_segment_headers(segment_headers.segment_headers)
1503                        {
1504                            Ok(_) => Ok(()),
1505                            Err(error) => {
1506                                error!(
1507                                    %page_offset,
1508                                    %num_segment_headers,
1509                                    %error,
1510                                    "Failed to add segment headers from storage item"
1511                                );
1512
1513                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1514                            }
1515                        };
1516                    }
1517                    StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1518                        let num_super_segment_headers =
1519                            super_segment_headers.super_segment_headers.len();
1520                        return match super_segment_headers_cache
1521                            .add_super_segment_headers(super_segment_headers.super_segment_headers)
1522                        {
1523                            Ok(_) => Ok(()),
1524                            Err(error) => {
1525                                error!(
1526                                    %page_offset,
1527                                    %num_super_segment_headers,
1528                                    %error,
1529                                    "Failed to add segment headers from storage item"
1530                                );
1531
1532                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1533                            }
1534                        };
1535                    }
1536                };
1537
1538                // TODO: It would be nice to not allocate body here since we'll not use it here
1539                //  anyway
1540                let StorageItemTemporaryBlock {
1541                    header,
1542                    body,
1543                    mmr_with_block,
1544                    system_contract_states,
1545                } = storage_item_block;
1546
1547                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1548                    error!(%page_offset, "Failed to decode block header from bytes");
1549
1550                    ClientDatabaseError::InvalidBlock { page_offset }
1551                })?;
1552                let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1553                    error!(%page_offset, "Failed to decode block body from bytes");
1554
1555                    ClientDatabaseError::InvalidBlock { page_offset }
1556                })?;
1557
1558                let block_root = *header.header().root();
1559                let block_number = header.header().prefix.number;
1560
1561                state_data.block_roots.insert(block_root, block_number);
1562
1563                let maybe_best_number = state_data
1564                    .blocks
1565                    .front()
1566                    .and_then(|block_forks| block_forks.first())
1567                    .map(|best_block| {
1568                        // Type inference is not working here for some reason
1569                        let header: &Block::Header = best_block.header();
1570
1571                        header.header().prefix.number
1572                    });
1573
1574                let block_offset = if let Some(best_number) = maybe_best_number {
1575                    if block_number <= best_number {
1576                        u64::from(best_number - block_number) as usize
1577                    } else {
1578                        // The new best block must follow the previous best block
1579                        if block_number - best_number != BlockNumber::ONE {
1580                            error!(
1581                                %page_offset,
1582                                %best_number,
1583                                %block_number,
1584                                "Invalid new best block number, it must be only one block \
1585                                higher than the best block"
1586                            );
1587
1588                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
1589                        }
1590
1591                        state_data.blocks.push_front(SmallVec::new());
1592                        // Will insert a new block at the front
1593                        0
1594                    }
1595                } else {
1596                    state_data.blocks.push_front(SmallVec::new());
1597                    // Will insert a new block at the front
1598                    0
1599                };
1600
1601                let Some(block_forks) = state_data.blocks.get_mut(block_offset) else {
1602                    // Ignore the older block, other blocks at its height were already pruned
1603                    // anyway
1604                    return Ok(());
1605                };
1606
1607                // Push a new block to the end of the list, we'll fix it up later
1608                let beacon_chain_block_details =
1609                    <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1610                        .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1611                block_forks.push(ClientDatabaseBlock::Persisted {
1612                    header,
1613                    block_details: BlockDetails {
1614                        mmr_with_block,
1615                        system_contract_states,
1616                    },
1617                    beacon_chain_block_details,
1618                    write_location: WriteLocation {
1619                        page_offset,
1620                        num_pages,
1621                    },
1622                });
1623
1624                // If a new block was inserted, confirm a new canonical block to prune extra
1625                // in-memory information
1626                if block_offset == 0 && block_forks.len() == 1 {
1627                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
1628                }
1629
1630                Ok(())
1631            },
1632        };
1633
1634        let storage_backend_adapter =
1635            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1636                .await?;
1637
1638        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1639            // The best block is last in the list here because that is how it was inserted while
1640            // reading from the database
1641            block_forks.last()
1642        }) {
1643            // Type inference is not working here for some reason
1644            let header: &Block::Header = best_block.header();
1645            let header = header.header();
1646            let block_number = header.prefix.number;
1647            let block_root = *header.root();
1648
1649            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1650                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1651            }
1652
1653            // Store the best block as the first and only fork tip
1654            state_data.fork_tips.push_front(ForkTip {
1655                number: block_number,
1656                root: block_root,
1657            });
1658        } else {
1659            let GenesisBlockBuilderResult {
1660                block,
1661                system_contract_states,
1662            } = genesis_block_builder();
1663
1664            // If the database is empty, initialize everything with the genesis block
1665            let header = block.header().header();
1666            let block_number = header.prefix.number;
1667            let block_root = *header.root();
1668
1669            state_data.fork_tips.push_front(ForkTip {
1670                number: block_number,
1671                root: block_root,
1672            });
1673            state_data.block_roots.insert(block_root, block_number);
1674            let beacon_chain_block_details =
1675                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1676                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1677            state_data
1678                .blocks
1679                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1680                    block,
1681                    block_details: BlockDetails {
1682                        system_contract_states,
1683                        mmr_with_block: Arc::new({
1684                            let mut mmr = BlockMerkleMountainRange::new();
1685                            mmr.add_leaf(&block_root);
1686                            mmr
1687                        })
1688                    },
1689                    beacon_chain_block_details,
1690                }]);
1691        }
1692
1693        let state = State {
1694            data: state_data,
1695            segment_headers_cache,
1696            super_segment_headers_cache,
1697            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1698        };
1699
1700        let inner = Inner {
1701            state: AsyncRwLock::new(state),
1702            options,
1703        };
1704
1705        Ok(Self {
1706            inner: Arc::new(inner),
1707        })
1708    }
1709
1710    /// Format a new database
1711    pub async fn format(
1712        storage_backend: &StorageBackend,
1713        options: ClientDatabaseFormatOptions,
1714    ) -> Result<(), ClientDatabaseFormatError> {
1715        StorageBackendAdapter::format(storage_backend, options).await
1716    }
1717
1718    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1719        // If the database is empty, initialize everything with the genesis block
1720        let header = block.header().header();
1721        let block_number = header.prefix.number;
1722        let block_root = *header.root();
1723
1724        state.fork_tips.clear();
1725        state.fork_tips.push_front(ForkTip {
1726            number: block_number,
1727            root: block_root,
1728        });
1729        state.block_roots.clear();
1730        state.block_roots.insert(block_root, block_number);
1731        state.blocks.clear();
1732        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1733            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1734        state
1735            .blocks
1736            .push_front(smallvec![ClientDatabaseBlock::InMemory {
1737                block,
1738                block_details,
1739                beacon_chain_block_details,
1740            }]);
1741    }
1742
1743    async fn insert_new_best_block(
1744        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1745        inner: &Inner<Block, StorageBackend>,
1746        block: Block,
1747        block_details: BlockDetails,
1748    ) -> Result<(), PersistBlockError> {
1749        let header = block.header().header();
1750        let block_number = header.prefix.number;
1751        let block_root = *header.root();
1752        let parent_root = header.prefix.parent_root;
1753
1754        // Adjust the relative order of forks to ensure the first index always corresponds to
1755        // ancestors of the new best block
1756        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1757            return Err(PersistBlockError::MissingParent);
1758        }
1759
1760        // Store new block in the state
1761        {
1762            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1763                // Block's parent is no longer a fork tip, remove it
1764                if fork_tip.root == parent_root {
1765                    state.data.fork_tips.remove(index);
1766                    break;
1767                }
1768            }
1769
1770            state.data.fork_tips.push_front(ForkTip {
1771                number: block_number,
1772                root: block_root,
1773            });
1774            state.data.block_roots.insert(block_root, block_number);
1775            let beacon_chain_block_details =
1776                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1777                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1778            state
1779                .data
1780                .blocks
1781                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1782                    block,
1783                    block_details: block_details.clone(),
1784                    beacon_chain_block_details,
1785                }]);
1786        }
1787
1788        let options = &inner.options;
1789
1790        Self::confirm_canonical_block(block_number, &mut state.data, options);
1791        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1792
1793        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1794        // block modifications
1795        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1796        //  are satisfied. If not, blocking read locks in other places will cause issues.
1797        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1798
1799        let mut blocks_to_persist = Vec::new();
1800        for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1801            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1802                break;
1803            };
1804
1805            let len_before = blocks_to_persist.len();
1806            fork_blocks
1807                .iter()
1808                .enumerate()
1809                .filter_map(|(fork_offset, client_database_block)| {
1810                    match client_database_block {
1811                        ClientDatabaseBlock::InMemory {
1812                            block,
1813                            block_details,
1814                            beacon_chain_block_details: _,
1815                        } => Some(BlockToPersist {
1816                            block_offset,
1817                            fork_offset,
1818                            block,
1819                            block_details,
1820                        }),
1821                        ClientDatabaseBlock::Persisted { .. }
1822                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1823                            // Already persisted
1824                            None
1825                        }
1826                    }
1827                })
1828                .collect_into(&mut blocks_to_persist);
1829
1830            if blocks_to_persist.len() == len_before {
1831                break;
1832            }
1833        }
1834
1835        // Persist blocks from older to newer
1836        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1837        {
1838            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1839
1840            for block_to_persist in blocks_to_persist.into_iter().rev() {
1841                let BlockToPersist {
1842                    block_offset,
1843                    fork_offset,
1844                    block,
1845                    block_details,
1846                } = block_to_persist;
1847
1848                let write_location = storage_backend_adapter
1849                    .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1850                        header: block.header().buffer().clone(),
1851                        body: block.body().buffer().clone(),
1852                        mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1853                        system_contract_states: StdArc::clone(
1854                            &block_details.system_contract_states,
1855                        ),
1856                    }))
1857                    .await?;
1858
1859                persisted_blocks.push(PersistedBlock {
1860                    block_offset,
1861                    fork_offset,
1862                    write_location,
1863                });
1864            }
1865        }
1866
1867        // Convert blocks to persisted
1868        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1869        for persisted_block in persisted_blocks {
1870            let PersistedBlock {
1871                block_offset,
1872                fork_offset,
1873                write_location,
1874            } = persisted_block;
1875
1876            let block = state
1877                .data
1878                .blocks
1879                .get_mut(block_offset)
1880                .expect("Still holding the same lock since last check; qed")
1881                .get_mut(fork_offset)
1882                .expect("Still holding the same lock since last check; qed");
1883
1884            replace_with_or_abort(block, |block| {
1885                if let ClientDatabaseBlock::InMemory {
1886                    block,
1887                    block_details,
1888                    beacon_chain_block_details,
1889                } = block
1890                {
1891                    let (header, _body) = block.split();
1892
1893                    ClientDatabaseBlock::Persisted {
1894                        header,
1895                        block_details,
1896                        beacon_chain_block_details,
1897                        write_location,
1898                    }
1899                } else {
1900                    unreachable!("Still holding the same lock since last check; qed");
1901                }
1902            });
1903        }
1904
1905        // TODO: Prune blocks that are no longer necessary
1906        // TODO: Prune unused page groups here or elsewhere?
1907
1908        Ok(())
1909    }
1910
1911    /// Adjust the relative order of forks to ensure the first index always corresponds to
1912    /// `parent_block_root` and its ancestors.
1913    ///
1914    /// Returns `true` on success and `false` if one of the parents was not found.
1915    #[must_use]
1916    fn adjust_ancestor_block_forks(
1917        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1918        mut parent_block_root: BlockRoot,
1919    ) -> bool {
1920        let mut ancestor_blocks = blocks.iter_mut();
1921
1922        loop {
1923            if ancestor_blocks.len() == 1 {
1924                // Nothing left to adjust with a single fork
1925                break;
1926            }
1927
1928            let Some(parent_blocks) = ancestor_blocks.next() else {
1929                // No more parent headers present
1930                break;
1931            };
1932
1933            let Some(fork_offset_parent_block_root) =
1934                parent_blocks
1935                    .iter()
1936                    .enumerate()
1937                    .find_map(|(fork_offset, fork_block)| {
1938                        let fork_header = fork_block.header().header();
1939                        if *fork_header.root() == parent_block_root {
1940                            Some((fork_offset, fork_header.prefix.parent_root))
1941                        } else {
1942                            None
1943                        }
1944                    })
1945            else {
1946                return false;
1947            };
1948
1949            let fork_offset;
1950            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1951
1952            parent_blocks.swap(0, fork_offset);
1953        }
1954
1955        true
1956    }
1957
1958    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1959    ///
1960    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1961    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1962    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1963    /// they end up being necessary later.
1964    fn prune_outdated_fork_tips(
1965        best_number: BlockNumber,
1966        state: &mut StateData<Block>,
1967        options: &ClientDatabaseInnerOptions,
1968    ) {
1969        let state = &mut *state;
1970
1971        // These forks are just candidates because they will not be pruned if the reference count is
1972        // not 1, indicating they are still in use by something
1973        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1974
1975        // Prune forks that are too far away from the best block
1976        state.fork_tips.retain(|fork_tip| {
1977            if best_number - fork_tip.number > options.max_fork_tip_distance {
1978                candidate_forks_to_remove.push(*fork_tip);
1979                false
1980            } else {
1981                true
1982            }
1983        });
1984        // Prune forks that exceed the maximum number of forks
1985        if state.fork_tips.len() > options.max_fork_tips.get() {
1986            state
1987                .fork_tips
1988                .drain(options.max_fork_tips.get()..)
1989                .collect_into(&mut candidate_forks_to_remove);
1990        }
1991
1992        // Prune all possible candidates
1993        candidate_forks_to_remove
1994            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1995        // Return those that were not pruned back to the list of tips
1996        state.fork_tips.extend(candidate_forks_to_remove);
1997    }
1998
1999    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
2000    /// the list of fork tips
2001    #[must_use]
2002    fn prune_outdated_fork(
2003        best_number: BlockNumber,
2004        fork_tip: &ForkTip,
2005        state: &mut StateData<Block>,
2006    ) -> bool {
2007        let block_offset = u64::from(best_number - fork_tip.number) as usize;
2008
2009        // Prune fork top and all its ancestors that are not used
2010        let mut block_root_to_prune = fork_tip.root;
2011        let mut pruned_tip = false;
2012        for block_offset in block_offset.. {
2013            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2014                if !pruned_tip {
2015                    error!(
2016                        %best_number,
2017                        ?fork_tip,
2018                        block_offset,
2019                        "Block offset was not present in the database, this is an implementation \
2020                        bug #1"
2021                    );
2022                }
2023                // No forks left to prune
2024                break;
2025            };
2026
2027            if fork_blocks.len() == 1 {
2028                if !pruned_tip {
2029                    error!(
2030                        %best_number,
2031                        ?fork_tip,
2032                        block_offset,
2033                        "Block offset was not present in the database, this is an implementation \
2034                        bug #2"
2035                    );
2036                }
2037
2038                // No forks left to prune
2039                break;
2040            }
2041
2042            let Some((fork_offset, block)) = fork_blocks
2043                .iter()
2044                .enumerate()
2045                // Skip ancestor of the best block, it is certainly not a fork to be pruned
2046                .skip(1)
2047                .find(|(_fork_offset, block)| {
2048                    *block.header().header().root() == block_root_to_prune
2049                })
2050            else {
2051                if !pruned_tip {
2052                    error!(
2053                        %best_number,
2054                        ?fork_tip,
2055                        block_offset,
2056                        "Block offset was not present in the database, this is an implementation \
2057                        bug #3"
2058                    );
2059                }
2060
2061                // Nothing left to prune
2062                break;
2063            };
2064
2065            // More than one instance means something somewhere is using or depends on this block
2066            if block.header().ref_count() > 1 {
2067                break;
2068            }
2069
2070            // Blocks that are already persisted
2071            match block {
2072                ClientDatabaseBlock::InMemory { .. } => {
2073                    // Prune
2074                }
2075                ClientDatabaseBlock::Persisted { .. }
2076                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2077                    // Already on disk, keep it in memory for later, but prune the tip
2078                    pruned_tip = true;
2079                    break;
2080                }
2081            }
2082
2083            state.block_roots.get_mut(&block_root_to_prune);
2084            block_root_to_prune = block.header().header().prefix.parent_root;
2085            fork_blocks.swap_remove(fork_offset);
2086
2087            pruned_tip = true;
2088        }
2089
2090        pruned_tip
2091    }
2092
2093    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
2094    /// their descendants
2095    fn confirm_canonical_block(
2096        best_number: BlockNumber,
2097        state_data: &mut StateData<Block>,
2098        options: &ClientDatabaseInnerOptions,
2099    ) {
2100        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
2101        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
2102        // MMRs, which might be needed for reorgs at the lowest possible depth.
2103        let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;
2104
2105        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
2106            // Nothing to confirm yet
2107            return;
2108        };
2109
2110        // Mark the canonical block as confirmed
2111        {
2112            let Some(canonical_block) = fork_blocks.first_mut() else {
2113                error!(
2114                    %best_number,
2115                    block_offset,
2116                    "Have not found a canonical block to confirm, this is an implementation bug"
2117                );
2118                return;
2119            };
2120
2121            replace_with_or_abort(canonical_block, |block| match block {
2122                ClientDatabaseBlock::InMemory { .. } => {
2123                    error!(
2124                        %best_number,
2125                        block_offset,
2126                        header = ?block.header(),
2127                        "Block to be confirmed must not be in memory, this is an implementation bug"
2128                    );
2129                    block
2130                }
2131                ClientDatabaseBlock::Persisted {
2132                    header,
2133                    block_details: _,
2134                    beacon_chain_block_details,
2135                    write_location,
2136                } => ClientDatabaseBlock::PersistedConfirmed {
2137                    header,
2138                    beacon_chain_block_details,
2139                    write_location,
2140                },
2141                ClientDatabaseBlock::PersistedConfirmed { .. } => {
2142                    error!(
2143                        %best_number,
2144                        block_offset,
2145                        header = ?block.header(),
2146                        "Block to be confirmed must not be confirmed yet, this is an \
2147                        implementation bug"
2148                    );
2149                    block
2150                }
2151            });
2152        }
2153
2154        // Prune the rest of the blocks and their descendants
2155        let mut block_roots_to_prune = fork_blocks
2156            .drain(1..)
2157            .map(|block| *block.header().header().root())
2158            .collect::<Vec<_>>();
2159        let mut current_block_offset = block_offset;
2160        while !block_roots_to_prune.is_empty() {
2161            // Prune fork tips (if any)
2162            state_data
2163                .fork_tips
2164                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
2165
2166            // Prune removed block roots
2167            for block_root in &block_roots_to_prune {
2168                state_data.block_roots.remove(block_root);
2169            }
2170
2171            // Block offset for direct descendants
2172            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
2173                current_block_offset = next_block_offset;
2174            } else {
2175                // Reached the tip
2176                break;
2177            }
2178
2179            let fork_blocks = state_data
2180                .blocks
2181                .get_mut(current_block_offset)
2182                .expect("Lower block offset always exists; qed");
2183
2184            // Collect descendants of pruned blocks to prune them next
2185            block_roots_to_prune = fork_blocks
2186                .drain_filter(|block| {
2187                    let header = block.header().header();
2188
2189                    block_roots_to_prune.contains(&header.prefix.parent_root)
2190                })
2191                .map(|block| *block.header().header().root())
2192                .collect();
2193        }
2194    }
2195}