ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
37//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before the database can be used, it needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is
40//! expected (but depends on the storage backend) that the whole file size is pre-allocated on disk
41//! and no writes will fail due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    default_field_values,
49    get_mut_unchecked,
50    iter_collect_into,
51    maybe_uninit_as_bytes,
52    maybe_uninit_fill,
53    push_mut,
54    try_blocks
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::storage_backend::ClientDatabaseStorageBackend;
65use crate::storage_backend_adapter::{
66    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
67};
68use ab_client_api::{
69    BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
70    PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
71};
72use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
73use ab_core_primitives::block::header::GenericBlockHeader;
74use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
75use ab_core_primitives::block::owned::GenericOwnedBlock;
76use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
77use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
78use ab_core_primitives::shard::RealShardKind;
79use ab_io_type::trivial_type::TrivialType;
80use async_lock::{
81    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand::rngs::OsError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
/// Unique identifier for a database.
///
/// Wraps 32 opaque bytes; the raw bytes are accessible through the [`Deref`] and [`AsRef`]
/// implementations.
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101    type Target = [u8; 32];
102
103    #[inline(always)]
104    fn deref(&self) -> &Self::Target {
105        &self.0
106    }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110    #[inline(always)]
111    fn as_ref(&self) -> &[u8] {
112        &self.0
113    }
114}
115
impl DatabaseId {
    /// Create a database identifier from raw bytes
    #[inline(always)]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}
122
/// Pass-through hasher that uses the first 8 bytes of the input as the hash value.
///
/// Used for keying maps by block root (see `StateData::block_roots`); presumably block roots are
/// already uniformly distributed, so re-hashing them would be wasted work.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Interpret the first 8 bytes as a little-endian `u64`; inputs shorter than 8 bytes leave
        // the state untouched
        if let Some(first_chunk) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*first_chunk);
        }
    }
}
141
/// Result of building the genesis block (see [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
149
/// Options for [`ClientDatabase`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
    /// Genesis block builder is responsible for creating the genesis block and corresponding
    /// state for bootstrapping purposes
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
209
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
235
/// Error for [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
308
309/// Error for [`ClientDatabase::format()`]
310#[derive(Debug, thiserror::Error)]
311pub enum ClientDatabaseFormatError {
312    /// Storage backend has canceled read request
313    #[error("Storage backend has canceled read request")]
314    ReadRequestCancelled,
315    /// Storage backend read error
316    #[error("Storage backend read error: {error}")]
317    ReadError {
318        /// Low-level error
319        error: io::Error,
320    },
321    /// Failed to generate database id
322    #[error("Failed to generate database id")]
323    FailedToGenerateDatabaseId {
324        /// Low-level error
325        #[from]
326        error: OsError,
327    },
328    /// Database is already formatted yet
329    #[error("Database is already formatted yet")]
330    AlreadyFormatted,
331    /// Storage backend has canceled a writing request
332    #[error("Storage backend has canceled a writing request")]
333    WriteRequestCancelled,
334    /// Storage item write error
335    #[error("Storage item write error")]
336    StorageItemWriteError {
337        /// Low-level error
338        #[from]
339        error: io::Error,
340    },
341}
342
/// Tip of a fork: the number and root of its latest block
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the fork's tip
    number: BlockNumber,
    /// Block root of the fork's tip
    root: BlockRoot,
}
348
/// Block that is stored in memory and wasn't persisted yet
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// The full owned block
    block: Block,
    /// Details about the block
    block_details: BlockDetails,
}
357
/// Reference to a full block, either in memory or persisted via the storage backend
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory
    InMemory(&'a Block),
    /// Block was persisted; the header is retained in memory, the rest can be read back from
    /// `write_location`
    Persisted {
        header: &'a Block::Header,
        write_location: WriteLocation,
    },
}
368
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later,
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::confirmation_depth_k`]), it
/// becomes irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block was persisted (likely on disk)
    Persisted {
        header: Block::Header,
        block_details: BlockDetails,
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    PersistedConfirmed {
        header: Block::Header,
        write_location: WriteLocation,
    },
}
395
396impl<Block> ClientDatabaseBlock<Block>
397where
398    Block: GenericOwnedBlock,
399{
400    #[inline(always)]
401    fn header(&self) -> &Block::Header {
402        match self {
403            Self::InMemory(in_memory) => in_memory.block.header(),
404            Self::Persisted { header, .. } => header,
405            Self::PersistedConfirmed { header, .. } => header,
406        }
407    }
408
409    #[inline(always)]
410    fn full_block(&self) -> FullBlock<'_, Block> {
411        match self {
412            Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
413            Self::Persisted {
414                header,
415                write_location,
416                ..
417            }
418            | Self::PersistedConfirmed {
419                header,
420                write_location,
421            } => FullBlock::Persisted {
422                header,
423                write_location: *write_location,
424            },
425        }
426    }
427
428    #[inline(always)]
429    fn block_details(&self) -> Option<&BlockDetails> {
430        match self {
431            Self::InMemory(in_memory) => Some(&in_memory.block_details),
432            Self::Persisted { block_details, .. } => Some(block_details),
433            Self::PersistedConfirmed { .. } => None,
434        }
435    }
436}
437
/// In-memory bookkeeping of blocks and forks
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are in the order from most recently
    /// updated towards the front to least recently at the back.
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
    /// block numbers.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is
    /// organized in such a way that the first entry at every block number corresponds to the
    /// canonical version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
469
/// In-memory cache of all known segment headers
#[derive(Debug)]
struct SegmentHeadersCache {
    /// Segment headers in segment index order (position in the vector == segment index)
    segment_headers_cache: Vec<SegmentHeader>,
}
474
475impl SegmentHeadersCache {
476    #[inline(always)]
477    fn last_segment_header(&self) -> Option<SegmentHeader> {
478        self.segment_headers_cache.last().cloned()
479    }
480
481    #[inline(always)]
482    fn max_segment_index(&self) -> Option<SegmentIndex> {
483        self.segment_headers_cache
484            .last()
485            .map(|segment_header| segment_header.segment_index.as_inner())
486    }
487
488    #[inline(always)]
489    fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
490        self.segment_headers_cache
491            .get(u64::from(segment_index) as usize)
492            .copied()
493    }
494
495    /// Returns actually added segments (some might have been skipped)
496    fn add_segment_headers(
497        &mut self,
498        mut segment_headers: Vec<SegmentHeader>,
499    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
500        self.segment_headers_cache.reserve(segment_headers.len());
501
502        let mut maybe_last_segment_index = self.max_segment_index();
503
504        if let Some(last_segment_index) = maybe_last_segment_index {
505            // Skip already stored segment headers
506            segment_headers.retain(|segment_header| {
507                segment_header.segment_index.as_inner() > last_segment_index
508            });
509        }
510
511        // Check all input segment headers to see which ones are not stored yet and verifying that
512        // segment indices are monotonically increasing
513        for segment_header in segment_headers.iter().copied() {
514            let segment_index = segment_header.segment_index();
515            match maybe_last_segment_index {
516                Some(last_segment_index) => {
517                    if segment_index != last_segment_index + SegmentIndex::ONE {
518                        return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
519                            segment_index,
520                            last_segment_index,
521                        });
522                    }
523
524                    self.segment_headers_cache.push(segment_header);
525                    maybe_last_segment_index.replace(segment_index);
526                }
527                None => {
528                    if segment_index != SegmentIndex::ZERO {
529                        return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
530                            segment_index,
531                        });
532                    }
533
534                    self.segment_headers_cache.push(segment_header);
535                    maybe_last_segment_index.replace(segment_index);
536                }
537            }
538        }
539
540        Ok(segment_headers)
541    }
542}
543
// TODO: Hide implementation details
/// Shared state of the client database
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Blocks and fork bookkeeping
    data: StateData<Block>,
    /// Cache of known segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Adapter over the storage backend used to read and write storage items
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
554
555impl<Block, StorageBackend> State<Block, StorageBackend>
556where
557    Block: GenericOwnedBlock,
558{
559    #[inline(always)]
560    fn best_tip(&self) -> &ForkTip {
561        self.data
562            .fork_tips
563            .front()
564            .expect("The best block is always present; qed")
565    }
566
567    #[inline(always)]
568    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
569        self.data
570            .blocks
571            .front()
572            .expect("The best block is always present; qed")
573            .first()
574            .expect("The best block is always present; qed")
575    }
576}
577
/// In-memory block scheduled for persistence, together with its position in [`StateData::blocks`]
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block offset in [`StateData::blocks`] (distance from the best block)
    block_offset: usize,
    /// Fork offset among blocks at the same block number
    fork_offset: usize,
    /// The in-memory block to persist
    block: &'a ClientDatabaseBlockInMemory<Block>,
}
587
/// Outcome of persisting a block: its position in [`StateData::blocks`] and where it was written
#[derive(Debug)]
struct PersistedBlock {
    /// Block offset in [`StateData::blocks`] (distance from the best block)
    block_offset: usize,
    /// Fork offset among blocks at the same block number
    fork_offset: usize,
    /// Location where the block's storage item was written
    write_location: WriteLocation,
}
594
/// Subset of [`ClientDatabaseOptions`] retained after database initialization
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::confirmation_depth_k`]
    confirmation_depth_k: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}
602
/// Inner state of [`ClientDatabase`], shared between its clones
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was initialized with
    options: ClientDatabaseInnerOptions,
}
611
/// Client database
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    // Shared inner state; cloning the database only clones this reference
    inner: Arc<Inner<Block, StorageBackend>>,
}
620
621impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
622where
623    Block: GenericOwnedBlock,
624{
625    fn clone(&self) -> Self {
626        Self {
627            inner: self.inner.clone(),
628        }
629    }
630}
631
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // Intentionally a no-op for now
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
640
641impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
642where
643    Block: GenericOwnedBlock,
644    StorageBackend: ClientDatabaseStorageBackend,
645{
    #[inline]
    fn best_root(&self) -> BlockRoot {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        self.inner.state.read_blocking().best_tip().root
    }
652
    #[inline]
    fn best_header(&self) -> Block::Header {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        self.inner
            .state
            .read_blocking()
            .best_block()
            .header()
            .clone()
    }
664
665    #[inline]
666    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
667        // Blocking read lock is fine because where a write lock is only taken for a short time and
668        // all other locks are read locks
669        let state = self.inner.state.read_blocking();
670        let best_block = state.best_block();
671        (
672            best_block.header().clone(),
673            best_block
674                .block_details()
675                .expect("Always present for the best block; qed")
676                .clone(),
677        )
678    }
679
    // TODO: Add fast path when `descendant_block_root` is the best block
    /// Header of the ancestor at `ancestor_block_number` on the fork that contains
    /// `descendant_block_root`, walking parent links when forks exist
    #[inline]
    fn ancestor_header(
        &self,
        ancestor_block_number: BlockNumber,
        descendant_block_root: &BlockRoot,
    ) -> Option<Block::Header> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // Block offsets are distances from the best block (see `StateData::blocks`)
        let ancestor_block_offset =
            best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;

        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
        // An ancestor can't be at a greater block number than its descendant
        if ancestor_block_number > descendant_block_number {
            return None;
        }
        let descendant_block_offset =
            best_number.checked_sub(descendant_block_number)?.as_u64() as usize;

        // Range of blocks where the first item is expected to contain a descendant
        let mut blocks_range_iter = state
            .data
            .blocks
            .iter()
            .enumerate()
            .skip(descendant_block_offset);

        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
        let descendant_header = descendant_block_candidates
            .iter()
            .find(|block| &*block.header().header().root() == descendant_block_root)?
            .header()
            .header();

        // If there are no forks at this level, then this is the canonical chain and ancestor
        // block number we're looking for is the first block at the corresponding block number.
        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
        // the one we care about.
        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
            return ancestor_block_candidates
                .iter()
                .next()
                .map(|block| block.header().clone());
        }

        let mut parent_block_root = &descendant_header.prefix.parent_root;

        // Iterate over the blocks following descendant until ancestor is reached, following
        // parent links so the walk stays on the descendant's fork
        for (block_offset, parent_candidates) in blocks_range_iter {
            let parent_header = parent_candidates
                .iter()
                .find(|header| &*header.header().header().root() == parent_block_root)?
                .header();

            // When header offset matches, we found the header
            if block_offset == ancestor_block_offset {
                return Some(parent_header.clone());
            }

            parent_block_root = &parent_header.header().prefix.parent_root;
        }

        None
    }
748
749    #[inline]
750    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
751        // Blocking read lock is fine because where a write lock is only taken for a short time and
752        // all other locks are read locks
753        let state = self.inner.state.read_blocking();
754        let best_number = state.best_tip().number;
755
756        let block_number = *state.data.block_roots.get(block_root)?;
757        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
758        let block_candidates = state.data.blocks.get(block_offset)?;
759
760        block_candidates.iter().find_map(|block| {
761            let header = block.header();
762
763            if &*header.header().root() == block_root {
764                Some(header.clone())
765            } else {
766                None
767            }
768        })
769    }
770
    /// Block header and details for `block_root`, if known and not yet fully confirmed
    #[inline]
    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        let block_number = *state.data.block_roots.get(block_root)?;
        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
        let block_candidates = state.data.blocks.get(block_offset)?;

        block_candidates.iter().find_map(|block| {
            let header = block.header();
            // `PersistedConfirmed` blocks have no details, so the `?` here skips such candidates
            // even when their root would have matched below
            let block_details = block.block_details().cloned()?;

            if &*header.header().root() == block_root {
                Some((header.clone(), block_details))
            } else {
                None
            }
        })
    }
793
    /// Read the full block (header + body) for the given block root.
    ///
    /// Recent blocks are served from memory; blocks already persisted to the storage backend are
    /// read back from disk and re-assembled from the in-memory header buffer and the stored body.
    #[inline]
    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
        let state = self.inner.state.read().await;
        let best_number = state.best_tip().number;

        // Map the root to its block number, then to an offset from the best block, which indexes
        // into the in-memory list of recent blocks
        let block_number = *state
            .data
            .block_roots
            .get(block_root)
            .ok_or(ReadBlockError::UnknownBlockRoot)?;
        let block_offset = best_number
            .checked_sub(block_number)
            .expect("Known block roots always have valid block offset; qed")
            .as_u64() as usize;
        let block_candidates = state
            .data
            .blocks
            .get(block_offset)
            .expect("Valid block offsets always have block entries; qed");

        // Several fork candidates can exist at the same height; find the one whose root matches
        for block_candidate in block_candidates {
            let header = block_candidate.header();

            if &*header.header().root() == block_root {
                return match block_candidate.full_block() {
                    FullBlock::InMemory(block) => Ok(block.clone()),
                    FullBlock::Persisted {
                        header,
                        write_location,
                    } => {
                        let storage_backend_adapter = state.storage_backend_adapter.read().await;

                        let storage_item = storage_backend_adapter
                            .read_storage_item::<StorageItemBlock>(write_location)
                            .await?;

                        // Only a block storage item is valid at this write location; anything
                        // else indicates corruption or a bookkeeping bug
                        let storage_item_block = match storage_item {
                            StorageItemBlock::Block(storage_item_block) => storage_item_block,
                            StorageItemBlock::SegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: SegmentHeaders",
                                    ),
                                });
                            }
                        };

                        // Only the body is needed from storage, the header is already in memory
                        let StorageItemBlockBlock {
                            header: _,
                            body,
                            mmr_with_block: _,
                            system_contract_states: _,
                        } = storage_item_block;

                        Block::from_buffers(header.buffer().clone(), body)
                            .ok_or(ReadBlockError::FailedToDecode)
                    }
                };
            }
        }

        unreachable!("Known block root always has block candidate associated with it; qed")
    }
857
858    #[inline]
859    fn last_segment_header(&self) -> Option<SegmentHeader> {
860        // Blocking read lock is fine because where a write lock is only taken for a short time and
861        // all other locks are read locks
862        let state = self.inner.state.read_blocking();
863        state.segment_headers_cache.last_segment_header()
864    }
865
866    #[inline]
867    fn max_segment_index(&self) -> Option<SegmentIndex> {
868        // Blocking read lock is fine because where a write lock is only taken for a short time and
869        // all other locks are read locks
870        let state = self.inner.state.read_blocking();
871
872        state.segment_headers_cache.max_segment_index()
873    }
874
875    #[inline]
876    fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
877        // Blocking read lock is fine because where a write lock is only taken for a short time and
878        // all other locks are read locks
879        let state = self.inner.state.read_blocking();
880
881        state
882            .segment_headers_cache
883            .get_segment_header(segment_index)
884    }
885
    /// Returns the segment headers that must be included in the block with the given number.
    ///
    /// Walks segment headers from the newest backwards, looking for segments whose "target block"
    /// (last archived block + 1 + confirmation depth) equals `block_number`; a block can span
    /// multiple segments, in which case all of them are returned in increasing segment order.
    #[inline]
    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        // Blocking read lock is fine because where a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();

        let Some(last_segment_index) = state.segment_headers_cache.max_segment_index() else {
            // Not initialized
            return Vec::new();
        };

        // Special case for the initial segment (for beacon chain genesis block)
        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
            && block_number == BlockNumber::ONE
        {
            // If there is a segment index present, and we store monotonically increasing segment
            // headers, then the first header exists
            return vec![
                state
                    .segment_headers_cache
                    .get_segment_header(SegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_segment_index == SegmentIndex::ZERO {
            // Genesis segment already included in block #1
            return Vec::new();
        }

        let mut current_segment_index = last_segment_index;
        loop {
            // If the current segment index present, and we store monotonically increasing segment
            // headers, then the current segment header exists as well.
            let current_segment_header = state
                .segment_headers_cache
                .get_segment_header(current_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // The block immediately after the archived segment adding the confirmation depth
            // NOTE(review): `.number()` here vs the `.number` field accesses below — confirm both
            // are intended and equivalent
            let target_block_number = current_segment_header.last_archived_block.number()
                + BlockNumber::ONE
                + self.inner.options.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Check block spanning multiple segments
                let last_archived_block_number = current_segment_header.last_archived_block.number;
                let mut segment_index = current_segment_index - SegmentIndex::ONE;

                // Walk backwards collecting every earlier segment whose last archived block is
                // the same block, prepending to keep increasing segment order
                while let Some(segment_header) = state
                    .segment_headers_cache
                    .get_segment_header(segment_index)
                {
                    if segment_header.last_archived_block.number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        // NOTE(review): if the spanning walk ever reaches
                        // `segment_index == SegmentIndex::ZERO` and it matches, this subtraction
                        // underflows — confirm that case is impossible or that `SegmentIndex`
                        // arithmetic handles it
                        segment_index -= SegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            // iterate segments further
            if target_block_number > block_number {
                // no need to check the initial segment
                if current_segment_index > SegmentIndex::ONE {
                    current_segment_index -= SegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // No segment headers required
                return Vec::new();
            }
        }

        // No segment headers required
        Vec::new()
    }
968}
969
970impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
971where
972    Block: GenericOwnedBlock,
973    StorageBackend: ClientDatabaseStorageBackend,
974{
975    async fn persist_block(
976        &self,
977        block: Block,
978        block_details: BlockDetails,
979    ) -> Result<(), PersistBlockError> {
980        let mut state = self.inner.state.write().await;
981        let best_number = state.best_tip().number;
982
983        let header = block.header().header();
984
985        let block_number = header.prefix.number;
986
987        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
988            // Special case when syncing on top of the fresh database
989            Self::insert_first_block(&mut state.data, block, block_details);
990
991            return Ok(());
992        }
993
994        if block_number == best_number + BlockNumber::ONE {
995            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
996        }
997
998        let block_offset = best_number
999            .checked_sub(block_number)
1000            .ok_or(PersistBlockError::MissingParent)?
1001            .as_u64() as usize;
1002
1003        if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
1004            return Err(PersistBlockError::OutsideAcceptableRange);
1005        }
1006
1007        let state = &mut *state;
1008
1009        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1010            error!(
1011                %block_number,
1012                %block_offset,
1013                "Failed to store block fork, header offset is missing despite being within \
1014                acceptable range"
1015            );
1016
1017            PersistBlockError::OutsideAcceptableRange
1018        })?;
1019
1020        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1021            // Block's parent is no longer a fork tip, remove it
1022            if fork_tip.root == header.prefix.parent_root {
1023                state.data.fork_tips.remove(index);
1024                break;
1025            }
1026        }
1027
1028        let block_root = *header.root();
1029        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1030        // the best block
1031        state.data.fork_tips.insert(
1032            1,
1033            ForkTip {
1034                number: block_number,
1035                root: block_root,
1036            },
1037        );
1038        state.data.block_roots.insert(block_root, block_number);
1039        block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1040            block,
1041            block_details,
1042        }));
1043
1044        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1045
1046        Ok(())
1047    }
1048
1049    async fn persist_segment_headers(
1050        &self,
1051        segment_headers: Vec<SegmentHeader>,
1052    ) -> Result<(), PersistSegmentHeadersError> {
1053        let mut state = self.inner.state.write().await;
1054
1055        let added_segment_headers = state
1056            .segment_headers_cache
1057            .add_segment_headers(segment_headers)?;
1058
1059        if added_segment_headers.is_empty() {
1060            return Ok(());
1061        }
1062
1063        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1064        // headers modifications
1065        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1066        //  are satisfied. If not, blocking read locks in other places will cause issues.
1067        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1068
1069        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1070
1071        storage_backend_adapter
1072            .write_storage_item(StorageItemBlock::SegmentHeaders(
1073                StorageItemBlockSegmentHeaders {
1074                    segment_headers: added_segment_headers,
1075                },
1076            ))
1077            .await?;
1078
1079        Ok(())
1080    }
1081}
1082
1083impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1084where
1085    Block: GenericOwnedBlock,
1086    StorageBackend: ClientDatabaseStorageBackend,
1087{
    /// Open the existing database.
    ///
    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
    pub async fn open<GBB>(
        options: ClientDatabaseOptions<GBB, StorageBackend>,
    ) -> Result<Self, ClientDatabaseError>
    where
        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
    {
        let ClientDatabaseOptions {
            write_buffer_size,
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
            genesis_block_builder,
            storage_backend,
        } = options;
        // Validate option invariants before touching the storage backend
        if soft_confirmation_depth >= confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
        }

        if max_fork_tip_distance > confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
        }

        // In-memory state, populated below by the storage item handlers as the adapter replays
        // what is stored on disk
        let mut state_data = StateData {
            fork_tips: VecDeque::new(),
            block_roots: HashMap::default(),
            blocks: VecDeque::new(),
        };
        let mut segment_headers_cache = SegmentHeadersCache {
            segment_headers_cache: Vec::new(),
        };

        let options = ClientDatabaseInnerOptions {
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
        };

        // Handlers invoked by the adapter for each storage item found on disk; they mutably
        // capture `state_data` and `segment_headers_cache` to rebuild the in-memory state
        let storage_item_handlers = StorageItemHandlers {
            permanent: |_arg| {
                // TODO
                Ok(())
            },
            block: |arg| {
                let StorageItemHandlerArg {
                    storage_item,
                    page_offset,
                    num_pages,
                } = arg;
                // Segment header items go straight into the cache; block items fall through
                let storage_item_block = match storage_item {
                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
                    StorageItemBlock::SegmentHeaders(segment_headers) => {
                        let num_segment_headers = segment_headers.segment_headers.len();
                        return match segment_headers_cache
                            .add_segment_headers(segment_headers.segment_headers)
                        {
                            Ok(_) => Ok(()),
                            Err(error) => {
                                error!(
                                    %page_offset,
                                    %num_segment_headers,
                                    %error,
                                    "Failed to add segment headers from storage item"
                                );

                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
                            }
                        };
                    }
                };

                // TODO: It would be nice to not allocate body here since we'll not use it here
                //  anyway
                let StorageItemBlockBlock {
                    header,
                    body: _,
                    mmr_with_block,
                    system_contract_states,
                } = storage_item_block;

                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
                    error!(%page_offset, "Failed to decode block header from bytes");

                    ClientDatabaseError::InvalidBlock { page_offset }
                })?;

                let block_root = *header.header().root();
                let block_number = header.header().prefix.number;

                state_data.block_roots.insert(block_root, block_number);

                // Current best number is taken from the first block at the front of the list,
                // if any blocks were read so far
                let maybe_best_number = state_data
                    .blocks
                    .front()
                    .and_then(|block_forks| block_forks.first())
                    .map(|best_block| {
                        // Type inference is not working here for some reason
                        let header: &Block::Header = best_block.header();

                        header.header().prefix.number
                    });

                // Translate the block number into an offset from the best block, pushing a new
                // front entry when this block advances the best number
                let block_offset = if let Some(best_number) = maybe_best_number {
                    if block_number <= best_number {
                        (best_number - block_number).as_u64() as usize
                    } else {
                        // The new best block must follow the previous best block
                        if block_number - best_number != BlockNumber::ONE {
                            error!(
                                %page_offset,
                                %best_number,
                                %block_number,
                                "Invalid new best block number, it must be only one block \
                                higher than the best block"
                            );

                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
                        }

                        state_data.blocks.push_front(SmallVec::new());
                        // Will insert a new block at the front
                        0
                    }
                } else {
                    state_data.blocks.push_front(SmallVec::new());
                    // Will insert a new block at the front
                    0
                };

                let block_forks = match state_data.blocks.get_mut(block_offset) {
                    Some(block_forks) => block_forks,
                    None => {
                        // Ignore the older block, other blocks at its height were already pruned
                        // anyway

                        return Ok(());
                    }
                };

                // Push a new block to the end of the list, we'll fix it up later
                block_forks.push(ClientDatabaseBlock::Persisted {
                    header,
                    block_details: BlockDetails {
                        mmr_with_block,
                        system_contract_states,
                    },
                    write_location: WriteLocation {
                        page_offset,
                        num_pages,
                    },
                });

                // If a new block was inserted, confirm a new canonical block to prune extra
                // in-memory information
                if block_offset == 0 && block_forks.len() == 1 {
                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
                }

                Ok(())
            },
        };

        // Opening the adapter replays all storage items through the handlers above
        let storage_backend_adapter =
            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
                .await?;

        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
            // The best block is last in the list here because that is how it was inserted while
            // reading from the database
            block_forks.last()
        }) {
            // Type inference is not working here for some reason
            let header: &Block::Header = best_block.header();
            let header = header.header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            // Move the best block and its ancestors to index 0 of each fork level
            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
            }

            // Store the best block as the first and only fork tip
            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
        } else {
            let GenesisBlockBuilderResult {
                block,
                system_contract_states,
            } = genesis_block_builder();

            // If the database is empty, initialize everything with the genesis block
            let header = block.header().header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
            state_data.block_roots.insert(block_root, block_number);
            state_data
                .blocks
                .push_front(smallvec![ClientDatabaseBlock::InMemory(
                    ClientDatabaseBlockInMemory {
                        block,
                        block_details: BlockDetails {
                            system_contract_states,
                            // Fresh MMR containing just the genesis block root
                            mmr_with_block: Arc::new({
                                let mut mmr = BlockMerkleMountainRange::new();
                                mmr.add_leaf(&block_root);
                                mmr
                            })
                        },
                    }
                )]);
        }

        let state = State {
            data: state_data,
            segment_headers_cache,
            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
        };

        let inner = Inner {
            state: AsyncRwLock::new(state),
            options,
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }
1326
    /// Format a new database
    ///
    /// Must be called once before [`Self::open()`] can succeed; the on-disk layout is owned by
    /// `StorageBackendAdapter`, which this delegates to.
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1334
1335    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1336        // If the database is empty, initialize everything with the genesis block
1337        let header = block.header().header();
1338        let block_number = header.prefix.number;
1339        let block_root = *header.root();
1340
1341        state.fork_tips.clear();
1342        state.fork_tips.push_front(ForkTip {
1343            number: block_number,
1344            root: block_root,
1345        });
1346        state.block_roots.clear();
1347        state.block_roots.insert(block_root, block_number);
1348        state.blocks.clear();
1349        state
1350            .blocks
1351            .push_front(smallvec![ClientDatabaseBlock::InMemory(
1352                ClientDatabaseBlockInMemory {
1353                    block,
1354                    block_details,
1355                }
1356            )]);
1357    }
1358
1359    async fn insert_new_best_block(
1360        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1361        inner: &Inner<Block, StorageBackend>,
1362        block: Block,
1363        block_details: BlockDetails,
1364    ) -> Result<(), PersistBlockError> {
1365        let header = block.header().header();
1366        let block_number = header.prefix.number;
1367        let block_root = *header.root();
1368        let parent_root = header.prefix.parent_root;
1369
1370        // Adjust the relative order of forks to ensure the first index always corresponds to
1371        // ancestors of the new best block
1372        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1373            return Err(PersistBlockError::MissingParent);
1374        }
1375
1376        // Store new block in the state
1377        {
1378            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1379                // Block's parent is no longer a fork tip, remove it
1380                if fork_tip.root == parent_root {
1381                    state.data.fork_tips.remove(index);
1382                    break;
1383                }
1384            }
1385
1386            state.data.fork_tips.push_front(ForkTip {
1387                number: block_number,
1388                root: block_root,
1389            });
1390            state.data.block_roots.insert(block_root, block_number);
1391            state
1392                .data
1393                .blocks
1394                .push_front(smallvec![ClientDatabaseBlock::InMemory(
1395                    ClientDatabaseBlockInMemory {
1396                        block,
1397                        block_details: block_details.clone()
1398                    }
1399                )]);
1400        }
1401
1402        let options = &inner.options;
1403
1404        Self::confirm_canonical_block(block_number, &mut state.data, options);
1405        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1406
1407        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1408        // block modifications
1409        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1410        //  are satisfied. If not, blocking read locks in other places will cause issues.
1411        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1412
1413        let mut blocks_to_persist = Vec::new();
1414        for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1415            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1416                break;
1417            };
1418
1419            let len_before = blocks_to_persist.len();
1420            fork_blocks
1421                .iter()
1422                .enumerate()
1423                .filter_map(|(fork_offset, client_database_block)| {
1424                    match client_database_block {
1425                        ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1426                            block_offset,
1427                            fork_offset,
1428                            block,
1429                        }),
1430                        ClientDatabaseBlock::Persisted { .. }
1431                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1432                            // Already persisted
1433                            None
1434                        }
1435                    }
1436                })
1437                .collect_into(&mut blocks_to_persist);
1438
1439            if blocks_to_persist.len() == len_before {
1440                break;
1441            }
1442        }
1443
1444        // Persist blocks from older to newer
1445        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1446        {
1447            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1448
1449            for block_to_persist in blocks_to_persist.into_iter().rev() {
1450                let BlockToPersist {
1451                    block_offset,
1452                    fork_offset,
1453                    block,
1454                } = block_to_persist;
1455
1456                let write_location = storage_backend_adapter
1457                    .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1458                        header: block.block.header().buffer().clone(),
1459                        body: block.block.body().buffer().clone(),
1460                        mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1461                        system_contract_states: StdArc::clone(
1462                            &block.block_details.system_contract_states,
1463                        ),
1464                    }))
1465                    .await?;
1466
1467                persisted_blocks.push(PersistedBlock {
1468                    block_offset,
1469                    fork_offset,
1470                    write_location,
1471                });
1472            }
1473        }
1474
1475        // Convert blocks to persisted
1476        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1477        for persisted_block in persisted_blocks {
1478            let PersistedBlock {
1479                block_offset,
1480                fork_offset,
1481                write_location,
1482            } = persisted_block;
1483
1484            let block = state
1485                .data
1486                .blocks
1487                .get_mut(block_offset)
1488                .expect("Still holding the same lock since last check; qed")
1489                .get_mut(fork_offset)
1490                .expect("Still holding the same lock since last check; qed");
1491
1492            replace_with_or_abort(block, |block| {
1493                if let ClientDatabaseBlock::InMemory(in_memory) = block {
1494                    let (header, _body) = in_memory.block.split();
1495
1496                    ClientDatabaseBlock::Persisted {
1497                        header,
1498                        block_details: in_memory.block_details,
1499                        write_location,
1500                    }
1501                } else {
1502                    unreachable!("Still holding the same lock since last check; qed");
1503                }
1504            });
1505        }
1506
1507        // TODO: Prune blocks that are no longer necessary
1508        // TODO: Prune unused page groups here or elsewhere?
1509
1510        Ok(())
1511    }
1512
1513    /// Adjust the relative order of forks to ensure the first index always corresponds to
1514    /// `parent_block_root` and its ancestors.
1515    ///
1516    /// Returns `true` on success and `false` if one of the parents was not found.
1517    #[must_use]
1518    fn adjust_ancestor_block_forks(
1519        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1520        mut parent_block_root: BlockRoot,
1521    ) -> bool {
1522        let mut ancestor_blocks = blocks.iter_mut();
1523
1524        loop {
1525            if ancestor_blocks.len() == 1 {
1526                // Nothing left to adjust with a single fork
1527                break;
1528            }
1529
1530            let Some(parent_blocks) = ancestor_blocks.next() else {
1531                // No more parent headers present
1532                break;
1533            };
1534
1535            let Some(fork_offset_parent_block_root) =
1536                parent_blocks
1537                    .iter()
1538                    .enumerate()
1539                    .find_map(|(fork_offset, fork_block)| {
1540                        let fork_header = fork_block.header().header();
1541                        if *fork_header.root() == parent_block_root {
1542                            Some((fork_offset, fork_header.prefix.parent_root))
1543                        } else {
1544                            None
1545                        }
1546                    })
1547            else {
1548                return false;
1549            };
1550
1551            let fork_offset;
1552            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1553
1554            parent_blocks.swap(0, fork_offset);
1555        }
1556
1557        true
1558    }
1559
1560    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1561    ///
1562    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1563    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1564    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1565    /// they end up being necessary later.
1566    fn prune_outdated_fork_tips(
1567        best_number: BlockNumber,
1568        state: &mut StateData<Block>,
1569        options: &ClientDatabaseInnerOptions,
1570    ) {
1571        let state = &mut *state;
1572
1573        // These forks are just candidates because they will not be pruned if the reference count is
1574        // not 1, indicating they are still in use by something
1575        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1576
1577        // Prune forks that are too far away from the best block
1578        state.fork_tips.retain(|fork_tip| {
1579            if best_number - fork_tip.number > options.max_fork_tip_distance {
1580                candidate_forks_to_remove.push(*fork_tip);
1581                false
1582            } else {
1583                true
1584            }
1585        });
1586        // Prune forks that exceed the maximum number of forks
1587        if state.fork_tips.len() > options.max_fork_tips.get() {
1588            state
1589                .fork_tips
1590                .drain(options.max_fork_tips.get()..)
1591                .collect_into(&mut candidate_forks_to_remove);
1592        }
1593
1594        // Prune all possible candidates
1595        candidate_forks_to_remove
1596            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1597        // Return those that were not pruned back to the list of tips
1598        state.fork_tips.extend(candidate_forks_to_remove);
1599    }
1600
1601    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
1602    /// the list of fork tips
1603    #[must_use]
1604    fn prune_outdated_fork(
1605        best_number: BlockNumber,
1606        fork_tip: &ForkTip,
1607        state: &mut StateData<Block>,
1608    ) -> bool {
1609        let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1610
1611        // Prune fork top and all its ancestors that are not used
1612        let mut block_root_to_prune = fork_tip.root;
1613        let mut pruned_tip = false;
1614        for block_offset in block_offset.. {
1615            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1616                if !pruned_tip {
1617                    error!(
1618                        %best_number,
1619                        ?fork_tip,
1620                        block_offset,
1621                        "Block offset was not present in the database, this is an implementation \
1622                        bug #1"
1623                    );
1624                }
1625                // No forks left to prune
1626                break;
1627            };
1628
1629            if fork_blocks.len() == 1 {
1630                if !pruned_tip {
1631                    error!(
1632                        %best_number,
1633                        ?fork_tip,
1634                        block_offset,
1635                        "Block offset was not present in the database, this is an implementation \
1636                        bug #2"
1637                    );
1638                }
1639
1640                // No forks left to prune
1641                break;
1642            }
1643
1644            let Some((fork_offset, block)) = fork_blocks
1645                .iter()
1646                .enumerate()
1647                // Skip ancestor of the best block, it is certainly not a fork to be pruned
1648                .skip(1)
1649                .find(|(_fork_offset, block)| {
1650                    *block.header().header().root() == block_root_to_prune
1651                })
1652            else {
1653                if !pruned_tip {
1654                    error!(
1655                        %best_number,
1656                        ?fork_tip,
1657                        block_offset,
1658                        "Block offset was not present in the database, this is an implementation \
1659                        bug #3"
1660                    );
1661                }
1662
1663                // Nothing left to prune
1664                break;
1665            };
1666
1667            // More than one instance means something somewhere is using or depends on this block
1668            if block.header().ref_count() > 1 {
1669                break;
1670            }
1671
1672            // Blocks that are already persisted
1673            match block {
1674                ClientDatabaseBlock::InMemory(_) => {
1675                    // Prune
1676                }
1677                ClientDatabaseBlock::Persisted { .. }
1678                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1679                    // Already on disk, keep it in memory for later, but prune the tip
1680                    pruned_tip = true;
1681                    break;
1682                }
1683            }
1684
1685            state.block_roots.get_mut(&block_root_to_prune);
1686            block_root_to_prune = block.header().header().prefix.parent_root;
1687            fork_blocks.swap_remove(fork_offset);
1688
1689            pruned_tip = true;
1690        }
1691
1692        pruned_tip
1693    }
1694
    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
    /// their descendants
    fn confirm_canonical_block(
        best_number: BlockNumber,
        state_data: &mut StateData<Block>,
        options: &ClientDatabaseInnerOptions,
    ) {
        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
        // MMRs, which might be needed for reorgs at the lowest possible depth.
        let block_offset = (options.confirmation_depth_k + BlockNumber::ONE).as_u64() as usize;

        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
            // Nothing to confirm yet
            return;
        };

        // Mark the canonical block as confirmed
        {
            // The canonical block is kept at fork offset 0 (other methods here skip offset 0 when
            // pruning and swap the canonical fork to the front)
            let Some(canonical_block) = fork_blocks.first_mut() else {
                error!(
                    %best_number,
                    block_offset,
                    "Have not found a canonical block to confirm, this is an implementation bug"
                );
                return;
            };

            // In-place state transition `Persisted` -> `PersistedConfirmed` (dropping
            // `block_details`); both other variants indicate a bug and are returned unchanged
            replace_with_or_abort(canonical_block, |block| match block {
                ClientDatabaseBlock::InMemory(_) => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be in memory, this is an implementation bug"
                    );
                    block
                }
                ClientDatabaseBlock::Persisted {
                    header,
                    block_details: _,
                    write_location,
                } => ClientDatabaseBlock::PersistedConfirmed {
                    header,
                    write_location,
                },
                ClientDatabaseBlock::PersistedConfirmed { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be confirmed yet, this is an \
                        implementation bug"
                    );
                    block
                }
            });
        }

        // Prune the rest of the blocks and their descendants
        // Every non-canonical block at this depth is removed; their roots seed the cascading
        // prune of descendants below
        let mut block_roots_to_prune = fork_blocks
            .drain(1..)
            .map(|block| *block.header().header().root())
            .collect::<Vec<_>>();
        let mut current_block_offset = block_offset;
        // Walk towards the tip (decreasing offset), one level per iteration, until a level has no
        // descendants of the pruned blocks left
        while !block_roots_to_prune.is_empty() {
            // Prune fork tips (if any)
            state_data
                .fork_tips
                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));

            // Prune removed block roots
            for block_root in &block_roots_to_prune {
                state_data.block_roots.remove(block_root);
            }

            // Block offset for direct descendants
            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
                current_block_offset = next_block_offset;
            } else {
                // Reached the tip
                break;
            }

            let fork_blocks = state_data
                .blocks
                .get_mut(current_block_offset)
                .expect("Lower block offset always exists; qed");

            // Collect descendants of pruned blocks to prune them next
            // (removes every block whose parent root was just pruned and carries their roots into
            // the next iteration)
            block_roots_to_prune = fork_blocks
                .drain_filter(|block| {
                    let header = block.header().header();

                    block_roots_to_prune.contains(&header.prefix.parent_root)
                })
                .map(|block| *block.header().header().root())
                .collect();
        }
    }
1795}