Skip to main content

ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
//! groups can be permanent or ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before the database can be used, it needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is expected (but depends on
//! the storage backend) that the whole file size is pre-allocated on disk and no writes will fail
//! due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    default_field_values,
50    get_mut_unchecked,
51    iter_collect_into,
52    maybe_uninit_as_bytes,
53    maybe_uninit_fill,
54    try_blocks
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::storage_backend::ClientDatabaseStorageBackend;
65use crate::storage_backend_adapter::{
66    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
67};
68use ab_client_api::{
69    BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
70    PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
71};
72use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
73use ab_core_primitives::block::header::GenericBlockHeader;
74use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
75use ab_core_primitives::block::owned::GenericOwnedBlock;
76use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
77use ab_core_primitives::segments::{LocalSegmentIndex, SegmentHeader};
78use ab_core_primitives::shard::RealShardKind;
79use ab_io_type::trivial_type::TrivialType;
80use async_lock::{
81    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand::rngs::SysError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
95/// Unique identifier for a database
96#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
97#[repr(C)]
98pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101    type Target = [u8; 32];
102
103    #[inline(always)]
104    fn deref(&self) -> &Self::Target {
105        &self.0
106    }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110    #[inline(always)]
111    fn as_ref(&self) -> &[u8] {
112        &self.0
113    }
114}
115
116impl DatabaseId {
117    #[inline(always)]
118    pub const fn new(bytes: [u8; 32]) -> Self {
119        Self(bytes)
120    }
121}
122
/// Minimal [`Hasher`] that uses the first 8 bytes of the written value directly as the hash.
///
/// Suitable for keys that are already high-entropy (like block roots), where re-hashing is
/// unnecessary work.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Take the first 8 bytes verbatim as the state; inputs shorter than 8 bytes leave the
        // state unchanged
        if let Some(chunk) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*chunk);
        }
    }
}
141
/// Result of building a genesis block (see [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
149
/// Options for [`ClientDatabase`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
    /// Genesis block builder, responsible for creating the genesis block and corresponding state
    /// for bootstrapping purposes.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
209
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
235
/// Error for [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
308
309/// Error for [`ClientDatabase::format()`]
310#[derive(Debug, thiserror::Error)]
311pub enum ClientDatabaseFormatError {
312    /// Storage backend has canceled read request
313    #[error("Storage backend has canceled read request")]
314    ReadRequestCancelled,
315    /// Storage backend read error
316    #[error("Storage backend read error: {error}")]
317    ReadError {
318        /// Low-level error
319        error: io::Error,
320    },
321    /// Failed to generate database id
322    #[error("Failed to generate database id")]
323    FailedToGenerateDatabaseId {
324        /// Low-level error
325        #[from]
326        error: SysError,
327    },
328    /// Database is already formatted yet
329    #[error("Database is already formatted yet")]
330    AlreadyFormatted,
331    /// Storage backend has canceled a writing request
332    #[error("Storage backend has canceled a writing request")]
333    WriteRequestCancelled,
334    /// Storage item write error
335    #[error("Storage item write error")]
336    StorageItemWriteError {
337        /// Low-level error
338        #[from]
339        error: io::Error,
340    },
341}
342
/// Tip of a fork: number and root of its latest block
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}

/// Block stored fully in memory together with its details (see [`ClientDatabaseBlock::InMemory`])
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// The block itself
    block: Block,
    /// Details associated with the block
    block_details: BlockDetails,
}

/// Borrowed view of a full block: either still in memory or persisted at a known write location
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory
    InMemory(&'a Block),
    /// Block was persisted; only the header is in memory, the rest can be read back from
    /// `write_location`
    Persisted {
        /// Block header
        header: &'a Block::Header,
        /// Where the block's storage item was written
        write_location: WriteLocation,
    },
}
368
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later,
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::confirmation_depth_k`]), it
/// becomes irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block was persisted (likely on disk)
    Persisted {
        /// Block header, retained in memory for cheap access
        header: Block::Header,
        /// Details associated with the block
        block_details: BlockDetails,
        /// Where the block's storage item was written
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    PersistedConfirmed {
        /// Block header, retained in memory for cheap access
        header: Block::Header,
        /// Where the block's storage item was written
        write_location: WriteLocation,
    },
}
395
396impl<Block> ClientDatabaseBlock<Block>
397where
398    Block: GenericOwnedBlock,
399{
400    #[inline(always)]
401    fn header(&self) -> &Block::Header {
402        match self {
403            Self::InMemory(in_memory) => in_memory.block.header(),
404            Self::Persisted { header, .. } => header,
405            Self::PersistedConfirmed { header, .. } => header,
406        }
407    }
408
409    #[inline(always)]
410    fn full_block(&self) -> FullBlock<'_, Block> {
411        match self {
412            Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
413            Self::Persisted {
414                header,
415                write_location,
416                ..
417            }
418            | Self::PersistedConfirmed {
419                header,
420                write_location,
421            } => FullBlock::Persisted {
422                header,
423                write_location: *write_location,
424            },
425        }
426    }
427
428    #[inline(always)]
429    fn block_details(&self) -> Option<&BlockDetails> {
430        match self {
431            Self::InMemory(in_memory) => Some(&in_memory.block_details),
432            Self::Persisted { block_details, .. } => Some(block_details),
433            Self::PersistedConfirmed { .. } => None,
434        }
435    }
436}
437
/// In-memory bookkeeping of blocks and forks
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are ordered from most recently updated
    /// (towards the front) to least recently updated (at the back).
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
    /// block numbers.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is
    /// organized in such a way that the first entry at every block number corresponds to the
    /// canonical version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
469
470#[derive(Debug)]
471struct SegmentHeadersCache {
472    segment_headers_cache: Vec<SegmentHeader>,
473}
474
475impl SegmentHeadersCache {
476    #[inline(always)]
477    fn last_segment_header(&self) -> Option<SegmentHeader> {
478        self.segment_headers_cache.last().cloned()
479    }
480
481    #[inline(always)]
482    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
483        self.segment_headers_cache
484            .last()
485            .map(|segment_header| segment_header.segment_index.as_inner())
486    }
487
488    #[inline(always)]
489    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
490        self.segment_headers_cache
491            .get(u64::from(local_segment_index) as usize)
492            .copied()
493    }
494
495    /// Returns actually added segments (some might have been skipped)
496    fn add_segment_headers(
497        &mut self,
498        mut segment_headers: Vec<SegmentHeader>,
499    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
500        self.segment_headers_cache.reserve(segment_headers.len());
501
502        let mut maybe_last_local_segment_index = self.max_local_segment_index();
503
504        if let Some(last_segment_index) = maybe_last_local_segment_index {
505            // Skip already stored segment headers
506            segment_headers.retain(|segment_header| {
507                segment_header.segment_index.as_inner() > last_segment_index
508            });
509        }
510
511        // Check all input segment headers to see which ones are not stored yet and verifying that
512        // segment indices are monotonically increasing
513        for segment_header in segment_headers.iter().copied() {
514            let local_segment_index = segment_header.local_segment_index();
515            match maybe_last_local_segment_index {
516                Some(last_local_segment_index) => {
517                    if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
518                        return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
519                            local_segment_index,
520                            last_local_segment_index,
521                        });
522                    }
523
524                    self.segment_headers_cache.push(segment_header);
525                    maybe_last_local_segment_index.replace(local_segment_index);
526                }
527                None => {
528                    if local_segment_index != LocalSegmentIndex::ZERO {
529                        return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
530                            local_segment_index,
531                        });
532                    }
533
534                    self.segment_headers_cache.push(segment_header);
535                    maybe_last_local_segment_index.replace(local_segment_index);
536                }
537            }
538        }
539
540        Ok(segment_headers)
541    }
542}
543
// TODO: Hide implementation details
/// Complete database state: block/fork bookkeeping, segment headers and the storage backend
/// adapter
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory block and fork bookkeeping
    data: StateData<Block>,
    /// Cache of all known segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Storage backend adapter behind its own async read-write lock
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
554
555impl<Block, StorageBackend> State<Block, StorageBackend>
556where
557    Block: GenericOwnedBlock,
558{
559    #[inline(always)]
560    fn best_tip(&self) -> &ForkTip {
561        self.data
562            .fork_tips
563            .front()
564            .expect("The best block is always present; qed")
565    }
566
567    #[inline(always)]
568    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
569        self.data
570            .blocks
571            .front()
572            .expect("The best block is always present; qed")
573            .first()
574            .expect("The best block is always present; qed")
575    }
576}
577
/// An in-memory block together with its position in [`StateData::blocks`], used when persisting
/// blocks
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block offset (see [`StateData::blocks`])
    block_offset: usize,
    /// Fork offset (see [`StateData::blocks`])
    fork_offset: usize,
    /// The in-memory block itself
    block: &'a ClientDatabaseBlockInMemory<Block>,
}

/// Position and write location of a block that was persisted (counterpart of [`BlockToPersist`])
#[derive(Debug)]
struct PersistedBlock {
    /// Block offset (see [`StateData::blocks`])
    block_offset: usize,
    /// Fork offset (see [`StateData::blocks`])
    fork_offset: usize,
    /// Where the block's storage item was written
    write_location: WriteLocation,
}

/// Subset of [`ClientDatabaseOptions`] retained by the database after construction
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::confirmation_depth_k`]
    confirmation_depth_k: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}

/// Inner state of [`ClientDatabase`], shared between its clones
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was created with
    options: ClientDatabaseInnerOptions,
}
611
/// Client database
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared inner state; wrapped in `Arc` so that clones of the database handle share it
    inner: Arc<Inner<Block, StorageBackend>>,
}
620
// Implemented manually because `#[derive(Clone)]` would add unnecessary `Clone` bounds on the
// `Block` and `StorageBackend` type parameters; only the inner `Arc` is actually cloned
impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}
631
// Currently a no-op, reserved for flushing not-yet-persisted data on shutdown.
//
// NOTE(review): `drop` runs for every clone of the handle, not only the last one — any future
// flush logic here must account for other live clones (e.g. via the `Arc` reference count).
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
640
641impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
642where
643    Block: GenericOwnedBlock,
644    StorageBackend: ClientDatabaseStorageBackend,
645{
    #[inline]
    fn best_root(&self) -> BlockRoot {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        self.inner.state.read_blocking().best_tip().root
    }

    #[inline]
    fn best_header(&self) -> Block::Header {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        self.inner
            .state
            .read_blocking()
            .best_block()
            .header()
            .clone()
    }

    #[inline]
    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();
        let best_block = state.best_block();
        (
            best_block.header().clone(),
            best_block
                .block_details()
                .expect("Always present for the best block; qed")
                .clone(),
        )
    }
679
680    // TODO: Add fast path when `descendant_block_root` is the best block
681    #[inline]
682    fn ancestor_header(
683        &self,
684        ancestor_block_number: BlockNumber,
685        descendant_block_root: &BlockRoot,
686    ) -> Option<Block::Header> {
687        // Blocking read lock is fine because where a write lock is only taken for a short time and
688        // all other locks are read locks
689        let state = self.inner.state.read_blocking();
690        let best_number = state.best_tip().number;
691
692        let ancestor_block_offset =
693            best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
694        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
695
696        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
697        if ancestor_block_number > descendant_block_number {
698            return None;
699        }
700        let descendant_block_offset =
701            best_number.checked_sub(descendant_block_number)?.as_u64() as usize;
702
703        // Range of blocks where the first item is expected to contain a descendant
704        let mut blocks_range_iter = state
705            .data
706            .blocks
707            .iter()
708            .enumerate()
709            .skip(descendant_block_offset);
710
711        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
712        let descendant_header = descendant_block_candidates
713            .iter()
714            .find(|block| &*block.header().header().root() == descendant_block_root)?
715            .header()
716            .header();
717
718        // If there are no forks at this level, then this is the canonical chain and ancestor
719        // block number we're looking for is the first block at the corresponding block number.
720        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
721        // the one we care about.
722        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
723            return ancestor_block_candidates
724                .iter()
725                .next()
726                .map(|block| block.header().clone());
727        }
728
729        let mut parent_block_root = &descendant_header.prefix.parent_root;
730
731        // Iterate over the blocks following descendant until ancestor is reached
732        for (block_offset, parent_candidates) in blocks_range_iter {
733            let parent_header = parent_candidates
734                .iter()
735                .find(|header| &*header.header().header().root() == parent_block_root)?
736                .header();
737
738            // When header offset matches, we found the header
739            if block_offset == ancestor_block_offset {
740                return Some(parent_header.clone());
741            }
742
743            parent_block_root = &parent_header.header().prefix.parent_root;
744        }
745
746        None
747    }
748
749    #[inline]
750    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
751        // Blocking read lock is fine because where a write lock is only taken for a short time and
752        // all other locks are read locks
753        let state = self.inner.state.read_blocking();
754        let best_number = state.best_tip().number;
755
756        let block_number = *state.data.block_roots.get(block_root)?;
757        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
758        let block_candidates = state.data.blocks.get(block_offset)?;
759
760        block_candidates.iter().find_map(|block| {
761            let header = block.header();
762
763            if &*header.header().root() == block_root {
764                Some(header.clone())
765            } else {
766                None
767            }
768        })
769    }
770
    #[inline]
    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        let block_number = *state.data.block_roots.get(block_root)?;
        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
        let block_candidates = state.data.blocks.get(block_offset)?;

        block_candidates.iter().find_map(|block| {
            let header = block.header();
            // NOTE(review): details are fetched before the root comparison, so a candidate in the
            // `PersistedConfirmed` state (whose details are `None`) is skipped here even if its
            // root matches — confirm this is intentional
            let block_details = block.block_details().cloned()?;

            if &*header.header().root() == block_root {
                Some((header.clone(), block_details))
            } else {
                None
            }
        })
    }
793
    #[inline]
    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
        let state = self.inner.state.read().await;
        let best_number = state.best_tip().number;

        // Map the root to its block number, then to an offset from the best tip into the
        // in-memory list of blocks; unknown roots fail early
        let block_number = *state
            .data
            .block_roots
            .get(block_root)
            .ok_or(ReadBlockError::UnknownBlockRoot)?;
        let block_offset = best_number
            .checked_sub(block_number)
            .expect("Known block roots always have valid block offset; qed")
            .as_u64() as usize;
        let block_candidates = state
            .data
            .blocks
            .get(block_offset)
            .expect("Valid block offsets always have block entries; qed");

        // All forks at this height are candidates; find the one with a matching root
        for block_candidate in block_candidates {
            let header = block_candidate.header();

            if &*header.header().root() == block_root {
                return match block_candidate.full_block() {
                    // Block is still held in memory, just clone it
                    FullBlock::InMemory(block) => Ok(block.clone()),
                    // Block body was persisted to disk; read it back and combine with the
                    // in-memory header
                    FullBlock::Persisted {
                        header,
                        write_location,
                    } => {
                        let storage_backend_adapter = state.storage_backend_adapter.read().await;

                        let storage_item = storage_backend_adapter
                            .read_storage_item::<StorageItemBlock>(write_location)
                            .await?;

                        // Only a block storage item is valid at this write location; anything
                        // else indicates corruption or a bookkeeping bug
                        let storage_item_block = match storage_item {
                            StorageItemBlock::Block(storage_item_block) => storage_item_block,
                            StorageItemBlock::SegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: SegmentHeaders",
                                    ),
                                });
                            }
                        };

                        // Only the body is needed from the storage item, the header is taken
                        // from memory
                        let StorageItemBlockBlock {
                            header: _,
                            body,
                            mmr_with_block: _,
                            system_contract_states: _,
                        } = storage_item_block;

                        Block::from_buffers(header.buffer().clone(), body)
                            .ok_or(ReadBlockError::FailedToDecode)
                    }
                };
            }
        }

        unreachable!("Known block root always has block candidate associated with it; qed")
    }
857
858    #[inline]
859    fn last_segment_header(&self) -> Option<SegmentHeader> {
860        // Blocking read lock is fine because where a write lock is only taken for a short time and
861        // all other locks are read locks
862        let state = self.inner.state.read_blocking();
863        state.segment_headers_cache.last_segment_header()
864    }
865
866    #[inline]
867    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
868        // Blocking read lock is fine because where a write lock is only taken for a short time and
869        // all other locks are read locks
870        let state = self.inner.state.read_blocking();
871
872        state.segment_headers_cache.max_local_segment_index()
873    }
874
875    #[inline]
876    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
877        // Blocking read lock is fine because where a write lock is only taken for a short time and
878        // all other locks are read locks
879        let state = self.inner.state.read_blocking();
880
881        state
882            .segment_headers_cache
883            .get_segment_header(segment_index)
884    }
885
    #[inline]
    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();

        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
        else {
            // Not initialized
            return Vec::new();
        };

        // Special case for the initial segment (for beacon chain genesis block)
        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
            && block_number == BlockNumber::ONE
        {
            // If there is a segment index present, and we store monotonically increasing segment
            // headers, then the first header exists
            return vec![
                state
                    .segment_headers_cache
                    .get_segment_header(LocalSegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_local_segment_index == LocalSegmentIndex::ZERO {
            // Genesis segment already included in block #1
            return Vec::new();
        }

        // Walk segments from the newest to the oldest, looking for a segment whose target block
        // (last archived block + 1 + confirmation depth) equals `block_number`
        let mut current_local_segment_index = last_local_segment_index;
        loop {
            // If the current segment index present, and we store monotonically increasing segment
            // headers, then the current segment header exists as well.
            let current_segment_header = state
                .segment_headers_cache
                .get_segment_header(current_local_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // The block immediately after the archived segment adding the confirmation depth
            let target_block_number = current_segment_header.last_archived_block.number()
                + BlockNumber::ONE
                + self.inner.options.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Check block spanning multiple segments
                let last_archived_block_number = current_segment_header.last_archived_block.number;
                // `current_local_segment_index` is at least ONE here (the ZERO case returned
                // earlier and decrements below only happen while it is above ONE), so this
                // subtraction cannot underflow
                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;

                while let Some(segment_header) = state
                    .segment_headers_cache
                    .get_segment_header(local_segment_index)
                {
                    if segment_header.last_archived_block.number == last_archived_block_number {
                        // Older segment ends at the same block, so it belongs to this block too;
                        // keep headers ordered oldest-first
                        headers_for_block.insert(0, segment_header);
                        // NOTE(review): if the genesis segment (index ZERO) also ends at
                        // `last_archived_block_number`, this decrement underflows — confirm that
                        // cannot happen in practice
                        local_segment_index -= LocalSegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            // iterate segments further
            if target_block_number > block_number {
                // no need to check the initial segment
                if current_local_segment_index > LocalSegmentIndex::ONE {
                    current_local_segment_index -= LocalSegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // No segment headers required
                return Vec::new();
            }
        }

        // No segment headers required
        Vec::new()
    }
969}
970
971impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
972where
973    Block: GenericOwnedBlock,
974    StorageBackend: ClientDatabaseStorageBackend,
975{
976    async fn persist_block(
977        &self,
978        block: Block,
979        block_details: BlockDetails,
980    ) -> Result<(), PersistBlockError> {
981        let mut state = self.inner.state.write().await;
982        let best_number = state.best_tip().number;
983
984        let header = block.header().header();
985
986        let block_number = header.prefix.number;
987
988        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
989            // Special case when syncing on top of the fresh database
990            Self::insert_first_block(&mut state.data, block, block_details);
991
992            return Ok(());
993        }
994
995        if block_number == best_number + BlockNumber::ONE {
996            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
997        }
998
999        let block_offset = best_number
1000            .checked_sub(block_number)
1001            .ok_or(PersistBlockError::MissingParent)?
1002            .as_u64() as usize;
1003
1004        if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
1005            return Err(PersistBlockError::OutsideAcceptableRange);
1006        }
1007
1008        let state = &mut *state;
1009
1010        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1011            error!(
1012                %block_number,
1013                %block_offset,
1014                "Failed to store block fork, header offset is missing despite being within \
1015                acceptable range"
1016            );
1017
1018            PersistBlockError::OutsideAcceptableRange
1019        })?;
1020
1021        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1022            // Block's parent is no longer a fork tip, remove it
1023            if fork_tip.root == header.prefix.parent_root {
1024                state.data.fork_tips.remove(index);
1025                break;
1026            }
1027        }
1028
1029        let block_root = *header.root();
1030        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1031        // the best block
1032        state.data.fork_tips.insert(
1033            1,
1034            ForkTip {
1035                number: block_number,
1036                root: block_root,
1037            },
1038        );
1039        state.data.block_roots.insert(block_root, block_number);
1040        block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1041            block,
1042            block_details,
1043        }));
1044
1045        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1046
1047        Ok(())
1048    }
1049
1050    async fn persist_segment_headers(
1051        &self,
1052        segment_headers: Vec<SegmentHeader>,
1053    ) -> Result<(), PersistSegmentHeadersError> {
1054        let mut state = self.inner.state.write().await;
1055
1056        let added_segment_headers = state
1057            .segment_headers_cache
1058            .add_segment_headers(segment_headers)?;
1059
1060        if added_segment_headers.is_empty() {
1061            return Ok(());
1062        }
1063
1064        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1065        // headers modifications
1066        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1067        //  are satisfied. If not, blocking read locks in other places will cause issues.
1068        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1069
1070        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1071
1072        storage_backend_adapter
1073            .write_storage_item(StorageItemBlock::SegmentHeaders(
1074                StorageItemBlockSegmentHeaders {
1075                    segment_headers: added_segment_headers,
1076                },
1077            ))
1078            .await?;
1079
1080        Ok(())
1081    }
1082}
1083
1084impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1085where
1086    Block: GenericOwnedBlock,
1087    StorageBackend: ClientDatabaseStorageBackend,
1088{
    /// Open the existing database.
    ///
    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
    pub async fn open<GBB>(
        options: ClientDatabaseOptions<GBB, StorageBackend>,
    ) -> Result<Self, ClientDatabaseError>
    where
        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
    {
        let ClientDatabaseOptions {
            write_buffer_size,
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
            genesis_block_builder,
            storage_backend,
        } = options;
        // Soft confirmation must happen strictly before full confirmation
        if soft_confirmation_depth >= confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
        }

        if max_fork_tip_distance > confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
        }

        // In-memory state rebuilt below by replaying storage items found on disk
        let mut state_data = StateData {
            fork_tips: VecDeque::new(),
            block_roots: HashMap::default(),
            blocks: VecDeque::new(),
        };
        let mut segment_headers_cache = SegmentHeadersCache {
            segment_headers_cache: Vec::new(),
        };

        let options = ClientDatabaseInnerOptions {
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
        };

        // Handlers invoked by the storage backend adapter for every storage item it reads while
        // opening; they mutably capture `state_data` and `segment_headers_cache` above
        let storage_item_handlers = StorageItemHandlers {
            permanent: |_arg| {
                // TODO
                Ok(())
            },
            block: |arg| {
                let StorageItemHandlerArg {
                    storage_item,
                    page_offset,
                    num_pages,
                } = arg;
                let storage_item_block = match storage_item {
                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
                    // Segment headers go into the cache rather than the block state
                    StorageItemBlock::SegmentHeaders(segment_headers) => {
                        let num_segment_headers = segment_headers.segment_headers.len();
                        return match segment_headers_cache
                            .add_segment_headers(segment_headers.segment_headers)
                        {
                            Ok(_) => Ok(()),
                            Err(error) => {
                                error!(
                                    %page_offset,
                                    %num_segment_headers,
                                    %error,
                                    "Failed to add segment headers from storage item"
                                );

                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
                            }
                        };
                    }
                };

                // TODO: It would be nice to not allocate body here since we'll not use it here
                //  anyway
                let StorageItemBlockBlock {
                    header,
                    body: _,
                    mmr_with_block,
                    system_contract_states,
                } = storage_item_block;

                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
                    error!(%page_offset, "Failed to decode block header from bytes");

                    ClientDatabaseError::InvalidBlock { page_offset }
                })?;

                let block_root = *header.header().root();
                let block_number = header.header().prefix.number;

                state_data.block_roots.insert(block_root, block_number);

                // Best number so far is the number of the block at the front of `blocks`
                let maybe_best_number = state_data
                    .blocks
                    .front()
                    .and_then(|block_forks| block_forks.first())
                    .map(|best_block| {
                        // Type inference is not working here for some reason
                        let header: &Block::Header = best_block.header();

                        header.header().prefix.number
                    });

                // Compute the offset of this block from the best tip, extending the front of
                // `blocks` when this block advances the best number
                let block_offset = if let Some(best_number) = maybe_best_number {
                    if block_number <= best_number {
                        (best_number - block_number).as_u64() as usize
                    } else {
                        // The new best block must follow the previous best block
                        if block_number - best_number != BlockNumber::ONE {
                            error!(
                                %page_offset,
                                %best_number,
                                %block_number,
                                "Invalid new best block number, it must be only one block \
                                higher than the best block"
                            );

                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
                        }

                        state_data.blocks.push_front(SmallVec::new());
                        // Will insert a new block at the front
                        0
                    }
                } else {
                    state_data.blocks.push_front(SmallVec::new());
                    // Will insert a new block at the front
                    0
                };

                let block_forks = match state_data.blocks.get_mut(block_offset) {
                    Some(block_forks) => block_forks,
                    None => {
                        // Ignore the older block, other blocks at its height were already pruned
                        // anyway

                        return Ok(());
                    }
                };

                // Push a new block to the end of the list, we'll fix it up later
                block_forks.push(ClientDatabaseBlock::Persisted {
                    header,
                    block_details: BlockDetails {
                        mmr_with_block,
                        system_contract_states,
                    },
                    write_location: WriteLocation {
                        page_offset,
                        num_pages,
                    },
                });

                // If a new block was inserted, confirm a new canonical block to prune extra
                // in-memory information
                if block_offset == 0 && block_forks.len() == 1 {
                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
                }

                Ok(())
            },
        };

        // Replays all storage items through the handlers above, populating the captured state
        let storage_backend_adapter =
            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
                .await?;

        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
            // The best block is last in the list here because that is how it was inserted while
            // reading from the database
            block_forks.last()
        }) {
            // Type inference is not working here for some reason
            let header: &Block::Header = best_block.header();
            let header = header.header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            // Move the best block's ancestors to fork offset 0 at every depth
            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
            }

            // Store the best block as the first and only fork tip
            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
        } else {
            let GenesisBlockBuilderResult {
                block,
                system_contract_states,
            } = genesis_block_builder();

            // If the database is empty, initialize everything with the genesis block
            let header = block.header().header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
            state_data.block_roots.insert(block_root, block_number);
            state_data
                .blocks
                .push_front(smallvec![ClientDatabaseBlock::InMemory(
                    ClientDatabaseBlockInMemory {
                        block,
                        block_details: BlockDetails {
                            system_contract_states,
                            // Fresh MMR containing only the genesis block root
                            mmr_with_block: Arc::new({
                                let mut mmr = BlockMerkleMountainRange::new();
                                mmr.add_leaf(&block_root);
                                mmr
                            })
                        },
                    }
                )]);
        }

        let state = State {
            data: state_data,
            segment_headers_cache,
            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
        };

        let inner = Inner {
            state: AsyncRwLock::new(state),
            options,
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }
1327
1328    /// Format a new database
1329    pub async fn format(
1330        storage_backend: &StorageBackend,
1331        options: ClientDatabaseFormatOptions,
1332    ) -> Result<(), ClientDatabaseFormatError> {
1333        StorageBackendAdapter::format(storage_backend, options).await
1334    }
1335
1336    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1337        // If the database is empty, initialize everything with the genesis block
1338        let header = block.header().header();
1339        let block_number = header.prefix.number;
1340        let block_root = *header.root();
1341
1342        state.fork_tips.clear();
1343        state.fork_tips.push_front(ForkTip {
1344            number: block_number,
1345            root: block_root,
1346        });
1347        state.block_roots.clear();
1348        state.block_roots.insert(block_root, block_number);
1349        state.blocks.clear();
1350        state
1351            .blocks
1352            .push_front(smallvec![ClientDatabaseBlock::InMemory(
1353                ClientDatabaseBlockInMemory {
1354                    block,
1355                    block_details,
1356                }
1357            )]);
1358    }
1359
1360    async fn insert_new_best_block(
1361        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1362        inner: &Inner<Block, StorageBackend>,
1363        block: Block,
1364        block_details: BlockDetails,
1365    ) -> Result<(), PersistBlockError> {
1366        let header = block.header().header();
1367        let block_number = header.prefix.number;
1368        let block_root = *header.root();
1369        let parent_root = header.prefix.parent_root;
1370
1371        // Adjust the relative order of forks to ensure the first index always corresponds to
1372        // ancestors of the new best block
1373        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1374            return Err(PersistBlockError::MissingParent);
1375        }
1376
1377        // Store new block in the state
1378        {
1379            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1380                // Block's parent is no longer a fork tip, remove it
1381                if fork_tip.root == parent_root {
1382                    state.data.fork_tips.remove(index);
1383                    break;
1384                }
1385            }
1386
1387            state.data.fork_tips.push_front(ForkTip {
1388                number: block_number,
1389                root: block_root,
1390            });
1391            state.data.block_roots.insert(block_root, block_number);
1392            state
1393                .data
1394                .blocks
1395                .push_front(smallvec![ClientDatabaseBlock::InMemory(
1396                    ClientDatabaseBlockInMemory {
1397                        block,
1398                        block_details: block_details.clone()
1399                    }
1400                )]);
1401        }
1402
1403        let options = &inner.options;
1404
1405        Self::confirm_canonical_block(block_number, &mut state.data, options);
1406        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1407
1408        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1409        // block modifications
1410        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1411        //  are satisfied. If not, blocking read locks in other places will cause issues.
1412        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1413
1414        let mut blocks_to_persist = Vec::new();
1415        for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1416            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1417                break;
1418            };
1419
1420            let len_before = blocks_to_persist.len();
1421            fork_blocks
1422                .iter()
1423                .enumerate()
1424                .filter_map(|(fork_offset, client_database_block)| {
1425                    match client_database_block {
1426                        ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1427                            block_offset,
1428                            fork_offset,
1429                            block,
1430                        }),
1431                        ClientDatabaseBlock::Persisted { .. }
1432                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1433                            // Already persisted
1434                            None
1435                        }
1436                    }
1437                })
1438                .collect_into(&mut blocks_to_persist);
1439
1440            if blocks_to_persist.len() == len_before {
1441                break;
1442            }
1443        }
1444
1445        // Persist blocks from older to newer
1446        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1447        {
1448            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1449
1450            for block_to_persist in blocks_to_persist.into_iter().rev() {
1451                let BlockToPersist {
1452                    block_offset,
1453                    fork_offset,
1454                    block,
1455                } = block_to_persist;
1456
1457                let write_location = storage_backend_adapter
1458                    .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1459                        header: block.block.header().buffer().clone(),
1460                        body: block.block.body().buffer().clone(),
1461                        mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1462                        system_contract_states: StdArc::clone(
1463                            &block.block_details.system_contract_states,
1464                        ),
1465                    }))
1466                    .await?;
1467
1468                persisted_blocks.push(PersistedBlock {
1469                    block_offset,
1470                    fork_offset,
1471                    write_location,
1472                });
1473            }
1474        }
1475
1476        // Convert blocks to persisted
1477        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1478        for persisted_block in persisted_blocks {
1479            let PersistedBlock {
1480                block_offset,
1481                fork_offset,
1482                write_location,
1483            } = persisted_block;
1484
1485            let block = state
1486                .data
1487                .blocks
1488                .get_mut(block_offset)
1489                .expect("Still holding the same lock since last check; qed")
1490                .get_mut(fork_offset)
1491                .expect("Still holding the same lock since last check; qed");
1492
1493            replace_with_or_abort(block, |block| {
1494                if let ClientDatabaseBlock::InMemory(in_memory) = block {
1495                    let (header, _body) = in_memory.block.split();
1496
1497                    ClientDatabaseBlock::Persisted {
1498                        header,
1499                        block_details: in_memory.block_details,
1500                        write_location,
1501                    }
1502                } else {
1503                    unreachable!("Still holding the same lock since last check; qed");
1504                }
1505            });
1506        }
1507
1508        // TODO: Prune blocks that are no longer necessary
1509        // TODO: Prune unused page groups here or elsewhere?
1510
1511        Ok(())
1512    }
1513
1514    /// Adjust the relative order of forks to ensure the first index always corresponds to
1515    /// `parent_block_root` and its ancestors.
1516    ///
1517    /// Returns `true` on success and `false` if one of the parents was not found.
1518    #[must_use]
1519    fn adjust_ancestor_block_forks(
1520        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1521        mut parent_block_root: BlockRoot,
1522    ) -> bool {
1523        let mut ancestor_blocks = blocks.iter_mut();
1524
1525        loop {
1526            if ancestor_blocks.len() == 1 {
1527                // Nothing left to adjust with a single fork
1528                break;
1529            }
1530
1531            let Some(parent_blocks) = ancestor_blocks.next() else {
1532                // No more parent headers present
1533                break;
1534            };
1535
1536            let Some(fork_offset_parent_block_root) =
1537                parent_blocks
1538                    .iter()
1539                    .enumerate()
1540                    .find_map(|(fork_offset, fork_block)| {
1541                        let fork_header = fork_block.header().header();
1542                        if *fork_header.root() == parent_block_root {
1543                            Some((fork_offset, fork_header.prefix.parent_root))
1544                        } else {
1545                            None
1546                        }
1547                    })
1548            else {
1549                return false;
1550            };
1551
1552            let fork_offset;
1553            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1554
1555            parent_blocks.swap(0, fork_offset);
1556        }
1557
1558        true
1559    }
1560
1561    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1562    ///
1563    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1564    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1565    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1566    /// they end up being necessary later.
1567    fn prune_outdated_fork_tips(
1568        best_number: BlockNumber,
1569        state: &mut StateData<Block>,
1570        options: &ClientDatabaseInnerOptions,
1571    ) {
1572        let state = &mut *state;
1573
1574        // These forks are just candidates because they will not be pruned if the reference count is
1575        // not 1, indicating they are still in use by something
1576        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1577
1578        // Prune forks that are too far away from the best block
1579        state.fork_tips.retain(|fork_tip| {
1580            if best_number - fork_tip.number > options.max_fork_tip_distance {
1581                candidate_forks_to_remove.push(*fork_tip);
1582                false
1583            } else {
1584                true
1585            }
1586        });
1587        // Prune forks that exceed the maximum number of forks
1588        if state.fork_tips.len() > options.max_fork_tips.get() {
1589            state
1590                .fork_tips
1591                .drain(options.max_fork_tips.get()..)
1592                .collect_into(&mut candidate_forks_to_remove);
1593        }
1594
1595        // Prune all possible candidates
1596        candidate_forks_to_remove
1597            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1598        // Return those that were not pruned back to the list of tips
1599        state.fork_tips.extend(candidate_forks_to_remove);
1600    }
1601
1602    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
1603    /// the list of fork tips
1604    #[must_use]
1605    fn prune_outdated_fork(
1606        best_number: BlockNumber,
1607        fork_tip: &ForkTip,
1608        state: &mut StateData<Block>,
1609    ) -> bool {
1610        let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1611
1612        // Prune fork top and all its ancestors that are not used
1613        let mut block_root_to_prune = fork_tip.root;
1614        let mut pruned_tip = false;
1615        for block_offset in block_offset.. {
1616            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1617                if !pruned_tip {
1618                    error!(
1619                        %best_number,
1620                        ?fork_tip,
1621                        block_offset,
1622                        "Block offset was not present in the database, this is an implementation \
1623                        bug #1"
1624                    );
1625                }
1626                // No forks left to prune
1627                break;
1628            };
1629
1630            if fork_blocks.len() == 1 {
1631                if !pruned_tip {
1632                    error!(
1633                        %best_number,
1634                        ?fork_tip,
1635                        block_offset,
1636                        "Block offset was not present in the database, this is an implementation \
1637                        bug #2"
1638                    );
1639                }
1640
1641                // No forks left to prune
1642                break;
1643            }
1644
1645            let Some((fork_offset, block)) = fork_blocks
1646                .iter()
1647                .enumerate()
1648                // Skip ancestor of the best block, it is certainly not a fork to be pruned
1649                .skip(1)
1650                .find(|(_fork_offset, block)| {
1651                    *block.header().header().root() == block_root_to_prune
1652                })
1653            else {
1654                if !pruned_tip {
1655                    error!(
1656                        %best_number,
1657                        ?fork_tip,
1658                        block_offset,
1659                        "Block offset was not present in the database, this is an implementation \
1660                        bug #3"
1661                    );
1662                }
1663
1664                // Nothing left to prune
1665                break;
1666            };
1667
1668            // More than one instance means something somewhere is using or depends on this block
1669            if block.header().ref_count() > 1 {
1670                break;
1671            }
1672
1673            // Blocks that are already persisted
1674            match block {
1675                ClientDatabaseBlock::InMemory(_) => {
1676                    // Prune
1677                }
1678                ClientDatabaseBlock::Persisted { .. }
1679                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1680                    // Already on disk, keep it in memory for later, but prune the tip
1681                    pruned_tip = true;
1682                    break;
1683                }
1684            }
1685
1686            state.block_roots.get_mut(&block_root_to_prune);
1687            block_root_to_prune = block.header().header().prefix.parent_root;
1688            fork_blocks.swap_remove(fork_offset);
1689
1690            pruned_tip = true;
1691        }
1692
1693        pruned_tip
1694    }
1695
    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
    /// their descendants
    fn confirm_canonical_block(
        best_number: BlockNumber,
        state_data: &mut StateData<Block>,
        options: &ClientDatabaseInnerOptions,
    ) {
        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
        // MMRs, which might be needed for reorgs at the lowest possible depth.
        let block_offset = (options.confirmation_depth_k + BlockNumber::ONE).as_u64() as usize;

        // `state_data.blocks` is indexed from the best block down; a missing offset means the
        // chain is still shorter than the confirmation depth
        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
            // Nothing to confirm yet
            return;
        };

        // Mark the canonical block as confirmed
        {
            // The canonical block always occupies the first index of the fork list
            let Some(canonical_block) = fork_blocks.first_mut() else {
                error!(
                    %best_number,
                    block_offset,
                    "Have not found a canonical block to confirm, this is an implementation bug"
                );
                return;
            };

            // NOTE(review): `replace_with_or_abort` aborts the process if the closure panics —
            // the closure below must stay panic-free
            replace_with_or_abort(canonical_block, |block| match block {
                ClientDatabaseBlock::InMemory(_) => {
                    // A block this deep is expected to be persisted already; log and keep as is
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be in memory, this is an implementation bug"
                    );
                    block
                }
                ClientDatabaseBlock::Persisted {
                    header,
                    block_details: _,
                    write_location,
                } => ClientDatabaseBlock::PersistedConfirmed {
                    header,
                    write_location,
                },
                ClientDatabaseBlock::PersistedConfirmed { .. } => {
                    // Double confirmation indicates broken bookkeeping; log and keep as is
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be confirmed yet, this is an \
                        implementation bug"
                    );
                    block
                }
            });
        }

        // Prune the rest of the blocks and their descendants: start with the non-canonical forks
        // at the confirmation depth...
        let mut block_roots_to_prune = fork_blocks
            .drain(1..)
            .map(|block| *block.header().header().root())
            .collect::<Vec<_>>();
        // ...then walk towards the tip (decreasing offsets), removing every block whose parent was
        // pruned in the previous iteration
        let mut current_block_offset = block_offset;
        while !block_roots_to_prune.is_empty() {
            // Prune fork tips (if any)
            state_data
                .fork_tips
                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));

            // Prune removed block roots
            for block_root in &block_roots_to_prune {
                state_data.block_roots.remove(block_root);
            }

            // Block offset for direct descendants
            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
                current_block_offset = next_block_offset;
            } else {
                // Reached the tip
                break;
            }

            let fork_blocks = state_data
                .blocks
                .get_mut(current_block_offset)
                .expect("Lower block offset always exists; qed");

            // Collect descendants of pruned blocks to prune them next
            block_roots_to_prune = fork_blocks
                .drain_filter(|block| {
                    let header = block.header().header();

                    block_roots_to_prune.contains(&header.prefix.parent_root)
                })
                .map(|block| *block.header().header().root())
                .collect();
        }
    }
1796}