Skip to main content

ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before the database can be used, it needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is expected (but depends on
//! the storage backend) that the whole file size is pre-allocated on disk and no writes will fail
//! due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    const_convert,
50    const_trait_impl,
51    default_field_values,
52    get_mut_unchecked,
53    iter_collect_into,
54    maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70    BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71    ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72    PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81    LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
101/// Unique identifier for a database
102#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
103#[repr(C)]
104pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107    type Target = [u8; 32];
108
109    #[inline(always)]
110    fn deref(&self) -> &Self::Target {
111        &self.0
112    }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116    #[inline(always)]
117    fn as_ref(&self) -> &[u8] {
118        &self.0
119    }
120}
121
122impl DatabaseId {
123    #[inline(always)]
124    pub const fn new(bytes: [u8; 32]) -> Self {
125        Self(bytes)
126    }
127}
128
/// Trivial hasher that uses the first 8 bytes of the written data as the hash value.
///
/// Every `write()` call with at least 8 bytes overwrites the state (nothing is mixed in), and
/// calls with fewer than 8 bytes are ignored.
// NOTE(review): presumably only used for keys that are already uniformly distributed (block
// roots) - confirm before reusing for anything else.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Inputs shorter than 8 bytes leave the current state untouched
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
147
148#[derive(Debug)]
149pub struct GenesisBlockBuilderResult<Block> {
150    /// Genesis block
151    pub block: Block,
152    /// System contracts state in the genesis block
153    pub system_contract_states: StdArc<[ContractSlotState]>,
154}
155
156/// Options for [`ClientDatabase`]
157#[derive(Debug, Copy, Clone)]
158pub struct ClientDatabaseOptions<GBB, StorageBackend> {
159    /// Write buffer size.
160    ///
161    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
162    /// more RAM. Zero buffer size means all writes must be completed before returning from the
163    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
164    ///
165    /// The recommended value is 5.
166    pub write_buffer_size: usize = 5,
167    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
168    /// perspective.
169    ///
170    /// This parameter allows establishing a final canonical order of blocks and eliminating any
171    /// potential forks at a specified depth and beyond.
172    pub block_confirmation_depth: BlockNumber,
173    /// Soft confirmation depth for blocks.
174    ///
175    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
176    ///
177    /// This parameter determines how many blocks are retained in memory before being written to
178    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
179    /// memory before being soft-confirmed and written to disk for longer-term storage.
180    ///
181    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
182    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
183    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
184    ///
185    /// The recommended value is 3 blocks.
186    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
187    /// Defines how many fork tips should be maintained in total.
188    ///
189    /// As natural forks occur, there may be more than one tip in existence, with only one of them
190    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
191    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
192    /// be pruned automatically.
193    ///
194    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
195    ///
196    /// The recommended value is 3 blocks.
197    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
198    /// Max distance between fork tip and the best block.
199    ///
200    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
201    /// limit. This essentially means the tip was not extended for some time, and while it is
202    /// theoretically possible for the chain to continue from this tip, the probability is so small
203    /// that it is not worth storing it.
204    ///
205    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
206    ///
207    /// The recommended value is 5 blocks.
208    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
209    /// Genesis block builder is responsible to create genesis block and corresponding state for
210    /// bootstrapping purposes.
211    pub genesis_block_builder: GBB,
212    /// Storage backend to use for storing and retrieving storage items
213    pub storage_backend: StorageBackend,
214}
215
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but it must not be so small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
241
242#[derive(Debug, thiserror::Error)]
243pub enum ClientDatabaseError {
244    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
245    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
246    InvalidSoftConfirmationDepth,
247    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
248    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
249    InvalidMaxForkTipDistance,
250    /// Storage backend has canceled read request
251    #[error("Storage backend has canceled read request")]
252    ReadRequestCancelled,
253    /// Storage backend read error
254    #[error("Storage backend read error: {error}")]
255    ReadError {
256        /// Low-level error
257        error: io::Error,
258    },
259    /// Unsupported database version
260    #[error("Unsupported database version: {database_version}")]
261    UnsupportedDatabaseVersion {
262        /// Database version
263        database_version: u8,
264    },
265    /// Page group size is too small, must be at least two pages
266    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
267    PageGroupSizeTooSmall {
268        /// Page group size in pages
269        page_group_size: u32,
270    },
271    /// Unexpected sequence number
272    #[error(
273        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
274        {expected})"
275    )]
276    UnexpectedSequenceNumber {
277        /// Sequence number in the database
278        actual: u64,
279        /// Expected sequence number
280        expected: u64,
281        /// Page offset where storage item is found
282        page_offset: u32,
283    },
284    /// Unexpected storage item
285    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
286    UnexpectedStorageItem {
287        /// First storage item
288        storage_item: Box<dyn fmt::Debug + Send + Sync>,
289        /// Page offset where storage item is found
290        page_offset: u32,
291    },
292    /// Invalid block
293    #[error("Invalid block at offset {page_offset}")]
294    InvalidBlock {
295        /// Page offset where storage item is found
296        page_offset: u32,
297    },
298    /// Invalid segment headers
299    #[error("Invalid segment headers at offset {page_offset}")]
300    InvalidSegmentHeaders {
301        /// Page offset where storage item is found
302        page_offset: u32,
303    },
304    /// Failed to adjust ancestor block forks
305    #[error("Failed to adjust ancestor block forks")]
306    FailedToAdjustAncestorBlockForks,
307    /// Database is not formatted yet
308    #[error("Database is not formatted yet")]
309    Unformatted,
310    /// Non-permanent first page group
311    #[error("Non-permanent first page group")]
312    NonPermanentFirstPageGroup,
313}
314
315/// Error for [`ClientDatabase::format()`]
316#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318    /// Storage backend has canceled read request
319    #[error("Storage backend has canceled read request")]
320    ReadRequestCancelled,
321    /// Storage backend read error
322    #[error("Storage backend read error: {error}")]
323    ReadError {
324        /// Low-level error
325        error: io::Error,
326    },
327    /// Failed to generate database id
328    #[error("Failed to generate database id")]
329    FailedToGenerateDatabaseId {
330        /// Low-level error
331        #[from]
332        error: SysError,
333    },
334    /// Database is already formatted yet
335    #[error("Database is already formatted yet")]
336    AlreadyFormatted,
337    /// Storage backend has canceled a writing request
338    #[error("Storage backend has canceled a writing request")]
339    WriteRequestCancelled,
340    /// Storage item write error
341    #[error("Storage item write error")]
342    StorageItemWriteError {
343        /// Low-level error
344        #[from]
345        error: io::Error,
346    },
347}
348
349#[derive(Debug, Copy, Clone)]
350struct ForkTip {
351    number: BlockNumber,
352    root: BlockRoot,
353}
354
355enum FullBlock<'a, Block>
356where
357    Block: GenericOwnedBlock,
358{
359    InMemory(&'a Block),
360    Persisted {
361        header: &'a Block::Header,
362        write_location: WriteLocation,
363    },
364}
365
366#[derive(Debug)]
367struct BeaconChainBlockDetails {
368    // Shard segment roots, only present for beacon chain blocks
369    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
370}
371
372impl BeaconChainBlockDetails {
373    fn from_body(body: &BeaconChainBody<'_>) -> Self {
374        let shard_segment_roots = body
375            .intermediate_shard_blocks()
376            .iter()
377            .flat_map(|intermediate_shard_block_info| {
378                let own_segments = intermediate_shard_block_info
379                    .own_segments
380                    .into_iter()
381                    .flat_map({
382                        let shard_index = intermediate_shard_block_info.header.prefix.shard_index;
383
384                        move |own_segments| {
385                            (own_segments.first_local_segment_index..)
386                                .zip(own_segments.segment_roots)
387                                .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
388                                    shard_index,
389                                    segment_index,
390                                    segment_root,
391                                })
392                        }
393                    });
394                let child_shard_segment_roots = intermediate_shard_block_info
395                    .leaf_shards_segments()
396                    .flat_map(move |(shard_index, own_segments)| {
397                        (own_segments.first_local_segment_index..)
398                            .zip(own_segments.segment_roots)
399                            .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
400                                shard_index,
401                                segment_index,
402                                segment_root,
403                            })
404                    });
405
406                own_segments.chain(child_shard_segment_roots)
407            })
408            .collect();
409
410        Self {
411            shard_segment_roots,
412        }
413    }
414}
415
416/// Client database block contains details about the block state in the database.
417///
418/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
419/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later
420///  when it is "confirmed" fully (see [`ClientDatabaseOptions::soft_confirmation_depth`]), it
421/// becomes irreversible.
422#[derive(Debug)]
423enum ClientDatabaseBlock<Block>
424where
425    Block: GenericOwnedBlock,
426{
427    /// Block is stored in memory and wasn't persisted yet
428    InMemory {
429        block: Block,
430        block_details: BlockDetails,
431        /// Only present for beacon chain blocks
432        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
433    },
434    /// Block was persisted (likely on disk)
435    Persisted {
436        header: Block::Header,
437        block_details: BlockDetails,
438        /// Only present for beacon chain blocks
439        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
440        write_location: WriteLocation,
441    },
442    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
443    /// perspective
444    PersistedConfirmed {
445        header: Block::Header,
446        /// Only present for beacon chain blocks
447        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
448        write_location: WriteLocation,
449    },
450}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454    Block: GenericOwnedBlock,
455{
456    #[inline(always)]
457    fn header(&self) -> &Block::Header {
458        match self {
459            Self::InMemory { block, .. } => block.header(),
460            Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461        }
462    }
463
464    #[inline(always)]
465    fn full_block(&self) -> FullBlock<'_, Block> {
466        match self {
467            Self::InMemory { block, .. } => FullBlock::InMemory(block),
468            Self::Persisted {
469                header,
470                write_location,
471                ..
472            }
473            | Self::PersistedConfirmed {
474                header,
475                write_location,
476                ..
477            } => FullBlock::Persisted {
478                header,
479                write_location: *write_location,
480            },
481        }
482    }
483
484    #[inline(always)]
485    fn block_details(&self) -> Option<&BlockDetails> {
486        match self {
487            Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488                Some(block_details)
489            }
490            Self::PersistedConfirmed { .. } => None,
491        }
492    }
493
494    #[inline(always)]
495    fn beacon_chain_block_details(&self) -> Option<&BeaconChainBlockDetails> {
496        match self {
497            Self::InMemory {
498                beacon_chain_block_details,
499                ..
500            }
501            | Self::Persisted {
502                beacon_chain_block_details,
503                ..
504            }
505            | Self::PersistedConfirmed {
506                beacon_chain_block_details,
507                ..
508            } => beacon_chain_block_details.as_ref(),
509        }
510    }
511}
512
513#[derive(Debug)]
514struct StateData<Block>
515where
516    Block: GenericOwnedBlock,
517{
518    /// Tips of forks that have no descendants.
519    ///
520    /// The current best block is at the front, the rest are in the order from most recently
521    /// updated towards the front to least recently at the back.
522    fork_tips: VecDeque<ForkTip>,
523    /// Map from block root to block number.
524    ///
525    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
526    /// block numbers.
527    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
528    /// List of blocks with the newest at the front.
529    ///
530    /// The first element of the first entry corresponds to the best block.
531    ///
532    /// It is expected that in most block numbers there will be exactly one block, some two,
533    /// anything more than that will be very rare. The list of forks for a block number is
534    /// organized in such a way that the first entry at every block number corresponds to the
535    /// canonical version of the blockchain at any point in time.
536    ///
537    /// A position withing this data structure is called "block offset". This is an ephemeral value
538    /// and changes as new best blocks are added. Blocks at the same height are collectively called
539    /// "block forks" and the position of the block within the same block height is called
540    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
541    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
542    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
543}
544
545#[derive(Debug)]
546struct SegmentHeadersCache {
547    segment_headers_cache: Vec<SegmentHeader>,
548}
549
550impl SegmentHeadersCache {
551    #[inline(always)]
552    fn last_segment_header(&self) -> Option<SegmentHeader> {
553        self.segment_headers_cache.last().copied()
554    }
555
556    #[inline(always)]
557    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558        self.segment_headers_cache
559            .last()
560            .map(|segment_header| segment_header.index.as_inner())
561    }
562
563    #[inline(always)]
564    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565        self.segment_headers_cache
566            .get(u64::from(local_segment_index) as usize)
567            .copied()
568    }
569
570    /// Returns actually added segments (some might have been skipped)
571    fn add_segment_headers(
572        &mut self,
573        mut segment_headers: Vec<SegmentHeader>,
574    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575        self.segment_headers_cache.reserve(segment_headers.len());
576
577        let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579        if let Some(last_segment_index) = maybe_last_local_segment_index {
580            // Skip already stored segment headers
581            segment_headers
582                .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583        }
584
585        // Check all input segment headers to see which ones are not stored yet and verifying that
586        // segment indices are monotonically increasing
587        for segment_header in segment_headers.iter().copied() {
588            let local_segment_index = segment_header.index.as_inner();
589            if let Some(last_local_segment_index) = maybe_last_local_segment_index {
590                if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
591                    return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
592                        local_segment_index,
593                        last_local_segment_index,
594                    });
595                }
596
597                self.segment_headers_cache.push(segment_header);
598                maybe_last_local_segment_index.replace(local_segment_index);
599            } else {
600                if local_segment_index != LocalSegmentIndex::ZERO {
601                    return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
602                        local_segment_index,
603                    });
604                }
605
606                self.segment_headers_cache.push(segment_header);
607                maybe_last_local_segment_index.replace(local_segment_index);
608            }
609        }
610
611        Ok(segment_headers)
612    }
613}
614
615#[derive(Debug)]
616struct SuperSegmentHeadersCache {
617    super_segment_headers_cache: Vec<SuperSegmentHeader>,
618}
619
620impl SuperSegmentHeadersCache {
621    #[inline(always)]
622    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
623        self.super_segment_headers_cache.last().copied()
624    }
625
626    #[inline]
627    fn previous_super_segment_header(
628        &self,
629        target_block_number: BlockNumber,
630    ) -> Option<SuperSegmentHeader> {
631        let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
632        let index = match self.super_segment_headers_cache.binary_search_by_key(
633            &block_number,
634            |super_segment_header| {
635                super_segment_header
636                    .target_beacon_chain_block_number
637                    .as_inner()
638            },
639        ) {
640            Ok(found_index) => found_index,
641            Err(insert_index) => insert_index.checked_sub(1)?,
642        };
643
644        self.super_segment_headers_cache.get(index).copied()
645    }
646
647    #[inline(always)]
648    fn get_super_segment_header(
649        &self,
650        local_segment_index: SuperSegmentIndex,
651    ) -> Option<SuperSegmentHeader> {
652        self.super_segment_headers_cache
653            .get(u64::from(local_segment_index) as usize)
654            .copied()
655    }
656
657    #[inline(always)]
658    fn get_super_segment_header_for_segment_index(
659        &self,
660        segment_index: SegmentIndex,
661    ) -> Option<SuperSegmentHeader> {
662        let index = self
663            .super_segment_headers_cache
664            .binary_search_by_key(&segment_index, |super_segment_header| {
665                super_segment_header.max_segment_index.as_inner()
666            })
667            .unwrap_or_else(|insert_index| insert_index);
668
669        let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
670
671        let max_segment_index = super_segment_header.max_segment_index.as_inner();
672        let first_segment_index = max_segment_index
673            - SegmentIndex::from(u64::from(super_segment_header.num_segments))
674            + SegmentIndex::ONE;
675
676        (first_segment_index..=max_segment_index)
677            .contains(&segment_index)
678            .then_some(super_segment_header)
679    }
680
681    /// Returns actually added super segments (some might have been skipped)
682    fn add_super_segment_headers(
683        &mut self,
684        mut super_segment_headers: Vec<SuperSegmentHeader>,
685    ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
686        self.super_segment_headers_cache
687            .reserve(super_segment_headers.len());
688
689        let mut maybe_last_super_segment_index = self
690            .super_segment_headers_cache
691            .last()
692            .map(|header| header.index.as_inner());
693
694        if let Some(last_super_segment_index) = maybe_last_super_segment_index {
695            // Skip already stored super segment headers
696            super_segment_headers.retain(|super_segment_header| {
697                super_segment_header.index.as_inner() > last_super_segment_index
698            });
699        }
700
701        // Check all input super segment headers to see which ones are not stored yet and verifying
702        // that super segment indices are monotonically increasing
703        for super_segment_header in super_segment_headers.iter().copied() {
704            let super_segment_index = super_segment_header.index.as_inner();
705            if let Some(last_super_segment_index) = maybe_last_super_segment_index {
706                if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
707                    return Err(
708                        PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
709                            super_segment_index,
710                            last_super_segment_index,
711                        },
712                    );
713                }
714
715                self.super_segment_headers_cache.push(super_segment_header);
716                maybe_last_super_segment_index.replace(super_segment_index);
717            } else {
718                if super_segment_index != SuperSegmentIndex::ZERO {
719                    return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
720                        super_segment_index,
721                    });
722                }
723
724                self.super_segment_headers_cache.push(super_segment_header);
725                maybe_last_super_segment_index.replace(super_segment_index);
726            }
727        }
728
729        Ok(super_segment_headers)
730    }
731}
732
733// TODO: Hide implementation details
734#[derive(Debug)]
735struct State<Block, StorageBackend>
736where
737    Block: GenericOwnedBlock,
738{
739    data: StateData<Block>,
740    segment_headers_cache: SegmentHeadersCache,
741    super_segment_headers_cache: SuperSegmentHeadersCache,
742    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
743}
744
745impl<Block, StorageBackend> State<Block, StorageBackend>
746where
747    Block: GenericOwnedBlock,
748{
749    #[inline(always)]
750    fn best_tip(&self) -> &ForkTip {
751        self.data
752            .fork_tips
753            .front()
754            .expect("The best block is always present; qed")
755    }
756
757    #[inline(always)]
758    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
759        self.data
760            .blocks
761            .front()
762            .expect("The best block is always present; qed")
763            .first()
764            .expect("The best block is always present; qed")
765    }
766}
767
/// Borrowed block together with its details and its position in the in-memory state.
// NOTE(review): usages are not visible in this part of the file, field semantics below are
//  assumptions to be confirmed
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Presumably the offset from the best block into the blocks queue - TODO confirm
    block_offset: usize,
    /// Presumably the index among block candidates at `block_offset` - TODO confirm
    fork_offset: usize,
    /// Block itself
    block: &'a Block,
    /// Details of the block
    block_details: &'a BlockDetails,
}
778
/// Position of a block in the in-memory state together with a write location.
// NOTE(review): usages are not visible in this part of the file, field semantics below are
//  assumptions to be confirmed
#[derive(Debug)]
struct PersistedBlock {
    /// Presumably the offset from the best block into the blocks queue - TODO confirm
    block_offset: usize,
    /// Presumably the index among block candidates at `block_offset` - TODO confirm
    fork_offset: usize,
    /// Presumably where the block was written in the storage backend - TODO confirm
    write_location: WriteLocation,
}
785
/// Subset of [`ClientDatabaseOptions`] that is retained after the database is opened.
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// Fork blocks this many (or more) blocks behind the best block are rejected when persisting.
    /// Also added to the archived block number when deciding which block segment headers belong
    /// to.
    block_confirmation_depth: BlockNumber,
    /// Must be strictly smaller than `block_confirmation_depth` (checked when opening the
    /// database)
    soft_confirmation_depth: BlockNumber,
    /// Presumably the maximum number of fork tips to retain - TODO confirm against fork tips
    /// pruning
    max_fork_tips: NonZeroUsize,
    /// Must not exceed `block_confirmation_depth` (checked when opening the database)
    max_fork_tip_distance: BlockNumber,
}
793
/// Shared contents of [`ClientDatabase`]: lock-protected state and immutable options.
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was opened with
    options: ClientDatabaseInnerOptions,
}
802
/// Client database.
///
/// This is a handle around reference-counted inner contents, cloning it clones the reference
/// rather than the database contents.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Reference-counted state and options
    inner: Arc<Inner<Block, StorageBackend>>,
}
811
812impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
813where
814    Block: GenericOwnedBlock,
815{
816    fn clone(&self) -> Self {
817        Self {
818            inner: self.inner.clone(),
819        }
820    }
821}
822
// The destructor is currently a placeholder that does nothing. The `clippy::empty_drop` lint is
// expected (rather than allowed), so the expectation will be reported as unfulfilled once the body
// is no longer empty.
#[expect(clippy::empty_drop, reason = "Not implemented yet")]
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
832
833impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
834where
835    Block: GenericOwnedBlock,
836    StorageBackend: ClientDatabaseStorageBackend,
837{
838    #[inline]
839    fn best_root(&self) -> BlockRoot {
840        // Blocking read lock is fine because where a write lock is only taken for a short time and
841        // most locks are read locks
842        self.inner.state.read_blocking().best_tip().root
843    }
844
845    #[inline]
846    fn best_header(&self) -> Block::Header {
847        // Blocking read lock is fine because where a write lock is only taken for a short time and
848        // most locks are read locks
849        self.inner
850            .state
851            .read_blocking()
852            .best_block()
853            .header()
854            .clone()
855    }
856
857    #[inline]
858    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
859        // Blocking read lock is fine because where a write lock is only taken for a short time and
860        // most locks are read locks
861        let state = self.inner.state.read_blocking();
862        let best_block = state.best_block();
863        (
864            best_block.header().clone(),
865            best_block
866                .block_details()
867                .expect("Always present for the best block; qed")
868                .clone(),
869        )
870    }
871
872    // TODO: Add fast path when `descendant_block_root` is the best block
873    #[inline]
874    fn ancestor_header(
875        &self,
876        ancestor_block_number: BlockNumber,
877        descendant_block_root: &BlockRoot,
878    ) -> Option<Block::Header> {
879        // Blocking read lock is fine because where a write lock is only taken for a short time and
880        // most locks are read locks
881        let state = self.inner.state.read_blocking();
882        let best_number = state.best_tip().number;
883
884        let ancestor_block_offset =
885            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
886        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
887
888        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
889        if ancestor_block_number > descendant_block_number {
890            return None;
891        }
892        let descendant_block_offset =
893            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
894
895        // Range of blocks where the first item is expected to contain a descendant
896        let mut blocks_range_iter = state
897            .data
898            .blocks
899            .iter()
900            .enumerate()
901            .skip(descendant_block_offset);
902
903        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
904        let descendant_header = descendant_block_candidates
905            .iter()
906            .find(|block| &*block.header().header().root() == descendant_block_root)?
907            .header()
908            .header();
909
910        // If there are no forks at this level, then this is the canonical chain and ancestor
911        // block number we're looking for is the first block at the corresponding block number.
912        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
913        // the one we care about.
914        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
915            return ancestor_block_candidates
916                .iter()
917                .next()
918                .map(|block| block.header().clone());
919        }
920
921        let mut parent_block_root = &descendant_header.prefix.parent_root;
922
923        // Iterate over the blocks following descendant until ancestor is reached
924        for (block_offset, parent_candidates) in blocks_range_iter {
925            let parent_header = parent_candidates
926                .iter()
927                .find(|header| &*header.header().header().root() == parent_block_root)?
928                .header();
929
930            // When header offset matches, we found the header
931            if block_offset == ancestor_block_offset {
932                return Some(parent_header.clone());
933            }
934
935            parent_block_root = &parent_header.header().prefix.parent_root;
936        }
937
938        None
939    }
940
941    #[inline]
942    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
943        // Blocking read lock is fine because where a write lock is only taken for a short time and
944        // most locks are read locks
945        let state = self.inner.state.read_blocking();
946        let best_number = state.best_tip().number;
947
948        let block_number = *state.data.block_roots.get(block_root)?;
949        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
950        let block_candidates = state.data.blocks.get(block_offset)?;
951
952        block_candidates.iter().find_map(|block| {
953            let header = block.header();
954
955            if &*header.header().root() == block_root {
956                Some(header.clone())
957            } else {
958                None
959            }
960        })
961    }
962
963    #[inline]
964    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
965        // Blocking read lock is fine because where a write lock is only taken for a short time and
966        // most locks are read locks
967        let state = self.inner.state.read_blocking();
968        let best_number = state.best_tip().number;
969
970        let block_number = *state.data.block_roots.get(block_root)?;
971        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
972        let block_candidates = state.data.blocks.get(block_offset)?;
973
974        block_candidates.iter().find_map(|block| {
975            let header = block.header();
976            let block_details = block.block_details().cloned()?;
977
978            if &*header.header().root() == block_root {
979                Some((header.clone(), block_details))
980            } else {
981                None
982            }
983        })
984    }
985
986    #[inline]
987    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
988        let state = self.inner.state.read().await;
989        let best_number = state.best_tip().number;
990
991        let block_number = *state
992            .data
993            .block_roots
994            .get(block_root)
995            .ok_or(ReadBlockError::UnknownBlockRoot)?;
996        let block_offset = u64::from(
997            best_number
998                .checked_sub(block_number)
999                .expect("Known block roots always have valid block offset; qed"),
1000        ) as usize;
1001        let block_candidates = state
1002            .data
1003            .blocks
1004            .get(block_offset)
1005            .expect("Valid block offsets always have block entries; qed");
1006
1007        for block_candidate in block_candidates {
1008            let header = block_candidate.header();
1009
1010            if &*header.header().root() == block_root {
1011                return match block_candidate.full_block() {
1012                    FullBlock::InMemory(block) => Ok(block.clone()),
1013                    FullBlock::Persisted {
1014                        header,
1015                        write_location,
1016                    } => {
1017                        let storage_backend_adapter = state.storage_backend_adapter.read().await;
1018
1019                        let storage_item = storage_backend_adapter
1020                            .read_storage_item::<StorageItemTemporary>(write_location)
1021                            .await?;
1022
1023                        let storage_item_block = match storage_item {
1024                            StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1025                            StorageItemTemporary::SegmentHeaders(_) => {
1026                                return Err(ReadBlockError::StorageItemReadError {
1027                                    error: io::Error::other(
1028                                        "Unexpected storage item: `SegmentHeaders`",
1029                                    ),
1030                                });
1031                            }
1032                            StorageItemTemporary::SuperSegmentHeaders(_) => {
1033                                return Err(ReadBlockError::StorageItemReadError {
1034                                    error: io::Error::other(
1035                                        "Unexpected storage item: `SuperSegmentHeaders`",
1036                                    ),
1037                                });
1038                            }
1039                        };
1040
1041                        let StorageItemTemporaryBlock {
1042                            header: _,
1043                            body,
1044                            mmr_with_block: _,
1045                            system_contract_states: _,
1046                        } = storage_item_block;
1047
1048                        Block::from_buffers(header.buffer().clone(), body)
1049                            .ok_or(ReadBlockError::FailedToDecode)
1050                    }
1051                };
1052            }
1053        }
1054
1055        unreachable!("Known block root always has block candidate associated with it; qed")
1056    }
1057
1058    #[inline]
1059    fn last_segment_header(&self) -> Option<SegmentHeader> {
1060        // Blocking read lock is fine because where a write lock is only taken for a short time and
1061        // most locks are read locks
1062        let state = self.inner.state.read_blocking();
1063        state.segment_headers_cache.last_segment_header()
1064    }
1065
1066    #[inline]
1067    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1068        // Blocking read lock is fine because where a write lock is only taken for a short time and
1069        // most locks are read locks
1070        let state = self.inner.state.read_blocking();
1071
1072        state
1073            .segment_headers_cache
1074            .get_segment_header(segment_index)
1075    }
1076
1077    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1078        // Blocking read lock is fine because where a write lock is only taken for a short time and
1079        // most locks are read locks
1080        let state = self.inner.state.read_blocking();
1081
1082        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1083        else {
1084            // Not initialized
1085            return Vec::new();
1086        };
1087
1088        // Special case for the initial segment (for beacon chain genesis block)
1089        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1090            && block_number == BlockNumber::ONE
1091        {
1092            // If there is a segment index present, and we store monotonically increasing segment
1093            // headers, then the first header exists
1094            return vec![
1095                state
1096                    .segment_headers_cache
1097                    .get_segment_header(LocalSegmentIndex::ZERO)
1098                    .expect("Segment headers are stored in monotonically increasing order; qed"),
1099            ];
1100        }
1101
1102        if last_local_segment_index == LocalSegmentIndex::ZERO {
1103            // Genesis segment already included in block #1
1104            return Vec::new();
1105        }
1106
1107        let mut current_local_segment_index = last_local_segment_index;
1108        loop {
1109            // If the current segment index present, and we store monotonically increasing segment
1110            // headers, then the current segment header exists as well.
1111            let current_segment_header = state
1112                .segment_headers_cache
1113                .get_segment_header(current_local_segment_index)
1114                .expect("Segment headers are stored in monotonically increasing order; qed");
1115
1116            // The block immediately after the archived segment adding the confirmation depth
1117            let target_block_number = current_segment_header.last_archived_block.number()
1118                + BlockNumber::ONE
1119                + self.inner.options.block_confirmation_depth;
1120            if target_block_number == block_number {
1121                let mut headers_for_block = vec![current_segment_header];
1122
1123                // Check block spanning multiple segments
1124                let last_archived_block_number = current_segment_header.last_archived_block.number;
1125                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1126
1127                while let Some(segment_header) = state
1128                    .segment_headers_cache
1129                    .get_segment_header(local_segment_index)
1130                {
1131                    if segment_header.last_archived_block.number == last_archived_block_number {
1132                        headers_for_block.insert(0, segment_header);
1133                        local_segment_index -= LocalSegmentIndex::ONE;
1134                    } else {
1135                        break;
1136                    }
1137                }
1138
1139                return headers_for_block;
1140            }
1141
1142            // iterate segments further
1143            if target_block_number > block_number {
1144                // no need to check the initial segment
1145                if current_local_segment_index > LocalSegmentIndex::ONE {
1146                    current_local_segment_index -= LocalSegmentIndex::ONE;
1147                } else {
1148                    break;
1149                }
1150            } else {
1151                // No segment headers required
1152                return Vec::new();
1153            }
1154        }
1155
1156        // No segment headers required
1157        Vec::new()
1158    }
1159}
1160
1161impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1162where
1163    Block: GenericOwnedBlock,
1164    StorageBackend: ClientDatabaseStorageBackend,
1165{
1166    async fn persist_block(
1167        &self,
1168        block: Block,
1169        block_details: BlockDetails,
1170    ) -> Result<(), PersistBlockError> {
1171        let mut state = self.inner.state.write().await;
1172        let best_number = state.best_tip().number;
1173
1174        let header = block.header().header();
1175
1176        let block_number = header.prefix.number;
1177
1178        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1179            // Special case when syncing on top of the fresh database
1180            Self::insert_first_block(&mut state.data, block, block_details);
1181
1182            return Ok(());
1183        }
1184
1185        if block_number == best_number + BlockNumber::ONE {
1186            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1187        }
1188
1189        let block_offset = u64::from(
1190            best_number
1191                .checked_sub(block_number)
1192                .ok_or(PersistBlockError::MissingParent)?,
1193        ) as usize;
1194
1195        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1196            return Err(PersistBlockError::OutsideAcceptableRange);
1197        }
1198
1199        let state = &mut *state;
1200
1201        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1202            error!(
1203                %block_number,
1204                %block_offset,
1205                "Failed to store block fork, header offset is missing despite being within \
1206                acceptable range"
1207            );
1208
1209            PersistBlockError::OutsideAcceptableRange
1210        })?;
1211
1212        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1213            // Block's parent is no longer a fork tip, remove it
1214            if fork_tip.root == header.prefix.parent_root {
1215                state.data.fork_tips.remove(index);
1216                break;
1217            }
1218        }
1219
1220        let block_root = *header.root();
1221        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1222        // the best block
1223        state.data.fork_tips.insert(
1224            1,
1225            ForkTip {
1226                number: block_number,
1227                root: block_root,
1228            },
1229        );
1230        state.data.block_roots.insert(block_root, block_number);
1231        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1232            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1233        block_forks.push(ClientDatabaseBlock::InMemory {
1234            block,
1235            block_details,
1236            beacon_chain_block_details,
1237        });
1238
1239        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1240
1241        Ok(())
1242    }
1243
1244    async fn persist_segment_headers(
1245        &self,
1246        segment_headers: Vec<SegmentHeader>,
1247    ) -> Result<(), PersistSegmentHeadersError> {
1248        let mut state = self.inner.state.write().await;
1249
1250        let added_segment_headers = state
1251            .segment_headers_cache
1252            .add_segment_headers(segment_headers)?;
1253
1254        if added_segment_headers.is_empty() {
1255            return Ok(());
1256        }
1257
1258        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1259        // headers modifications
1260        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1261        //  are satisfied. If not, blocking read locks in other places will cause issues.
1262        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1263
1264        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1265
1266        storage_backend_adapter
1267            .write_storage_item(StorageItemTemporary::SegmentHeaders(
1268                StorageItemTemporarySegmentHeaders {
1269                    segment_headers: added_segment_headers,
1270                },
1271            ))
1272            .await?;
1273
1274        Ok(())
1275    }
1276}
1277
1278impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1279where
1280    StorageBackend: ClientDatabaseStorageBackend,
1281{
1282    fn shard_segment_roots(
1283        &self,
1284        block_number: BlockNumber,
1285    ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1286        // Blocking read lock is fine because where a write lock is only taken for a short time and
1287        // most locks are read locks
1288        let state = self.inner.state.read_blocking();
1289        let best_number = state.best_tip().number;
1290
1291        let block_offset = u64::from(
1292            best_number
1293                .checked_sub(block_number)
1294                .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1295        ) as usize;
1296
1297        let block = state
1298            .data
1299            .blocks
1300            .get(block_offset)
1301            .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1302            .first()
1303            .expect("There is always at least one block candidate; qed");
1304
1305        Ok(StdArc::clone(
1306            &block
1307                .beacon_chain_block_details()
1308                .as_ref()
1309                .expect("Always present in the beacon chain block; qed")
1310                .shard_segment_roots,
1311        ))
1312    }
1313
1314    #[inline]
1315    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1316        // Blocking read lock is fine because where a write lock is only taken for a short time and
1317        // most locks are read locks
1318        let state = self.inner.state.read_blocking();
1319        state
1320            .super_segment_headers_cache
1321            .last_super_segment_header()
1322    }
1323
1324    #[inline]
1325    fn previous_super_segment_header(
1326        &self,
1327        block_number: BlockNumber,
1328    ) -> Option<SuperSegmentHeader> {
1329        // Blocking read lock is fine because where a write lock is only taken for a short time and
1330        // most locks are read locks
1331        let state = self.inner.state.read_blocking();
1332
1333        state
1334            .super_segment_headers_cache
1335            .previous_super_segment_header(block_number)
1336    }
1337
1338    #[inline]
1339    fn get_super_segment_header(
1340        &self,
1341        super_segment_index: SuperSegmentIndex,
1342    ) -> Option<SuperSegmentHeader> {
1343        // Blocking read lock is fine because where a write lock is only taken for a short time and
1344        // most locks are read locks
1345        let state = self.inner.state.read_blocking();
1346
1347        state
1348            .super_segment_headers_cache
1349            .get_super_segment_header(super_segment_index)
1350    }
1351
1352    fn get_super_segment_header_for_segment_index(
1353        &self,
1354        segment_index: SegmentIndex,
1355    ) -> Option<SuperSegmentHeader> {
1356        // Blocking read lock is fine because where a write lock is only taken for a short time and
1357        // most locks are read locks
1358        let state = self.inner.state.read_blocking();
1359
1360        state
1361            .super_segment_headers_cache
1362            .get_super_segment_header_for_segment_index(segment_index)
1363    }
1364}
1365
1366impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1367where
1368    StorageBackend: ClientDatabaseStorageBackend,
1369{
1370    async fn persist_super_segment_header(
1371        &self,
1372        super_segment_header: SuperSegmentHeader,
1373    ) -> Result<bool, PersistSuperSegmentHeadersError> {
1374        let mut state = self.inner.state.write().await;
1375
1376        let added_super_segment_headers = state
1377            .super_segment_headers_cache
1378            .add_super_segment_headers(vec![super_segment_header])?;
1379
1380        if added_super_segment_headers.is_empty() {
1381            return Ok(false);
1382        }
1383
1384        // Convert write lock into upgradable read lock to allow reads, while preventing super
1385        // segment headers modifications
1386        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1387        //  are satisfied. If not, blocking read locks in other places will cause issues.
1388        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1389
1390        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1391
1392        storage_backend_adapter
1393            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1394                StorageItemTemporarySuperSegmentHeaders {
1395                    super_segment_headers: added_super_segment_headers,
1396                },
1397            ))
1398            .await?;
1399
1400        Ok(true)
1401    }
1402
1403    async fn persist_super_segment_headers(
1404        &self,
1405        super_segment_headers: Vec<SuperSegmentHeader>,
1406    ) -> Result<(), PersistSuperSegmentHeadersError> {
1407        let mut state = self.inner.state.write().await;
1408
1409        let added_super_segment_headers = state
1410            .super_segment_headers_cache
1411            .add_super_segment_headers(super_segment_headers)?;
1412
1413        if added_super_segment_headers.is_empty() {
1414            return Ok(());
1415        }
1416
1417        // Convert write lock into upgradable read lock to allow reads, while preventing super
1418        // segment headers modifications
1419        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1420        //  are satisfied. If not, blocking read locks in other places will cause issues.
1421        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1422
1423        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1424
1425        storage_backend_adapter
1426            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1427                StorageItemTemporarySuperSegmentHeaders {
1428                    super_segment_headers: added_super_segment_headers,
1429                },
1430            ))
1431            .await?;
1432
1433        Ok(())
1434    }
1435}
1436
1437impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1438where
1439    Block: GenericOwnedBlock,
1440    StorageBackend: ClientDatabaseStorageBackend,
1441{
1442    /// Open the existing database.
1443    ///
1444    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
1445    pub async fn open<GBB>(
1446        options: ClientDatabaseOptions<GBB, StorageBackend>,
1447    ) -> Result<Self, ClientDatabaseError>
1448    where
1449        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1450    {
1451        let ClientDatabaseOptions {
1452            write_buffer_size,
1453            block_confirmation_depth,
1454            soft_confirmation_depth,
1455            max_fork_tips,
1456            max_fork_tip_distance,
1457            genesis_block_builder,
1458            storage_backend,
1459        } = options;
1460        if soft_confirmation_depth >= block_confirmation_depth {
1461            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1462        }
1463
1464        if max_fork_tip_distance > block_confirmation_depth {
1465            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1466        }
1467
1468        let mut state_data = StateData {
1469            fork_tips: VecDeque::new(),
1470            block_roots: HashMap::default(),
1471            blocks: VecDeque::new(),
1472        };
1473        let mut segment_headers_cache = SegmentHeadersCache {
1474            segment_headers_cache: Vec::new(),
1475        };
1476        let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1477            super_segment_headers_cache: Vec::new(),
1478        };
1479
1480        let options = ClientDatabaseInnerOptions {
1481            block_confirmation_depth,
1482            soft_confirmation_depth,
1483            max_fork_tips,
1484            max_fork_tip_distance,
1485        };
1486
1487        let storage_item_handlers = StorageItemHandlers {
1488            permanent: |_arg| {
1489                // TODO
1490                Ok(())
1491            },
1492            temporary: |arg| {
1493                let StorageItemHandlerArg {
1494                    storage_item,
1495                    page_offset,
1496                    num_pages,
1497                } = arg;
1498                let storage_item_block = match storage_item {
1499                    StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1500                    StorageItemTemporary::SegmentHeaders(segment_headers) => {
1501                        let num_segment_headers = segment_headers.segment_headers.len();
1502                        return match segment_headers_cache
1503                            .add_segment_headers(segment_headers.segment_headers)
1504                        {
1505                            Ok(_) => Ok(()),
1506                            Err(error) => {
1507                                error!(
1508                                    %page_offset,
1509                                    %num_segment_headers,
1510                                    %error,
1511                                    "Failed to add segment headers from storage item"
1512                                );
1513
1514                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1515                            }
1516                        };
1517                    }
1518                    StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1519                        let num_super_segment_headers =
1520                            super_segment_headers.super_segment_headers.len();
1521                        return match super_segment_headers_cache
1522                            .add_super_segment_headers(super_segment_headers.super_segment_headers)
1523                        {
1524                            Ok(_) => Ok(()),
1525                            Err(error) => {
1526                                error!(
1527                                    %page_offset,
1528                                    %num_super_segment_headers,
1529                                    %error,
1530                                    "Failed to add segment headers from storage item"
1531                                );
1532
1533                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1534                            }
1535                        };
1536                    }
1537                };
1538
1539                // TODO: It would be nice to not allocate body here since we'll not use it here
1540                //  anyway
1541                let StorageItemTemporaryBlock {
1542                    header,
1543                    body,
1544                    mmr_with_block,
1545                    system_contract_states,
1546                } = storage_item_block;
1547
1548                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1549                    error!(%page_offset, "Failed to decode block header from bytes");
1550
1551                    ClientDatabaseError::InvalidBlock { page_offset }
1552                })?;
1553                let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1554                    error!(%page_offset, "Failed to decode block body from bytes");
1555
1556                    ClientDatabaseError::InvalidBlock { page_offset }
1557                })?;
1558
1559                let block_root = *header.header().root();
1560                let block_number = header.header().prefix.number;
1561
1562                state_data.block_roots.insert(block_root, block_number);
1563
1564                let maybe_best_number = state_data
1565                    .blocks
1566                    .front()
1567                    .and_then(|block_forks| block_forks.first())
1568                    .map(|best_block| {
1569                        // Type inference is not working here for some reason
1570                        let header: &Block::Header = best_block.header();
1571
1572                        header.header().prefix.number
1573                    });
1574
1575                let block_offset = if let Some(best_number) = maybe_best_number {
1576                    if block_number <= best_number {
1577                        u64::from(best_number - block_number) as usize
1578                    } else {
1579                        // The new best block must follow the previous best block
1580                        if block_number - best_number != BlockNumber::ONE {
1581                            error!(
1582                                %page_offset,
1583                                %best_number,
1584                                %block_number,
1585                                "Invalid new best block number, it must be only one block \
1586                                higher than the best block"
1587                            );
1588
1589                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
1590                        }
1591
1592                        state_data.blocks.push_front(SmallVec::new());
1593                        // Will insert a new block at the front
1594                        0
1595                    }
1596                } else {
1597                    state_data.blocks.push_front(SmallVec::new());
1598                    // Will insert a new block at the front
1599                    0
1600                };
1601
1602                let Some(block_forks) = state_data.blocks.get_mut(block_offset) else {
1603                    // Ignore the older block, other blocks at its height were already pruned
1604                    // anyway
1605                    return Ok(());
1606                };
1607
1608                // Push a new block to the end of the list, we'll fix it up later
1609                let beacon_chain_block_details =
1610                    <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1611                        .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1612                block_forks.push(ClientDatabaseBlock::Persisted {
1613                    header,
1614                    block_details: BlockDetails {
1615                        mmr_with_block,
1616                        system_contract_states,
1617                    },
1618                    beacon_chain_block_details,
1619                    write_location: WriteLocation {
1620                        page_offset,
1621                        num_pages,
1622                    },
1623                });
1624
1625                // If a new block was inserted, confirm a new canonical block to prune extra
1626                // in-memory information
1627                if block_offset == 0 && block_forks.len() == 1 {
1628                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
1629                }
1630
1631                Ok(())
1632            },
1633        };
1634
1635        let storage_backend_adapter =
1636            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1637                .await?;
1638
1639        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1640            // The best block is last in the list here because that is how it was inserted while
1641            // reading from the database
1642            block_forks.last()
1643        }) {
1644            // Type inference is not working here for some reason
1645            let header: &Block::Header = best_block.header();
1646            let header = header.header();
1647            let block_number = header.prefix.number;
1648            let block_root = *header.root();
1649
1650            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1651                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1652            }
1653
1654            // Store the best block as the first and only fork tip
1655            state_data.fork_tips.push_front(ForkTip {
1656                number: block_number,
1657                root: block_root,
1658            });
1659        } else {
1660            let GenesisBlockBuilderResult {
1661                block,
1662                system_contract_states,
1663            } = genesis_block_builder();
1664
1665            // If the database is empty, initialize everything with the genesis block
1666            let header = block.header().header();
1667            let block_number = header.prefix.number;
1668            let block_root = *header.root();
1669
1670            state_data.fork_tips.push_front(ForkTip {
1671                number: block_number,
1672                root: block_root,
1673            });
1674            state_data.block_roots.insert(block_root, block_number);
1675            let beacon_chain_block_details =
1676                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1677                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1678            state_data
1679                .blocks
1680                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1681                    block,
1682                    block_details: BlockDetails {
1683                        system_contract_states,
1684                        mmr_with_block: Arc::new({
1685                            let mut mmr = BlockMerkleMountainRange::new();
1686                            mmr.add_leaf(&block_root);
1687                            mmr
1688                        })
1689                    },
1690                    beacon_chain_block_details,
1691                }]);
1692        }
1693
1694        let state = State {
1695            data: state_data,
1696            segment_headers_cache,
1697            super_segment_headers_cache,
1698            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1699        };
1700
1701        let inner = Inner {
1702            state: AsyncRwLock::new(state),
1703            options,
1704        };
1705
1706        Ok(Self {
1707            inner: Arc::new(inner),
1708        })
1709    }
1710
    /// Format a new database.
    ///
    /// This is a thin wrapper that forwards `storage_backend` and `options` to
    /// `StorageBackendAdapter::format` and returns its result unchanged.
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1718
1719    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1720        // If the database is empty, initialize everything with the genesis block
1721        let header = block.header().header();
1722        let block_number = header.prefix.number;
1723        let block_root = *header.root();
1724
1725        state.fork_tips.clear();
1726        state.fork_tips.push_front(ForkTip {
1727            number: block_number,
1728            root: block_root,
1729        });
1730        state.block_roots.clear();
1731        state.block_roots.insert(block_root, block_number);
1732        state.blocks.clear();
1733        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1734            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1735        state
1736            .blocks
1737            .push_front(smallvec![ClientDatabaseBlock::InMemory {
1738                block,
1739                block_details,
1740                beacon_chain_block_details,
1741            }]);
1742    }
1743
1744    async fn insert_new_best_block(
1745        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1746        inner: &Inner<Block, StorageBackend>,
1747        block: Block,
1748        block_details: BlockDetails,
1749    ) -> Result<(), PersistBlockError> {
1750        let header = block.header().header();
1751        let block_number = header.prefix.number;
1752        let block_root = *header.root();
1753        let parent_root = header.prefix.parent_root;
1754
1755        // Adjust the relative order of forks to ensure the first index always corresponds to
1756        // ancestors of the new best block
1757        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1758            return Err(PersistBlockError::MissingParent);
1759        }
1760
1761        // Store new block in the state
1762        {
1763            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1764                // Block's parent is no longer a fork tip, remove it
1765                if fork_tip.root == parent_root {
1766                    state.data.fork_tips.remove(index);
1767                    break;
1768                }
1769            }
1770
1771            state.data.fork_tips.push_front(ForkTip {
1772                number: block_number,
1773                root: block_root,
1774            });
1775            state.data.block_roots.insert(block_root, block_number);
1776            let beacon_chain_block_details =
1777                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1778                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1779            state
1780                .data
1781                .blocks
1782                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1783                    block,
1784                    block_details: block_details.clone(),
1785                    beacon_chain_block_details,
1786                }]);
1787        }
1788
1789        let options = &inner.options;
1790
1791        Self::confirm_canonical_block(block_number, &mut state.data, options);
1792        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1793
1794        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1795        // block modifications
1796        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1797        //  are satisfied. If not, blocking read locks in other places will cause issues.
1798        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1799
1800        let mut blocks_to_persist = Vec::new();
1801        for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1802            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1803                break;
1804            };
1805
1806            let len_before = blocks_to_persist.len();
1807            fork_blocks
1808                .iter()
1809                .enumerate()
1810                .filter_map(|(fork_offset, client_database_block)| {
1811                    match client_database_block {
1812                        ClientDatabaseBlock::InMemory {
1813                            block,
1814                            block_details,
1815                            beacon_chain_block_details: _,
1816                        } => Some(BlockToPersist {
1817                            block_offset,
1818                            fork_offset,
1819                            block,
1820                            block_details,
1821                        }),
1822                        ClientDatabaseBlock::Persisted { .. }
1823                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1824                            // Already persisted
1825                            None
1826                        }
1827                    }
1828                })
1829                .collect_into(&mut blocks_to_persist);
1830
1831            if blocks_to_persist.len() == len_before {
1832                break;
1833            }
1834        }
1835
1836        // Persist blocks from older to newer
1837        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1838        {
1839            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1840
1841            for block_to_persist in blocks_to_persist.into_iter().rev() {
1842                let BlockToPersist {
1843                    block_offset,
1844                    fork_offset,
1845                    block,
1846                    block_details,
1847                } = block_to_persist;
1848
1849                let write_location = storage_backend_adapter
1850                    .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1851                        header: block.header().buffer().clone(),
1852                        body: block.body().buffer().clone(),
1853                        mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1854                        system_contract_states: StdArc::clone(
1855                            &block_details.system_contract_states,
1856                        ),
1857                    }))
1858                    .await?;
1859
1860                persisted_blocks.push(PersistedBlock {
1861                    block_offset,
1862                    fork_offset,
1863                    write_location,
1864                });
1865            }
1866        }
1867
1868        // Convert blocks to persisted
1869        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1870        for persisted_block in persisted_blocks {
1871            let PersistedBlock {
1872                block_offset,
1873                fork_offset,
1874                write_location,
1875            } = persisted_block;
1876
1877            let block = state
1878                .data
1879                .blocks
1880                .get_mut(block_offset)
1881                .expect("Still holding the same lock since last check; qed")
1882                .get_mut(fork_offset)
1883                .expect("Still holding the same lock since last check; qed");
1884
1885            replace_with_or_abort(block, |block| {
1886                if let ClientDatabaseBlock::InMemory {
1887                    block,
1888                    block_details,
1889                    beacon_chain_block_details,
1890                } = block
1891                {
1892                    let (header, _body) = block.split();
1893
1894                    ClientDatabaseBlock::Persisted {
1895                        header,
1896                        block_details,
1897                        beacon_chain_block_details,
1898                        write_location,
1899                    }
1900                } else {
1901                    unreachable!("Still holding the same lock since last check; qed");
1902                }
1903            });
1904        }
1905
1906        // TODO: Prune blocks that are no longer necessary
1907        // TODO: Prune unused page groups here or elsewhere?
1908
1909        Ok(())
1910    }
1911
1912    /// Adjust the relative order of forks to ensure the first index always corresponds to
1913    /// `parent_block_root` and its ancestors.
1914    ///
1915    /// Returns `true` on success and `false` if one of the parents was not found.
1916    #[must_use]
1917    fn adjust_ancestor_block_forks(
1918        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1919        mut parent_block_root: BlockRoot,
1920    ) -> bool {
1921        let mut ancestor_blocks = blocks.iter_mut();
1922
1923        loop {
1924            if ancestor_blocks.len() == 1 {
1925                // Nothing left to adjust with a single fork
1926                break;
1927            }
1928
1929            let Some(parent_blocks) = ancestor_blocks.next() else {
1930                // No more parent headers present
1931                break;
1932            };
1933
1934            let Some(fork_offset_parent_block_root) =
1935                parent_blocks
1936                    .iter()
1937                    .enumerate()
1938                    .find_map(|(fork_offset, fork_block)| {
1939                        let fork_header = fork_block.header().header();
1940                        if *fork_header.root() == parent_block_root {
1941                            Some((fork_offset, fork_header.prefix.parent_root))
1942                        } else {
1943                            None
1944                        }
1945                    })
1946            else {
1947                return false;
1948            };
1949
1950            let fork_offset;
1951            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1952
1953            parent_blocks.swap(0, fork_offset);
1954        }
1955
1956        true
1957    }
1958
1959    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1960    ///
1961    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1962    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1963    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1964    /// they end up being necessary later.
1965    fn prune_outdated_fork_tips(
1966        best_number: BlockNumber,
1967        state: &mut StateData<Block>,
1968        options: &ClientDatabaseInnerOptions,
1969    ) {
1970        let state = &mut *state;
1971
1972        // These forks are just candidates because they will not be pruned if the reference count is
1973        // not 1, indicating they are still in use by something
1974        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1975
1976        // Prune forks that are too far away from the best block
1977        state.fork_tips.retain(|fork_tip| {
1978            if best_number - fork_tip.number > options.max_fork_tip_distance {
1979                candidate_forks_to_remove.push(*fork_tip);
1980                false
1981            } else {
1982                true
1983            }
1984        });
1985        // Prune forks that exceed the maximum number of forks
1986        if state.fork_tips.len() > options.max_fork_tips.get() {
1987            state
1988                .fork_tips
1989                .drain(options.max_fork_tips.get()..)
1990                .collect_into(&mut candidate_forks_to_remove);
1991        }
1992
1993        // Prune all possible candidates
1994        candidate_forks_to_remove
1995            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1996        // Return those that were not pruned back to the list of tips
1997        state.fork_tips.extend(candidate_forks_to_remove);
1998    }
1999
2000    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
2001    /// the list of fork tips
2002    #[must_use]
2003    fn prune_outdated_fork(
2004        best_number: BlockNumber,
2005        fork_tip: &ForkTip,
2006        state: &mut StateData<Block>,
2007    ) -> bool {
2008        let block_offset = u64::from(best_number - fork_tip.number) as usize;
2009
2010        // Prune fork top and all its ancestors that are not used
2011        let mut block_root_to_prune = fork_tip.root;
2012        let mut pruned_tip = false;
2013        for block_offset in block_offset.. {
2014            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2015                if !pruned_tip {
2016                    error!(
2017                        %best_number,
2018                        ?fork_tip,
2019                        block_offset,
2020                        "Block offset was not present in the database, this is an implementation \
2021                        bug #1"
2022                    );
2023                }
2024                // No forks left to prune
2025                break;
2026            };
2027
2028            if fork_blocks.len() == 1 {
2029                if !pruned_tip {
2030                    error!(
2031                        %best_number,
2032                        ?fork_tip,
2033                        block_offset,
2034                        "Block offset was not present in the database, this is an implementation \
2035                        bug #2"
2036                    );
2037                }
2038
2039                // No forks left to prune
2040                break;
2041            }
2042
2043            let Some((fork_offset, block)) = fork_blocks
2044                .iter()
2045                .enumerate()
2046                // Skip ancestor of the best block, it is certainly not a fork to be pruned
2047                .skip(1)
2048                .find(|(_fork_offset, block)| {
2049                    *block.header().header().root() == block_root_to_prune
2050                })
2051            else {
2052                if !pruned_tip {
2053                    error!(
2054                        %best_number,
2055                        ?fork_tip,
2056                        block_offset,
2057                        "Block offset was not present in the database, this is an implementation \
2058                        bug #3"
2059                    );
2060                }
2061
2062                // Nothing left to prune
2063                break;
2064            };
2065
2066            // More than one instance means something somewhere is using or depends on this block
2067            if block.header().ref_count() > 1 {
2068                break;
2069            }
2070
2071            // Blocks that are already persisted
2072            match block {
2073                ClientDatabaseBlock::InMemory { .. } => {
2074                    // Prune
2075                }
2076                ClientDatabaseBlock::Persisted { .. }
2077                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2078                    // Already on disk, keep it in memory for later, but prune the tip
2079                    pruned_tip = true;
2080                    break;
2081                }
2082            }
2083
2084            state.block_roots.get_mut(&block_root_to_prune);
2085            block_root_to_prune = block.header().header().prefix.parent_root;
2086            fork_blocks.swap_remove(fork_offset);
2087
2088            pruned_tip = true;
2089        }
2090
2091        pruned_tip
2092    }
2093
2094    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
2095    /// their descendants
2096    fn confirm_canonical_block(
2097        best_number: BlockNumber,
2098        state_data: &mut StateData<Block>,
2099        options: &ClientDatabaseInnerOptions,
2100    ) {
2101        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
2102        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
2103        // MMRs, which might be needed for reorgs at the lowest possible depth.
2104        let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;
2105
2106        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
2107            // Nothing to confirm yet
2108            return;
2109        };
2110
2111        // Mark the canonical block as confirmed
2112        {
2113            let Some(canonical_block) = fork_blocks.first_mut() else {
2114                error!(
2115                    %best_number,
2116                    block_offset,
2117                    "Have not found a canonical block to confirm, this is an implementation bug"
2118                );
2119                return;
2120            };
2121
2122            replace_with_or_abort(canonical_block, |block| match block {
2123                ClientDatabaseBlock::InMemory { .. } => {
2124                    error!(
2125                        %best_number,
2126                        block_offset,
2127                        header = ?block.header(),
2128                        "Block to be confirmed must not be in memory, this is an implementation bug"
2129                    );
2130                    block
2131                }
2132                ClientDatabaseBlock::Persisted {
2133                    header,
2134                    block_details: _,
2135                    beacon_chain_block_details,
2136                    write_location,
2137                } => ClientDatabaseBlock::PersistedConfirmed {
2138                    header,
2139                    beacon_chain_block_details,
2140                    write_location,
2141                },
2142                ClientDatabaseBlock::PersistedConfirmed { .. } => {
2143                    error!(
2144                        %best_number,
2145                        block_offset,
2146                        header = ?block.header(),
2147                        "Block to be confirmed must not be confirmed yet, this is an \
2148                        implementation bug"
2149                    );
2150                    block
2151                }
2152            });
2153        }
2154
2155        // Prune the rest of the blocks and their descendants
2156        let mut block_roots_to_prune = fork_blocks
2157            .drain(1..)
2158            .map(|block| *block.header().header().root())
2159            .collect::<Vec<_>>();
2160        let mut current_block_offset = block_offset;
2161        while !block_roots_to_prune.is_empty() {
2162            // Prune fork tips (if any)
2163            state_data
2164                .fork_tips
2165                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
2166
2167            // Prune removed block roots
2168            for block_root in &block_roots_to_prune {
2169                state_data.block_roots.remove(block_root);
2170            }
2171
2172            // Block offset for direct descendants
2173            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
2174                current_block_offset = next_block_offset;
2175            } else {
2176                // Reached the tip
2177                break;
2178            }
2179
2180            let fork_blocks = state_data
2181                .blocks
2182                .get_mut(current_block_offset)
2183                .expect("Lower block offset always exists; qed");
2184
2185            // Collect descendants of pruned blocks to prune them next
2186            block_roots_to_prune = fork_blocks
2187                .drain_filter(|block| {
2188                    let header = block.header().header();
2189
2190                    block_roots_to_prune.contains(&header.prefix.parent_root)
2191                })
2192                .map(|block| *block.header().header().root())
2193                .collect();
2194        }
2195    }
2196}