// ab_client_database/lib.rs
1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
37//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before it can be used, the database needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is
40//! expected (but depends on the storage backend) that the whole file size is pre-allocated on disk
41//! and no writes will fail due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    const_convert,
50    const_trait_impl,
51    default_field_values,
52    get_mut_unchecked,
53    iter_collect_into,
54    maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70    BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71    ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72    PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81    LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
/// Unique identifier for a database.
///
/// Generated during database formatting (see
/// [`ClientDatabaseFormatError::FailedToGenerateDatabaseId`]) and stored with the database.
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
105
impl Deref for DatabaseId {
    type Target = [u8; 32];

    // Cheap access to the raw bytes of the identifier
    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
114
impl AsRef<[u8]> for DatabaseId {
    // Borrow the identifier as an unsized byte slice
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
121
impl DatabaseId {
    /// Create a database identifier from raw bytes
    #[inline(always)]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}
128
/// Pass-through hasher used for block-root keyed maps.
///
/// Block roots are assumed to be uniformly distributed already (they are used as `HashMap` keys
/// via `BuildHasherDefault<BlockRootHasher>`), so instead of re-hashing, the first 8 bytes of the
/// input are interpreted as a little-endian `u64` and used as the hash directly. Writes shorter
/// than 8 bytes leave the state unchanged.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Only the first complete 8-byte chunk contributes to the state
        if let Some(first_chunk) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*first_chunk);
        }
    }
}
147
/// Result of building a genesis block (see [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
155
/// Options for [`ClientDatabase`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Genesis block builder is responsible to create genesis block and corresponding state for
    /// bootstrapping purposes.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
215
/// Options for formatting a database before first use (see [`ClientDatabaseFormatError`])
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
241
/// Error for [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
314
315/// Error for [`ClientDatabase::format()`]
316#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318    /// Storage backend has canceled read request
319    #[error("Storage backend has canceled read request")]
320    ReadRequestCancelled,
321    /// Storage backend read error
322    #[error("Storage backend read error: {error}")]
323    ReadError {
324        /// Low-level error
325        error: io::Error,
326    },
327    /// Failed to generate database id
328    #[error("Failed to generate database id")]
329    FailedToGenerateDatabaseId {
330        /// Low-level error
331        #[from]
332        error: SysError,
333    },
334    /// Database is already formatted yet
335    #[error("Database is already formatted yet")]
336    AlreadyFormatted,
337    /// Storage backend has canceled a writing request
338    #[error("Storage backend has canceled a writing request")]
339    WriteRequestCancelled,
340    /// Storage item write error
341    #[error("Storage item write error")]
342    StorageItemWriteError {
343        /// Low-level error
344        #[from]
345        error: io::Error,
346    },
347}
348
/// Tip of a fork: number and root of the highest block on that fork
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
354
/// A full block in either of its storage representations.
///
/// Either a reference to a complete in-memory block, or a header plus the location where the
/// block was written by the storage backend.
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is fully retained in memory
    InMemory(&'a Block),
    /// Block was persisted; only the header is retained in memory
    Persisted {
        header: &'a Block::Header,
        write_location: WriteLocation,
    },
}
365
/// Extra details maintained for beacon chain blocks only
#[derive(Debug)]
struct BeaconChainBlockDetails {
    /// Shard segment roots, only present for beacon chain blocks
    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
}
371
impl BeaconChainBlockDetails {
    /// Collect shard segment roots from the body of a beacon chain block.
    ///
    /// Flattens segment roots of every intermediate shard block (both the intermediate shard's
    /// own segments and those of its leaf shards) into a single list of [`ShardSegmentRoot`]s.
    fn from_body(body: &BeaconChainBody<'_>) -> Self {
        let shard_segment_roots = body
            .intermediate_shard_blocks()
            .iter()
            .flat_map(|intermediate_shard_block_info| {
                // Segment roots produced by the intermediate shard itself
                let own_segments = intermediate_shard_block_info
                    .own_segments
                    .into_iter()
                    .flat_map({
                        let shard_index = intermediate_shard_block_info.header.prefix.shard_index;

                        move |own_segments| {
                            // Segment roots are stored contiguously starting at
                            // `first_local_segment_index`, so zipping with an open-ended range
                            // yields each root's segment index
                            (own_segments.first_local_segment_index..)
                                .zip(own_segments.segment_roots)
                                .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                    shard_index,
                                    segment_index,
                                    segment_root,
                                })
                        }
                    });
                // Segment roots produced by leaf shards under this intermediate shard
                let child_shard_segment_roots = intermediate_shard_block_info
                    .leaf_shards_segments()
                    .flat_map(move |(shard_index, own_segments)| {
                        (own_segments.first_local_segment_index..)
                            .zip(own_segments.segment_roots)
                            .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                shard_index,
                                segment_index,
                                segment_root,
                            })
                    });

                own_segments.chain(child_shard_segment_roots)
            })
            .collect();

        Self {
            shard_segment_roots,
        }
    }
}
415
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later,
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::block_confirmation_depth`]), it
/// becomes irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory {
        block: Block,
        block_details: BlockDetails,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
    },
    /// Block was persisted (likely on disk)
    Persisted {
        header: Block::Header,
        block_details: BlockDetails,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    PersistedConfirmed {
        header: Block::Header,
        /// Only present for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
}
451
impl<Block> ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block header (available in every state)
    #[inline(always)]
    fn header(&self) -> &Block::Header {
        match self {
            Self::InMemory { block, .. } => block.header(),
            Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
        }
    }

    /// Full block: either the in-memory block or the header plus persisted write location
    #[inline(always)]
    fn full_block(&self) -> FullBlock<'_, Block> {
        match self {
            Self::InMemory { block, .. } => FullBlock::InMemory(block),
            Self::Persisted {
                header,
                write_location,
                ..
            }
            | Self::PersistedConfirmed {
                header,
                write_location,
                ..
            } => FullBlock::Persisted {
                header,
                write_location: *write_location,
            },
        }
    }

    /// Block details, `None` once the block is confirmed (details are no longer retained)
    #[inline(always)]
    fn block_details(&self) -> Option<&BlockDetails> {
        match self {
            Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
                Some(block_details)
            }
            Self::PersistedConfirmed { .. } => None,
        }
    }

    /// Beacon chain block details, `None` for non-beacon-chain blocks
    #[inline(always)]
    fn beacon_chain_block_details(&self) -> &Option<BeaconChainBlockDetails> {
        match self {
            Self::InMemory {
                beacon_chain_block_details,
                ..
            }
            | Self::Persisted {
                beacon_chain_block_details,
                ..
            }
            | Self::PersistedConfirmed {
                beacon_chain_block_details,
                ..
            } => beacon_chain_block_details,
        }
    }
}
512
/// In-memory view of blocks and fork tips
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are ordered from most recently updated
    /// (towards the front) to least recently updated (at the back).
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
    /// block numbers.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is
    /// organized in such a way that the first entry at every block number corresponds to the
    /// canonical version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
544
/// Cache of segment headers, indexed by local segment index (contiguous from zero)
#[derive(Debug)]
struct SegmentHeadersCache {
    /// All known segment headers, position in the `Vec` matches the local segment index
    segment_headers_cache: Vec<SegmentHeader>,
}
549
550impl SegmentHeadersCache {
551    #[inline(always)]
552    fn last_segment_header(&self) -> Option<SegmentHeader> {
553        self.segment_headers_cache.last().cloned()
554    }
555
556    #[inline(always)]
557    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558        self.segment_headers_cache
559            .last()
560            .map(|segment_header| segment_header.index.as_inner())
561    }
562
563    #[inline(always)]
564    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565        self.segment_headers_cache
566            .get(u64::from(local_segment_index) as usize)
567            .copied()
568    }
569
570    /// Returns actually added segments (some might have been skipped)
571    fn add_segment_headers(
572        &mut self,
573        mut segment_headers: Vec<SegmentHeader>,
574    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575        self.segment_headers_cache.reserve(segment_headers.len());
576
577        let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579        if let Some(last_segment_index) = maybe_last_local_segment_index {
580            // Skip already stored segment headers
581            segment_headers
582                .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583        }
584
585        // Check all input segment headers to see which ones are not stored yet and verifying that
586        // segment indices are monotonically increasing
587        for segment_header in segment_headers.iter().copied() {
588            let local_segment_index = segment_header.index.as_inner();
589            match maybe_last_local_segment_index {
590                Some(last_local_segment_index) => {
591                    if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
592                        return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
593                            local_segment_index,
594                            last_local_segment_index,
595                        });
596                    }
597
598                    self.segment_headers_cache.push(segment_header);
599                    maybe_last_local_segment_index.replace(local_segment_index);
600                }
601                None => {
602                    if local_segment_index != LocalSegmentIndex::ZERO {
603                        return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
604                            local_segment_index,
605                        });
606                    }
607
608                    self.segment_headers_cache.push(segment_header);
609                    maybe_last_local_segment_index.replace(local_segment_index);
610                }
611            }
612        }
613
614        Ok(segment_headers)
615    }
616}
617
/// Cache of super segment headers, indexed by super segment index (contiguous from zero)
#[derive(Debug)]
struct SuperSegmentHeadersCache {
    /// All known super segment headers, position in the `Vec` matches the super segment index
    super_segment_headers_cache: Vec<SuperSegmentHeader>,
}
622
623impl SuperSegmentHeadersCache {
624    #[inline(always)]
625    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
626        self.super_segment_headers_cache.last().cloned()
627    }
628
629    #[inline]
630    fn previous_super_segment_header(
631        &self,
632        target_block_number: BlockNumber,
633    ) -> Option<SuperSegmentHeader> {
634        let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
635        let index = match self.super_segment_headers_cache.binary_search_by_key(
636            &block_number,
637            |super_segment_header| {
638                super_segment_header
639                    .target_beacon_chain_block_number
640                    .as_inner()
641            },
642        ) {
643            Ok(found_index) => found_index,
644            Err(insert_index) => insert_index.checked_sub(1)?,
645        };
646
647        self.super_segment_headers_cache.get(index).copied()
648    }
649
650    #[inline(always)]
651    fn get_super_segment_header(
652        &self,
653        local_segment_index: SuperSegmentIndex,
654    ) -> Option<SuperSegmentHeader> {
655        self.super_segment_headers_cache
656            .get(u64::from(local_segment_index) as usize)
657            .copied()
658    }
659
660    #[inline(always)]
661    fn get_super_segment_header_for_segment_index(
662        &self,
663        segment_index: SegmentIndex,
664    ) -> Option<SuperSegmentHeader> {
665        let index = self
666            .super_segment_headers_cache
667            .binary_search_by_key(&segment_index, |super_segment_header| {
668                super_segment_header.max_segment_index.as_inner()
669            })
670            .unwrap_or_else(|insert_index| insert_index);
671
672        let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
673
674        let max_segment_index = super_segment_header.max_segment_index.as_inner();
675        let first_segment_index = max_segment_index
676            - SegmentIndex::from(u64::from(super_segment_header.num_segments))
677            + SegmentIndex::ONE;
678
679        (first_segment_index..=max_segment_index)
680            .contains(&segment_index)
681            .then_some(super_segment_header)
682    }
683
684    /// Returns actually added super segments (some might have been skipped)
685    fn add_super_segment_headers(
686        &mut self,
687        mut super_segment_headers: Vec<SuperSegmentHeader>,
688    ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
689        self.super_segment_headers_cache
690            .reserve(super_segment_headers.len());
691
692        let mut maybe_last_super_segment_index = self
693            .super_segment_headers_cache
694            .last()
695            .map(|header| header.index.as_inner());
696
697        if let Some(last_super_segment_index) = maybe_last_super_segment_index {
698            // Skip already stored super segment headers
699            super_segment_headers.retain(|super_segment_header| {
700                super_segment_header.index.as_inner() > last_super_segment_index
701            });
702        }
703
704        // Check all input super segment headers to see which ones are not stored yet and verifying
705        // that super segment indices are monotonically increasing
706        for super_segment_header in super_segment_headers.iter().copied() {
707            let super_segment_index = super_segment_header.index.as_inner();
708            match maybe_last_super_segment_index {
709                Some(last_super_segment_index) => {
710                    if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
711                        return Err(
712                            PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
713                                super_segment_index,
714                                last_super_segment_index,
715                            },
716                        );
717                    }
718
719                    self.super_segment_headers_cache.push(super_segment_header);
720                    maybe_last_super_segment_index.replace(super_segment_index);
721                }
722                None => {
723                    if super_segment_index != SuperSegmentIndex::ZERO {
724                        return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
725                            super_segment_index,
726                        });
727                    }
728
729                    self.super_segment_headers_cache.push(super_segment_header);
730                    maybe_last_super_segment_index.replace(super_segment_index);
731                }
732            }
733        }
734
735        Ok(super_segment_headers)
736    }
737}
738
// TODO: Hide implementation details
/// Shared database state
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory view of blocks and fork tips
    data: StateData<Block>,
    /// Cache of known segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Cache of known super segment headers
    super_segment_headers_cache: SuperSegmentHeadersCache,
    /// Adapter around the storage backend used for reading/writing storage items
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
750
impl<Block, StorageBackend> State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Tip of the current best (canonical) fork
    #[inline(always)]
    fn best_tip(&self) -> &ForkTip {
        self.data
            .fork_tips
            .front()
            .expect("The best block is always present; qed")
    }

    /// The current best block (the first fork at the newest block number)
    #[inline(always)]
    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
        self.data
            .blocks
            .front()
            .expect("The best block is always present; qed")
            .first()
            .expect("The best block is always present; qed")
    }
}
773
/// A block scheduled to be written to the storage backend.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Offset of the block in `StateData::blocks` (distance from the best block)
    block_offset: usize,
    /// Index among the fork candidates stored at `block_offset`
    // NOTE(review): inferred from the parallel fields in `PersistedBlock` — confirm
    fork_offset: usize,
    /// Block to persist
    block: &'a Block,
    /// Details associated with the block
    block_details: &'a BlockDetails,
}
784
/// Result of persisting a block: identifies the block in memory and where it was written.
#[derive(Debug)]
struct PersistedBlock {
    /// Offset of the block in `StateData::blocks` (distance from the best block)
    block_offset: usize,
    /// Index among the fork candidates stored at `block_offset`
    fork_offset: usize,
    /// Location in the storage backend where the block was written
    write_location: WriteLocation,
}
791
/// Subset of [`ClientDatabaseOptions`] retained after the database is opened.
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// Number of blocks after which a block is considered confirmed
    block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth; must be strictly smaller than `block_confirmation_depth`
    /// (validated in `ClientDatabase::open()`)
    soft_confirmation_depth: BlockNumber,
    /// Maximum number of fork tips to track
    max_fork_tips: NonZeroUsize,
    /// Maximum distance from the best block at which a fork tip is retained; must not exceed
    /// `block_confirmation_depth` (validated in `ClientDatabase::open()`)
    max_fork_tip_distance: BlockNumber,
}
799
/// Inner state of [`ClientDatabase`], shared between clones via `Arc`.
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Mutable database state; most accesses are reads, writes are short-lived
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Immutable configuration captured at open time
    options: ClientDatabaseInnerOptions,
}
808
/// Client database
///
/// Cheap to clone: clones share the same inner state via `Arc` (see the `Clone` impl below).
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared inner state and options
    inner: Arc<Inner<Block, StorageBackend>>,
}
817
818impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
819where
820    Block: GenericOwnedBlock,
821{
822    fn clone(&self) -> Self {
823        Self {
824            inner: self.inner.clone(),
825        }
826    }
827}
828
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // Intentionally a no-op for now: unpersisted in-memory data is lost on shutdown
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
837
impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
    StorageBackend: ClientDatabaseStorageBackend,
{
    /// Root of the current best block
    #[inline]
    fn best_root(&self) -> BlockRoot {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        self.inner.state.read_blocking().best_tip().root
    }

    /// Header of the current best block (cloned)
    #[inline]
    fn best_header(&self) -> Block::Header {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        self.inner
            .state
            .read_blocking()
            .best_block()
            .header()
            .clone()
    }

    /// Header of the current best block together with its details (both cloned)
    #[inline]
    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        let best_block = state.best_block();
        (
            best_block.header().clone(),
            best_block
                .block_details()
                .expect("Always present for the best block; qed")
                .clone(),
        )
    }

    /// Header of the ancestor at `ancestor_block_number` on the chain leading to
    /// `descendant_block_root`, or `None` if either block is unknown or out of range.
    ///
    /// Walks parent links from the descendant towards the ancestor when forks exist at both
    /// heights.
    // NOTE(review): when `ancestor_block_number == descendant_block_number` and there are
    //  multiple candidates at both heights, the walk below starts past the descendant and this
    //  returns `None` instead of the descendant's own header — confirm this case is intended or
    //  unreachable.
    // TODO: Add fast path when `descendant_block_root` is the best block
    #[inline]
    fn ancestor_header(
        &self,
        ancestor_block_number: BlockNumber,
        descendant_block_root: &BlockRoot,
    ) -> Option<Block::Header> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // Offsets are distances from the best block (front of `blocks`)
        let ancestor_block_offset =
            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;

        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
        if ancestor_block_number > descendant_block_number {
            return None;
        }
        let descendant_block_offset =
            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;

        // Range of blocks where the first item is expected to contain a descendant
        let mut blocks_range_iter = state
            .data
            .blocks
            .iter()
            .enumerate()
            .skip(descendant_block_offset);

        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
        let descendant_header = descendant_block_candidates
            .iter()
            .find(|block| &*block.header().header().root() == descendant_block_root)?
            .header()
            .header();

        // If there are no forks at this level, then this is the canonical chain and ancestor
        // block number we're looking for is the first block at the corresponding block number.
        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
        // the one we care about.
        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
            return ancestor_block_candidates
                .iter()
                .next()
                .map(|block| block.header().clone());
        }

        let mut parent_block_root = &descendant_header.prefix.parent_root;

        // Iterate over the blocks following descendant until ancestor is reached
        for (block_offset, parent_candidates) in blocks_range_iter {
            let parent_header = parent_candidates
                .iter()
                .find(|header| &*header.header().header().root() == parent_block_root)?
                .header();

            // When header offset matches, we found the header
            if block_offset == ancestor_block_offset {
                return Some(parent_header.clone());
            }

            parent_block_root = &parent_header.header().prefix.parent_root;
        }

        None
    }

    /// Header for `block_root`, or `None` if the block is not in the in-memory window
    #[inline]
    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        let block_number = *state.data.block_roots.get(block_root)?;
        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
        let block_candidates = state.data.blocks.get(block_offset)?;

        block_candidates.iter().find_map(|block| {
            let header = block.header();

            if &*header.header().root() == block_root {
                Some(header.clone())
            } else {
                None
            }
        })
    }

    /// Header and details for `block_root`, or `None` if the block is unknown or has no details
    #[inline]
    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        let block_number = *state.data.block_roots.get(block_root)?;
        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
        let block_candidates = state.data.blocks.get(block_offset)?;

        block_candidates.iter().find_map(|block| {
            let header = block.header();
            // NOTE(review): details are cloned (and candidates without details skipped) before
            //  the root comparison below — consider checking the root first to avoid cloning for
            //  non-matching fork candidates
            let block_details = block.block_details().cloned()?;

            if &*header.header().root() == block_root {
                Some((header.clone(), block_details))
            } else {
                None
            }
        })
    }

    /// Full block for `block_root`, reading the body from the storage backend if the block was
    /// already persisted.
    ///
    /// # Errors
    /// Returns [`ReadBlockError::UnknownBlockRoot`] for unknown roots, a storage item read error
    /// if the persisted item is missing/unexpected, or [`ReadBlockError::FailedToDecode`] if the
    /// persisted buffers do not decode into a block.
    #[inline]
    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
        let state = self.inner.state.read().await;
        let best_number = state.best_tip().number;

        let block_number = *state
            .data
            .block_roots
            .get(block_root)
            .ok_or(ReadBlockError::UnknownBlockRoot)?;
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .expect("Known block roots always have valid block offset; qed"),
        ) as usize;
        let block_candidates = state
            .data
            .blocks
            .get(block_offset)
            .expect("Valid block offsets always have block entries; qed");

        for block_candidate in block_candidates {
            let header = block_candidate.header();

            if &*header.header().root() == block_root {
                return match block_candidate.full_block() {
                    // Fast path: the block is still held in memory
                    FullBlock::InMemory(block) => Ok(block.clone()),
                    // Slow path: only the header is in memory, read the body back from storage
                    FullBlock::Persisted {
                        header,
                        write_location,
                    } => {
                        let storage_backend_adapter = state.storage_backend_adapter.read().await;

                        let storage_item = storage_backend_adapter
                            .read_storage_item::<StorageItemTemporary>(write_location)
                            .await?;

                        // A block's write location must contain a `Block` storage item; anything
                        // else indicates corruption or a bookkeeping bug
                        let storage_item_block = match storage_item {
                            StorageItemTemporary::Block(storage_item_block) => storage_item_block,
                            StorageItemTemporary::SegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: `SegmentHeaders`",
                                    ),
                                });
                            }
                            StorageItemTemporary::SuperSegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: `SuperSegmentHeaders`",
                                    ),
                                });
                            }
                        };

                        // Only the body is needed; the header buffer is already in memory
                        let StorageItemTemporaryBlock {
                            header: _,
                            body,
                            mmr_with_block: _,
                            system_contract_states: _,
                        } = storage_item_block;

                        Block::from_buffers(header.buffer().clone(), body)
                            .ok_or(ReadBlockError::FailedToDecode)
                    }
                };
            }
        }

        unreachable!("Known block root always has block candidate associated with it; qed")
    }

    /// The most recently added segment header, if any
    #[inline]
    fn last_segment_header(&self) -> Option<SegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();
        state.segment_headers_cache.last_segment_header()
    }

    /// Segment header for `segment_index`, if present in the cache
    #[inline]
    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();

        state
            .segment_headers_cache
            .get_segment_header(segment_index)
    }

    /// Segment headers that must be included in the block at `block_number`, in increasing
    /// segment index order; empty when no segment headers are due at that height.
    ///
    /// A segment header is due at the block immediately following its last archived block plus
    /// the confirmation depth; a block spanning multiple segments collects all of them.
    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // most locks are read locks
        let state = self.inner.state.read_blocking();

        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
        else {
            // Not initialized
            return Vec::new();
        };

        // Special case for the initial segment (for beacon chain genesis block)
        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
            && block_number == BlockNumber::ONE
        {
            // If there is a segment index present, and we store monotonically increasing segment
            // headers, then the first header exists
            return vec![
                state
                    .segment_headers_cache
                    .get_segment_header(LocalSegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_local_segment_index == LocalSegmentIndex::ZERO {
            // Genesis segment already included in block #1
            return Vec::new();
        }

        // Scan backwards from the newest segment looking for one whose target block matches
        let mut current_local_segment_index = last_local_segment_index;
        loop {
            // If the current segment index present, and we store monotonically increasing segment
            // headers, then the current segment header exists as well.
            let current_segment_header = state
                .segment_headers_cache
                .get_segment_header(current_local_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // The block immediately after the archived segment adding the confirmation depth
            let target_block_number = current_segment_header.last_archived_block.number()
                + BlockNumber::ONE
                + self.inner.options.block_confirmation_depth;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Check block spanning multiple segments: earlier segments sharing the same last
                // archived block belong to the same block, prepend them to keep index order
                let last_archived_block_number = current_segment_header.last_archived_block.number;
                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;

                // NOTE(review): if the segment at index 0 also shares `last_archived_block`, the
                //  decrement below underflows `LocalSegmentIndex::ZERO` — confirm this case is
                //  unreachable
                while let Some(segment_header) = state
                    .segment_headers_cache
                    .get_segment_header(local_segment_index)
                {
                    if segment_header.last_archived_block.number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        local_segment_index -= LocalSegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            // iterate segments further
            if target_block_number > block_number {
                // no need to check the initial segment
                if current_local_segment_index > LocalSegmentIndex::ONE {
                    current_local_segment_index -= LocalSegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // No segment headers required
                return Vec::new();
            }
        }

        // No segment headers required
        Vec::new()
    }
}
1165
1166impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1167where
1168    Block: GenericOwnedBlock,
1169    StorageBackend: ClientDatabaseStorageBackend,
1170{
1171    async fn persist_block(
1172        &self,
1173        block: Block,
1174        block_details: BlockDetails,
1175    ) -> Result<(), PersistBlockError> {
1176        let mut state = self.inner.state.write().await;
1177        let best_number = state.best_tip().number;
1178
1179        let header = block.header().header();
1180
1181        let block_number = header.prefix.number;
1182
1183        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1184            // Special case when syncing on top of the fresh database
1185            Self::insert_first_block(&mut state.data, block, block_details);
1186
1187            return Ok(());
1188        }
1189
1190        if block_number == best_number + BlockNumber::ONE {
1191            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1192        }
1193
1194        let block_offset = u64::from(
1195            best_number
1196                .checked_sub(block_number)
1197                .ok_or(PersistBlockError::MissingParent)?,
1198        ) as usize;
1199
1200        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1201            return Err(PersistBlockError::OutsideAcceptableRange);
1202        }
1203
1204        let state = &mut *state;
1205
1206        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1207            error!(
1208                %block_number,
1209                %block_offset,
1210                "Failed to store block fork, header offset is missing despite being within \
1211                acceptable range"
1212            );
1213
1214            PersistBlockError::OutsideAcceptableRange
1215        })?;
1216
1217        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1218            // Block's parent is no longer a fork tip, remove it
1219            if fork_tip.root == header.prefix.parent_root {
1220                state.data.fork_tips.remove(index);
1221                break;
1222            }
1223        }
1224
1225        let block_root = *header.root();
1226        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1227        // the best block
1228        state.data.fork_tips.insert(
1229            1,
1230            ForkTip {
1231                number: block_number,
1232                root: block_root,
1233            },
1234        );
1235        state.data.block_roots.insert(block_root, block_number);
1236        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1237            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1238        block_forks.push(ClientDatabaseBlock::InMemory {
1239            block,
1240            block_details,
1241            beacon_chain_block_details,
1242        });
1243
1244        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1245
1246        Ok(())
1247    }
1248
1249    async fn persist_segment_headers(
1250        &self,
1251        segment_headers: Vec<SegmentHeader>,
1252    ) -> Result<(), PersistSegmentHeadersError> {
1253        let mut state = self.inner.state.write().await;
1254
1255        let added_segment_headers = state
1256            .segment_headers_cache
1257            .add_segment_headers(segment_headers)?;
1258
1259        if added_segment_headers.is_empty() {
1260            return Ok(());
1261        }
1262
1263        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1264        // headers modifications
1265        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1266        //  are satisfied. If not, blocking read locks in other places will cause issues.
1267        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1268
1269        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1270
1271        storage_backend_adapter
1272            .write_storage_item(StorageItemTemporary::SegmentHeaders(
1273                StorageItemTemporarySegmentHeaders {
1274                    segment_headers: added_segment_headers,
1275                },
1276            ))
1277            .await?;
1278
1279        Ok(())
1280    }
1281}
1282
1283impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1284where
1285    StorageBackend: ClientDatabaseStorageBackend,
1286{
1287    fn shard_segment_roots(
1288        &self,
1289        block_number: BlockNumber,
1290    ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1291        // Blocking read lock is fine because where a write lock is only taken for a short time and
1292        // most locks are read locks
1293        let state = self.inner.state.read_blocking();
1294        let best_number = state.best_tip().number;
1295
1296        let block_offset = u64::from(
1297            best_number
1298                .checked_sub(block_number)
1299                .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1300        ) as usize;
1301
1302        let block = state
1303            .data
1304            .blocks
1305            .get(block_offset)
1306            .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1307            .first()
1308            .expect("There is always at least one block candidate; qed");
1309
1310        Ok(StdArc::clone(
1311            &block
1312                .beacon_chain_block_details()
1313                .as_ref()
1314                .expect("Always present in the beacon chain block; qed")
1315                .shard_segment_roots,
1316        ))
1317    }
1318
1319    #[inline]
1320    fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1321        // Blocking read lock is fine because where a write lock is only taken for a short time and
1322        // most locks are read locks
1323        let state = self.inner.state.read_blocking();
1324        state
1325            .super_segment_headers_cache
1326            .last_super_segment_header()
1327    }
1328
1329    #[inline]
1330    fn previous_super_segment_header(
1331        &self,
1332        target_block_number: BlockNumber,
1333    ) -> Option<SuperSegmentHeader> {
1334        // Blocking read lock is fine because where a write lock is only taken for a short time and
1335        // most locks are read locks
1336        let state = self.inner.state.read_blocking();
1337
1338        state
1339            .super_segment_headers_cache
1340            .previous_super_segment_header(target_block_number)
1341    }
1342
1343    #[inline]
1344    fn get_super_segment_header(
1345        &self,
1346        super_segment_index: SuperSegmentIndex,
1347    ) -> Option<SuperSegmentHeader> {
1348        // Blocking read lock is fine because where a write lock is only taken for a short time and
1349        // most locks are read locks
1350        let state = self.inner.state.read_blocking();
1351
1352        state
1353            .super_segment_headers_cache
1354            .get_super_segment_header(super_segment_index)
1355    }
1356
1357    fn get_super_segment_header_for_segment_index(
1358        &self,
1359        segment_index: SegmentIndex,
1360    ) -> Option<SuperSegmentHeader> {
1361        // Blocking read lock is fine because where a write lock is only taken for a short time and
1362        // most locks are read locks
1363        let state = self.inner.state.read_blocking();
1364
1365        state
1366            .super_segment_headers_cache
1367            .get_super_segment_header_for_segment_index(segment_index)
1368    }
1369}
1370
impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
where
    StorageBackend: ClientDatabaseStorageBackend,
{
    /// Adds a single super segment header to the in-memory cache and persists it to the storage
    /// backend if it was not already known.
    ///
    /// Returns `Ok(true)` if the header was new and written, `Ok(false)` if it was already known
    /// (nothing to persist).
    async fn persist_super_segment_header(
        &self,
        super_segment_header: SuperSegmentHeader,
    ) -> Result<bool, PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(vec![super_segment_header])?;

        if added_super_segment_headers.is_empty() {
            return Ok(false);
        }

        // Convert write lock into upgradable read lock to allow reads, while preventing super
        // segment headers modifications
        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
        //  are satisfied. If not, blocking read locks in other places will cause issues.
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        storage_backend_adapter
            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
                StorageItemTemporarySuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(true)
    }

    /// Batch variant of [`Self::persist_super_segment_header()`]: adds the given super segment
    /// headers to the in-memory cache and persists only the newly added ones. Returns `Ok(())`
    /// even if all headers were already known.
    async fn persist_super_segment_headers(
        &self,
        super_segment_headers: Vec<SuperSegmentHeader>,
    ) -> Result<(), PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(super_segment_headers)?;

        if added_super_segment_headers.is_empty() {
            return Ok(());
        }

        // Convert write lock into upgradable read lock to allow reads, while preventing super
        // segment headers modifications
        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
        //  are satisfied. If not, blocking read locks in other places will cause issues.
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        storage_backend_adapter
            .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
                StorageItemTemporarySuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(())
    }
}
1441
1442impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1443where
1444    Block: GenericOwnedBlock,
1445    StorageBackend: ClientDatabaseStorageBackend,
1446{
1447    /// Open the existing database.
1448    ///
1449    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
1450    pub async fn open<GBB>(
1451        options: ClientDatabaseOptions<GBB, StorageBackend>,
1452    ) -> Result<Self, ClientDatabaseError>
1453    where
1454        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1455    {
1456        let ClientDatabaseOptions {
1457            write_buffer_size,
1458            block_confirmation_depth,
1459            soft_confirmation_depth,
1460            max_fork_tips,
1461            max_fork_tip_distance,
1462            genesis_block_builder,
1463            storage_backend,
1464        } = options;
1465        if soft_confirmation_depth >= block_confirmation_depth {
1466            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1467        }
1468
1469        if max_fork_tip_distance > block_confirmation_depth {
1470            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1471        }
1472
1473        let mut state_data = StateData {
1474            fork_tips: VecDeque::new(),
1475            block_roots: HashMap::default(),
1476            blocks: VecDeque::new(),
1477        };
1478        let mut segment_headers_cache = SegmentHeadersCache {
1479            segment_headers_cache: Vec::new(),
1480        };
1481        let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1482            super_segment_headers_cache: Vec::new(),
1483        };
1484
1485        let options = ClientDatabaseInnerOptions {
1486            block_confirmation_depth,
1487            soft_confirmation_depth,
1488            max_fork_tips,
1489            max_fork_tip_distance,
1490        };
1491
1492        let storage_item_handlers = StorageItemHandlers {
1493            permanent: |_arg| {
1494                // TODO
1495                Ok(())
1496            },
1497            temporary: |arg| {
1498                let StorageItemHandlerArg {
1499                    storage_item,
1500                    page_offset,
1501                    num_pages,
1502                } = arg;
1503                let storage_item_block = match storage_item {
1504                    StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1505                    StorageItemTemporary::SegmentHeaders(segment_headers) => {
1506                        let num_segment_headers = segment_headers.segment_headers.len();
1507                        return match segment_headers_cache
1508                            .add_segment_headers(segment_headers.segment_headers)
1509                        {
1510                            Ok(_) => Ok(()),
1511                            Err(error) => {
1512                                error!(
1513                                    %page_offset,
1514                                    %num_segment_headers,
1515                                    %error,
1516                                    "Failed to add segment headers from storage item"
1517                                );
1518
1519                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1520                            }
1521                        };
1522                    }
1523                    StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1524                        let num_super_segment_headers =
1525                            super_segment_headers.super_segment_headers.len();
1526                        return match super_segment_headers_cache
1527                            .add_super_segment_headers(super_segment_headers.super_segment_headers)
1528                        {
1529                            Ok(_) => Ok(()),
1530                            Err(error) => {
1531                                error!(
1532                                    %page_offset,
1533                                    %num_super_segment_headers,
1534                                    %error,
1535                                    "Failed to add segment headers from storage item"
1536                                );
1537
1538                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1539                            }
1540                        };
1541                    }
1542                };
1543
1544                // TODO: It would be nice to not allocate body here since we'll not use it here
1545                //  anyway
1546                let StorageItemTemporaryBlock {
1547                    header,
1548                    body,
1549                    mmr_with_block,
1550                    system_contract_states,
1551                } = storage_item_block;
1552
1553                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1554                    error!(%page_offset, "Failed to decode block header from bytes");
1555
1556                    ClientDatabaseError::InvalidBlock { page_offset }
1557                })?;
1558                let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1559                    error!(%page_offset, "Failed to decode block body from bytes");
1560
1561                    ClientDatabaseError::InvalidBlock { page_offset }
1562                })?;
1563
1564                let block_root = *header.header().root();
1565                let block_number = header.header().prefix.number;
1566
1567                state_data.block_roots.insert(block_root, block_number);
1568
1569                let maybe_best_number = state_data
1570                    .blocks
1571                    .front()
1572                    .and_then(|block_forks| block_forks.first())
1573                    .map(|best_block| {
1574                        // Type inference is not working here for some reason
1575                        let header: &Block::Header = best_block.header();
1576
1577                        header.header().prefix.number
1578                    });
1579
1580                let block_offset = if let Some(best_number) = maybe_best_number {
1581                    if block_number <= best_number {
1582                        u64::from(best_number - block_number) as usize
1583                    } else {
1584                        // The new best block must follow the previous best block
1585                        if block_number - best_number != BlockNumber::ONE {
1586                            error!(
1587                                %page_offset,
1588                                %best_number,
1589                                %block_number,
1590                                "Invalid new best block number, it must be only one block \
1591                                higher than the best block"
1592                            );
1593
1594                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
1595                        }
1596
1597                        state_data.blocks.push_front(SmallVec::new());
1598                        // Will insert a new block at the front
1599                        0
1600                    }
1601                } else {
1602                    state_data.blocks.push_front(SmallVec::new());
1603                    // Will insert a new block at the front
1604                    0
1605                };
1606
1607                let block_forks = match state_data.blocks.get_mut(block_offset) {
1608                    Some(block_forks) => block_forks,
1609                    None => {
1610                        // Ignore the older block, other blocks at its height were already pruned
1611                        // anyway
1612
1613                        return Ok(());
1614                    }
1615                };
1616
1617                // Push a new block to the end of the list, we'll fix it up later
1618                let beacon_chain_block_details =
1619                    <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1620                        .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1621                block_forks.push(ClientDatabaseBlock::Persisted {
1622                    header,
1623                    block_details: BlockDetails {
1624                        mmr_with_block,
1625                        system_contract_states,
1626                    },
1627                    beacon_chain_block_details,
1628                    write_location: WriteLocation {
1629                        page_offset,
1630                        num_pages,
1631                    },
1632                });
1633
1634                // If a new block was inserted, confirm a new canonical block to prune extra
1635                // in-memory information
1636                if block_offset == 0 && block_forks.len() == 1 {
1637                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
1638                }
1639
1640                Ok(())
1641            },
1642        };
1643
1644        let storage_backend_adapter =
1645            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1646                .await?;
1647
1648        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1649            // The best block is last in the list here because that is how it was inserted while
1650            // reading from the database
1651            block_forks.last()
1652        }) {
1653            // Type inference is not working here for some reason
1654            let header: &Block::Header = best_block.header();
1655            let header = header.header();
1656            let block_number = header.prefix.number;
1657            let block_root = *header.root();
1658
1659            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1660                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1661            }
1662
1663            // Store the best block as the first and only fork tip
1664            state_data.fork_tips.push_front(ForkTip {
1665                number: block_number,
1666                root: block_root,
1667            });
1668        } else {
1669            let GenesisBlockBuilderResult {
1670                block,
1671                system_contract_states,
1672            } = genesis_block_builder();
1673
1674            // If the database is empty, initialize everything with the genesis block
1675            let header = block.header().header();
1676            let block_number = header.prefix.number;
1677            let block_root = *header.root();
1678
1679            state_data.fork_tips.push_front(ForkTip {
1680                number: block_number,
1681                root: block_root,
1682            });
1683            state_data.block_roots.insert(block_root, block_number);
1684            let beacon_chain_block_details =
1685                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1686                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1687            state_data
1688                .blocks
1689                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1690                    block,
1691                    block_details: BlockDetails {
1692                        system_contract_states,
1693                        mmr_with_block: Arc::new({
1694                            let mut mmr = BlockMerkleMountainRange::new();
1695                            mmr.add_leaf(&block_root);
1696                            mmr
1697                        })
1698                    },
1699                    beacon_chain_block_details,
1700                }]);
1701        }
1702
1703        let state = State {
1704            data: state_data,
1705            segment_headers_cache,
1706            super_segment_headers_cache,
1707            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1708        };
1709
1710        let inner = Inner {
1711            state: AsyncRwLock::new(state),
1712            options,
1713        };
1714
1715        Ok(Self {
1716            inner: Arc::new(inner),
1717        })
1718    }
1719
    /// Format a new database
    ///
    /// Thin wrapper that forwards `storage_backend` and `options` to
    /// [`StorageBackendAdapter::format`].
    ///
    /// # Errors
    ///
    /// Propagates any [`ClientDatabaseFormatError`] produced by the storage backend adapter.
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1727
1728    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1729        // If the database is empty, initialize everything with the genesis block
1730        let header = block.header().header();
1731        let block_number = header.prefix.number;
1732        let block_root = *header.root();
1733
1734        state.fork_tips.clear();
1735        state.fork_tips.push_front(ForkTip {
1736            number: block_number,
1737            root: block_root,
1738        });
1739        state.block_roots.clear();
1740        state.block_roots.insert(block_root, block_number);
1741        state.blocks.clear();
1742        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1743            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1744        state
1745            .blocks
1746            .push_front(smallvec![ClientDatabaseBlock::InMemory {
1747                block,
1748                block_details,
1749                beacon_chain_block_details,
1750            }]);
1751    }
1752
1753    async fn insert_new_best_block(
1754        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1755        inner: &Inner<Block, StorageBackend>,
1756        block: Block,
1757        block_details: BlockDetails,
1758    ) -> Result<(), PersistBlockError> {
1759        let header = block.header().header();
1760        let block_number = header.prefix.number;
1761        let block_root = *header.root();
1762        let parent_root = header.prefix.parent_root;
1763
1764        // Adjust the relative order of forks to ensure the first index always corresponds to
1765        // ancestors of the new best block
1766        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1767            return Err(PersistBlockError::MissingParent);
1768        }
1769
1770        // Store new block in the state
1771        {
1772            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1773                // Block's parent is no longer a fork tip, remove it
1774                if fork_tip.root == parent_root {
1775                    state.data.fork_tips.remove(index);
1776                    break;
1777                }
1778            }
1779
1780            state.data.fork_tips.push_front(ForkTip {
1781                number: block_number,
1782                root: block_root,
1783            });
1784            state.data.block_roots.insert(block_root, block_number);
1785            let beacon_chain_block_details =
1786                <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1787                    .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1788            state
1789                .data
1790                .blocks
1791                .push_front(smallvec![ClientDatabaseBlock::InMemory {
1792                    block,
1793                    block_details: block_details.clone(),
1794                    beacon_chain_block_details,
1795                }]);
1796        }
1797
1798        let options = &inner.options;
1799
1800        Self::confirm_canonical_block(block_number, &mut state.data, options);
1801        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1802
1803        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1804        // block modifications
1805        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1806        //  are satisfied. If not, blocking read locks in other places will cause issues.
1807        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1808
1809        let mut blocks_to_persist = Vec::new();
1810        for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1811            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1812                break;
1813            };
1814
1815            let len_before = blocks_to_persist.len();
1816            fork_blocks
1817                .iter()
1818                .enumerate()
1819                .filter_map(|(fork_offset, client_database_block)| {
1820                    match client_database_block {
1821                        ClientDatabaseBlock::InMemory {
1822                            block,
1823                            block_details,
1824                            beacon_chain_block_details: _,
1825                        } => Some(BlockToPersist {
1826                            block_offset,
1827                            fork_offset,
1828                            block,
1829                            block_details,
1830                        }),
1831                        ClientDatabaseBlock::Persisted { .. }
1832                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1833                            // Already persisted
1834                            None
1835                        }
1836                    }
1837                })
1838                .collect_into(&mut blocks_to_persist);
1839
1840            if blocks_to_persist.len() == len_before {
1841                break;
1842            }
1843        }
1844
1845        // Persist blocks from older to newer
1846        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1847        {
1848            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1849
1850            for block_to_persist in blocks_to_persist.into_iter().rev() {
1851                let BlockToPersist {
1852                    block_offset,
1853                    fork_offset,
1854                    block,
1855                    block_details,
1856                } = block_to_persist;
1857
1858                let write_location = storage_backend_adapter
1859                    .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1860                        header: block.header().buffer().clone(),
1861                        body: block.body().buffer().clone(),
1862                        mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1863                        system_contract_states: StdArc::clone(
1864                            &block_details.system_contract_states,
1865                        ),
1866                    }))
1867                    .await?;
1868
1869                persisted_blocks.push(PersistedBlock {
1870                    block_offset,
1871                    fork_offset,
1872                    write_location,
1873                });
1874            }
1875        }
1876
1877        // Convert blocks to persisted
1878        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1879        for persisted_block in persisted_blocks {
1880            let PersistedBlock {
1881                block_offset,
1882                fork_offset,
1883                write_location,
1884            } = persisted_block;
1885
1886            let block = state
1887                .data
1888                .blocks
1889                .get_mut(block_offset)
1890                .expect("Still holding the same lock since last check; qed")
1891                .get_mut(fork_offset)
1892                .expect("Still holding the same lock since last check; qed");
1893
1894            replace_with_or_abort(block, |block| {
1895                if let ClientDatabaseBlock::InMemory {
1896                    block,
1897                    block_details,
1898                    beacon_chain_block_details,
1899                } = block
1900                {
1901                    let (header, _body) = block.split();
1902
1903                    ClientDatabaseBlock::Persisted {
1904                        header,
1905                        block_details,
1906                        beacon_chain_block_details,
1907                        write_location,
1908                    }
1909                } else {
1910                    unreachable!("Still holding the same lock since last check; qed");
1911                }
1912            });
1913        }
1914
1915        // TODO: Prune blocks that are no longer necessary
1916        // TODO: Prune unused page groups here or elsewhere?
1917
1918        Ok(())
1919    }
1920
1921    /// Adjust the relative order of forks to ensure the first index always corresponds to
1922    /// `parent_block_root` and its ancestors.
1923    ///
1924    /// Returns `true` on success and `false` if one of the parents was not found.
1925    #[must_use]
1926    fn adjust_ancestor_block_forks(
1927        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1928        mut parent_block_root: BlockRoot,
1929    ) -> bool {
1930        let mut ancestor_blocks = blocks.iter_mut();
1931
1932        loop {
1933            if ancestor_blocks.len() == 1 {
1934                // Nothing left to adjust with a single fork
1935                break;
1936            }
1937
1938            let Some(parent_blocks) = ancestor_blocks.next() else {
1939                // No more parent headers present
1940                break;
1941            };
1942
1943            let Some(fork_offset_parent_block_root) =
1944                parent_blocks
1945                    .iter()
1946                    .enumerate()
1947                    .find_map(|(fork_offset, fork_block)| {
1948                        let fork_header = fork_block.header().header();
1949                        if *fork_header.root() == parent_block_root {
1950                            Some((fork_offset, fork_header.prefix.parent_root))
1951                        } else {
1952                            None
1953                        }
1954                    })
1955            else {
1956                return false;
1957            };
1958
1959            let fork_offset;
1960            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1961
1962            parent_blocks.swap(0, fork_offset);
1963        }
1964
1965        true
1966    }
1967
1968    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1969    ///
1970    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1971    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1972    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1973    /// they end up being necessary later.
1974    fn prune_outdated_fork_tips(
1975        best_number: BlockNumber,
1976        state: &mut StateData<Block>,
1977        options: &ClientDatabaseInnerOptions,
1978    ) {
1979        let state = &mut *state;
1980
1981        // These forks are just candidates because they will not be pruned if the reference count is
1982        // not 1, indicating they are still in use by something
1983        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1984
1985        // Prune forks that are too far away from the best block
1986        state.fork_tips.retain(|fork_tip| {
1987            if best_number - fork_tip.number > options.max_fork_tip_distance {
1988                candidate_forks_to_remove.push(*fork_tip);
1989                false
1990            } else {
1991                true
1992            }
1993        });
1994        // Prune forks that exceed the maximum number of forks
1995        if state.fork_tips.len() > options.max_fork_tips.get() {
1996            state
1997                .fork_tips
1998                .drain(options.max_fork_tips.get()..)
1999                .collect_into(&mut candidate_forks_to_remove);
2000        }
2001
2002        // Prune all possible candidates
2003        candidate_forks_to_remove
2004            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
2005        // Return those that were not pruned back to the list of tips
2006        state.fork_tips.extend(candidate_forks_to_remove);
2007    }
2008
2009    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
2010    /// the list of fork tips
2011    #[must_use]
2012    fn prune_outdated_fork(
2013        best_number: BlockNumber,
2014        fork_tip: &ForkTip,
2015        state: &mut StateData<Block>,
2016    ) -> bool {
2017        let block_offset = u64::from(best_number - fork_tip.number) as usize;
2018
2019        // Prune fork top and all its ancestors that are not used
2020        let mut block_root_to_prune = fork_tip.root;
2021        let mut pruned_tip = false;
2022        for block_offset in block_offset.. {
2023            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2024                if !pruned_tip {
2025                    error!(
2026                        %best_number,
2027                        ?fork_tip,
2028                        block_offset,
2029                        "Block offset was not present in the database, this is an implementation \
2030                        bug #1"
2031                    );
2032                }
2033                // No forks left to prune
2034                break;
2035            };
2036
2037            if fork_blocks.len() == 1 {
2038                if !pruned_tip {
2039                    error!(
2040                        %best_number,
2041                        ?fork_tip,
2042                        block_offset,
2043                        "Block offset was not present in the database, this is an implementation \
2044                        bug #2"
2045                    );
2046                }
2047
2048                // No forks left to prune
2049                break;
2050            }
2051
2052            let Some((fork_offset, block)) = fork_blocks
2053                .iter()
2054                .enumerate()
2055                // Skip ancestor of the best block, it is certainly not a fork to be pruned
2056                .skip(1)
2057                .find(|(_fork_offset, block)| {
2058                    *block.header().header().root() == block_root_to_prune
2059                })
2060            else {
2061                if !pruned_tip {
2062                    error!(
2063                        %best_number,
2064                        ?fork_tip,
2065                        block_offset,
2066                        "Block offset was not present in the database, this is an implementation \
2067                        bug #3"
2068                    );
2069                }
2070
2071                // Nothing left to prune
2072                break;
2073            };
2074
2075            // More than one instance means something somewhere is using or depends on this block
2076            if block.header().ref_count() > 1 {
2077                break;
2078            }
2079
2080            // Blocks that are already persisted
2081            match block {
2082                ClientDatabaseBlock::InMemory { .. } => {
2083                    // Prune
2084                }
2085                ClientDatabaseBlock::Persisted { .. }
2086                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2087                    // Already on disk, keep it in memory for later, but prune the tip
2088                    pruned_tip = true;
2089                    break;
2090                }
2091            }
2092
2093            state.block_roots.get_mut(&block_root_to_prune);
2094            block_root_to_prune = block.header().header().prefix.parent_root;
2095            fork_blocks.swap_remove(fork_offset);
2096
2097            pruned_tip = true;
2098        }
2099
2100        pruned_tip
2101    }
2102
    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
    /// their descendants
    ///
    /// Confirmation converts the first (canonical) fork entry at the target depth from
    /// [`ClientDatabaseBlock::Persisted`] into [`ClientDatabaseBlock::PersistedConfirmed`]. All
    /// sibling forks at that depth, together with their descendants at newer depths, are removed
    /// from `blocks`, `block_roots` and `fork_tips`.
    fn confirm_canonical_block(
        best_number: BlockNumber,
        state_data: &mut StateData<Block>,
        options: &ClientDatabaseInnerOptions,
    ) {
        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
        // MMRs, which might be needed for reorgs at the lowest possible depth.
        let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;

        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
            // Nothing to confirm yet
            return;
        };

        // Mark the canonical block as confirmed
        {
            // Canonical blocks always live at index 0 within their depth
            let Some(canonical_block) = fork_blocks.first_mut() else {
                error!(
                    %best_number,
                    block_offset,
                    "Have not found a canonical block to confirm, this is an implementation bug"
                );
                return;
            };

            // Replace the enum variant in place; each arm must return a block, the error arms
            // return the input unchanged
            replace_with_or_abort(canonical_block, |block| match block {
                ClientDatabaseBlock::InMemory { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be in memory, this is an implementation bug"
                    );
                    block
                }
                ClientDatabaseBlock::Persisted {
                    header,
                    block_details: _,
                    beacon_chain_block_details,
                    write_location,
                } => ClientDatabaseBlock::PersistedConfirmed {
                    header,
                    beacon_chain_block_details,
                    write_location,
                },
                ClientDatabaseBlock::PersistedConfirmed { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be confirmed yet, this is an \
                        implementation bug"
                    );
                    block
                }
            });
        }

        // Prune the rest of the blocks and their descendants
        let mut block_roots_to_prune = fork_blocks
            .drain(1..)
            .map(|block| *block.header().header().root())
            .collect::<Vec<_>>();
        // Walk towards newer blocks (smaller offsets), pruning the descendants of the roots
        // removed at the previous depth on each iteration
        let mut current_block_offset = block_offset;
        while !block_roots_to_prune.is_empty() {
            // Prune fork tips (if any)
            state_data
                .fork_tips
                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));

            // Prune removed block roots
            for block_root in &block_roots_to_prune {
                state_data.block_roots.remove(block_root);
            }

            // Block offset for direct descendants
            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
                current_block_offset = next_block_offset;
            } else {
                // Reached the tip
                break;
            }

            let fork_blocks = state_data
                .blocks
                .get_mut(current_block_offset)
                .expect("Lower block offset always exists; qed");

            // Collect descendants of pruned blocks to prune them next
            block_roots_to_prune = fork_blocks
                .drain_filter(|block| {
                    let header = block.header().header();

                    block_roots_to_prune.contains(&header.prefix.parent_root)
                })
                .map(|block| *block.header().header().root())
                .collect();
        }
    }
2205}