ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
//! describes the contents of the page. A `StorageItem` always starts at a multiple of 16 bytes
//! (the size of `u128`), allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
37//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before the database can be used, it needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is
40//! expected (but depends on the storage backend) that the whole file size is pre-allocated on disk
41//! and no writes will fail due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    default_field_values,
49    get_mut_unchecked,
50    iter_collect_into,
51    maybe_uninit_as_bytes,
52    maybe_uninit_fill,
53    maybe_uninit_slice,
54    maybe_uninit_write_slice,
55    push_mut,
56    try_blocks
57)]
58
59mod page_group;
60pub mod storage_backend;
61mod storage_backend_adapter;
62
63use crate::page_group::block::StorageItemBlock;
64use crate::page_group::block::block::StorageItemBlockBlock;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70    BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
71    PersistBlockError,
72};
73use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
74use ab_core_primitives::block::header::GenericBlockHeader;
75use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
76use ab_core_primitives::block::owned::GenericOwnedBlock;
77use ab_core_primitives::block::{BlockNumber, BlockRoot};
78use ab_io_type::trivial_type::TrivialType;
79use async_lock::{
80    Mutex as AsyncMutex, RwLock as AsyncRwLock, RwLockUpgradableReadGuard,
81    RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand_core::OsError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
/// Unique identifier for a database.
///
/// A thin wrapper around 32 raw bytes. The bytes are accessible through the `Deref` and
/// `AsRef<[u8]>` implementations, and a value can be created with [`DatabaseId::new()`].
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101    type Target = [u8; 32];
102
103    #[inline(always)]
104    fn deref(&self) -> &Self::Target {
105        &self.0
106    }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110    #[inline(always)]
111    fn as_ref(&self) -> &[u8] {
112        &self.0
113    }
114}
115
impl DatabaseId {
    /// Create a database identifier from the provided 32 bytes.
    ///
    /// The bytes are stored as is, no validation or transformation is applied.
    #[inline(always)]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}
122
/// Hasher that uses the leading 8 bytes of the written data (interpreted as little-endian `u64`)
/// as the hash value instead of running an actual hash function.
///
/// Every write of at least 8 bytes replaces the previous state; shorter writes are ignored.
///
/// NOTE(review): presumably this relies on block roots already being uniformly distributed hash
/// outputs, confirm before using it for any other key type.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    /// Returns the state captured by the most recent sufficiently long write (`0` if none)
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Stores the first 8 bytes of `bytes` as the new state, does nothing for shorter inputs
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
141
/// Result produced by the genesis block builder (see
/// [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
149
/// Options for [`ClientDatabase`]
///
/// `GBB` is the type of the genesis block builder and `StorageBackend` is the type of the storage
/// backend, see the corresponding fields for details. Fields with default values can be omitted
/// during construction.
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// Must be smaller than `confirmation_depth_k`.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// Must be smaller or equal to `confirmation_depth_k`.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
    /// Genesis block builder is responsible to create genesis block and corresponding state for
    /// bootstrapping purposes.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
209
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence numbers.
    /// The database only frees page groups for reuse when all storage items there are no longer in
    /// use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    // NOTE(review): the field counts pages while the recommendation above is given in bytes, the
    // conversion depends on the page size, which is defined outside of this file
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
235
/// Error for [`ClientDatabase::open()`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        // Type-erased so that the error type doesn't depend on concrete storage item types, only
        // the `Debug` representation is used in the error message
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
302
303/// Error for [`ClientDatabase::format()`]
304#[derive(Debug, thiserror::Error)]
305pub enum ClientDatabaseFormatError {
306    /// Storage backend has canceled read request
307    #[error("Storage backend has canceled read request")]
308    ReadRequestCancelled,
309    /// Storage backend read error
310    #[error("Storage backend read error: {error}")]
311    ReadError {
312        /// Low-level error
313        error: io::Error,
314    },
315    /// Failed to generate database id
316    #[error("Failed to generate database id")]
317    FailedToGenerateDatabaseId {
318        /// Low-level error
319        #[from]
320        error: OsError,
321    },
322    /// Database is already formatted yet
323    #[error("Database is already formatted yet")]
324    AlreadyFormatted,
325    /// Storage backend has canceled a writing request
326    #[error("Storage backend has canceled a writing request")]
327    WriteRequestCancelled,
328    /// Storage item write error
329    #[error("Storage item write error")]
330    StorageItemWriteError {
331        /// Low-level error
332        #[from]
333        error: io::Error,
334    },
335}
336
/// Identifies a block that is a tip of one of the forks (see `State::fork_tips`)
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
342
/// Block that is fully retained in memory together with its details (payload of
/// `ClientDatabaseBlock::InMemory`)
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block
    block: Block,
    /// Details that were provided alongside the block
    block_details: BlockDetails,
}
351
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::confirmation_depth_k`]), it becomes
/// irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block was persisted (likely on disk)
    Persisted {
        /// Block header, retained in memory
        header: Block::Header,
        /// Block details, retained in memory
        block_details: BlockDetails,
        /// Location to which the block was written
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    // Unlike `Persisted`, block details are not retained in this variant
    PersistedConfirmed {
        /// Block header, retained in memory
        header: Block::Header,
        /// Location to which the block was written
        #[expect(dead_code, reason = "Not used yet")]
        write_location: WriteLocation,
    },
}
379
380impl<Block> ClientDatabaseBlock<Block>
381where
382    Block: GenericOwnedBlock,
383{
384    #[inline(always)]
385    fn header(&self) -> &Block::Header {
386        match self {
387            Self::InMemory(in_memory) => in_memory.block.header(),
388            Self::Persisted { header, .. } => header,
389            Self::PersistedConfirmed { header, .. } => header,
390        }
391    }
392
393    #[inline(always)]
394    fn block_details(&self) -> Option<&BlockDetails> {
395        match self {
396            Self::InMemory(in_memory) => Some(&in_memory.block_details),
397            Self::Persisted { block_details, .. } => Some(block_details),
398            Self::PersistedConfirmed { .. } => None,
399        }
400    }
401}
402
/// In-memory view of the recent part of the blockchain, including known forks
#[derive(Debug)]
struct State<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are ordered from the most recently updated
    /// (closer to the front) to the least recently updated (at the back).
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with the `blocks` field, which is indexed by block
    /// numbers (relative to the best block).
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is organized
    /// in such a way that the first entry at every block number corresponds to the canonical
    /// version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
434
435impl<Block> State<Block>
436where
437    Block: GenericOwnedBlock,
438{
439    #[inline(always)]
440    fn best_tip(&self) -> &ForkTip {
441        self.fork_tips
442            .front()
443            .expect("The best block is always present; qed")
444    }
445
446    #[inline(always)]
447    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
448        self.blocks
449            .front()
450            .expect("The best block is always present; qed")
451            .first()
452            .expect("The best block is always present; qed")
453    }
454}
455
/// Reference to an in-memory block together with its position.
///
/// NOTE(review): users of this type are outside of this part of the file; presumably the offsets
/// are positions within `State::blocks` captured when the block was selected for persisting —
/// confirm against the usage.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Block offset (see `State::blocks` for terminology)
    block_offset: usize,
    /// Fork offset (see `State::blocks` for terminology)
    fork_offset: usize,
    /// The in-memory block itself
    block: &'a ClientDatabaseBlockInMemory<Block>,
}
465
/// Position of a block together with the location it was written to.
///
/// NOTE(review): users of this type are outside of this part of the file; presumably this is the
/// outcome of persisting a `BlockToPersist` — confirm against the usage.
#[derive(Debug)]
struct PersistedBlock {
    /// Block offset (see `State::blocks` for terminology)
    block_offset: usize,
    /// Fork offset (see `State::blocks` for terminology)
    fork_offset: usize,
    /// Location to which the block was written
    write_location: WriteLocation,
}
472
/// Subset of [`ClientDatabaseOptions`] that is retained after the database is opened
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::confirmation_depth_k`]
    confirmation_depth_k: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}
480
/// Shared internals of [`ClientDatabase`], all clones of the database point to the same instance
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory state behind an async read-write lock
    state: AsyncRwLock<State<Block>>,
    /// Storage backend adapter behind an async mutex
    storage_backend_adapter: AsyncMutex<StorageBackendAdapter>,
    /// Storage backend provided in [`ClientDatabaseOptions::storage_backend`]
    storage_backend: StorageBackend,
    /// Options the database was opened with
    options: ClientDatabaseInnerOptions,
}
491
/// Client database.
///
/// This is a handle to shared internals behind a reference-counted pointer, cloning it results in
/// another handle to the same database rather than in a copy of the data.
///
/// See the crate-level documentation for the high-level architecture overview.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Internals shared between all clones
    inner: Arc<Inner<Block, StorageBackend>>,
}
500
501impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
502where
503    Block: GenericOwnedBlock,
504{
505    fn clone(&self) -> Self {
506        Self {
507            inner: self.inner.clone(),
508        }
509    }
510}
511
// Currently a no-op placeholder.
//
// NOTE(review): `ClientDatabase` is `Clone`, so this runs for every dropped handle rather than
// only for the last one, which needs to be taken into account when implementing the TODO below.
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
520
521impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
522where
523    Block: GenericOwnedBlock,
524    StorageBackend: ClientDatabaseStorageBackend,
525{
526    fn best_root(&self) -> BlockRoot {
527        // Blocking read lock is fine because the only place where write lock is taken is short and
528        // all other locks are read locks
529        self.inner.state.read_blocking().best_tip().root
530    }
531
532    fn best_header(&self) -> Block::Header {
533        // Blocking read lock is fine because the only place where write lock is taken is short and
534        // all other locks are read locks
535        self.inner
536            .state
537            .read_blocking()
538            .best_block()
539            .header()
540            .clone()
541    }
542
543    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
544        // Blocking read lock is fine because the only place where write lock is taken is short and
545        // all other locks are read locks
546        let state = self.inner.state.read_blocking();
547        let best_block = state.best_block();
548        (
549            best_block.header().clone(),
550            best_block
551                .block_details()
552                .expect("Always present for the best block; qed")
553                .clone(),
554        )
555    }
556
557    fn ancestor_header(
558        &self,
559        ancestor_block_number: BlockNumber,
560        descendant_block_root: &BlockRoot,
561    ) -> Option<Block::Header> {
562        // Blocking read lock is fine because the only place where write lock is taken is short and
563        // all other locks are read locks
564        let state = self.inner.state.read_blocking();
565        let best_number = state.best_tip().number;
566
567        let ancestor_block_offset =
568            best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
569        let ancestor_block_candidates = state.blocks.get(ancestor_block_offset)?;
570
571        let descendant_block_number = *state.block_roots.get(descendant_block_root)?;
572        if ancestor_block_number >= descendant_block_number {
573            return None;
574        }
575        let descendant_block_offset =
576            best_number.checked_sub(descendant_block_number)?.as_u64() as usize;
577
578        // Range of blocks where the first item is expected to contain a descendant
579        let mut blocks_range_iter = state
580            .blocks
581            .iter()
582            .enumerate()
583            .skip(descendant_block_offset);
584
585        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
586        let descendant_header = descendant_block_candidates
587            .iter()
588            .find(|block| &*block.header().header().root() == descendant_block_root)?
589            .header()
590            .header();
591
592        // If there are no forks at this level, then this is the canonical chain and ancestor
593        // block number we're looking for is the first block at the corresponding block number.
594        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
595        // the one we care about.
596        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
597            return ancestor_block_candidates
598                .iter()
599                .next()
600                .map(|block| block.header().clone());
601        }
602
603        let mut parent_block_root = &descendant_header.prefix.parent_root;
604
605        // Iterate over the blocks following descendant until ancestor is reached
606        for (block_offset, parent_candidates) in blocks_range_iter {
607            let parent_header = parent_candidates
608                .iter()
609                .find(|header| &*header.header().header().root() == parent_block_root)?
610                .header();
611
612            // When header offset matches, we found the header
613            if block_offset == ancestor_block_offset {
614                return Some(parent_header.clone());
615            }
616
617            parent_block_root = &parent_header.header().prefix.parent_root;
618        }
619
620        None
621    }
622
623    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
624        // Blocking read lock is fine because the only place where write lock is taken is short and
625        // all other locks are read locks
626        let state = self.inner.state.read_blocking();
627        let best_number = state.best_tip().number;
628
629        let block_number = *state.block_roots.get(block_root)?;
630        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
631        let block_candidates = state.blocks.get(block_offset)?;
632
633        block_candidates.iter().find_map(|block| {
634            let header = block.header();
635
636            if &*header.header().root() == block_root {
637                Some(header.clone())
638            } else {
639                None
640            }
641        })
642    }
643
644    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
645        // Blocking read lock is fine because the only place where write lock is taken is short and
646        // all other locks are read locks
647        let state = self.inner.state.read_blocking();
648        let best_number = state.best_tip().number;
649
650        let block_number = *state.block_roots.get(block_root)?;
651        let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
652        let block_candidates = state.blocks.get(block_offset)?;
653
654        block_candidates.iter().find_map(|block| {
655            let header = block.header();
656            let block_details = block.block_details().cloned()?;
657
658            if &*header.header().root() == block_root {
659                Some((header.clone(), block_details))
660            } else {
661                None
662            }
663        })
664    }
665}
666
667impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
668where
669    Block: GenericOwnedBlock,
670    StorageBackend: ClientDatabaseStorageBackend,
671{
672    async fn persist_block(
673        &self,
674        block: Block,
675        block_details: BlockDetails,
676    ) -> Result<(), PersistBlockError> {
677        let mut state = self.inner.state.write().await;
678        let best_number = state.best_tip().number;
679
680        let header = block.header().header();
681
682        let block_number = header.prefix.number;
683
684        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
685            // Special case when syncing on top of the fresh database
686            Self::insert_first_block(&mut state, block, block_details);
687
688            return Ok(());
689        }
690
691        if block_number == best_number + BlockNumber::ONE {
692            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
693        }
694
695        let block_offset = best_number
696            .checked_sub(block_number)
697            .ok_or(PersistBlockError::MissingParent)?
698            .as_u64() as usize;
699
700        if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
701            return Err(PersistBlockError::OutsideAcceptableRange);
702        }
703
704        let state = &mut *state;
705
706        let block_forks = state.blocks.get_mut(block_offset).ok_or_else(|| {
707            error!(
708                %block_number,
709                %block_offset,
710                "Failed to store block fork, header offset is missing despite being within \
711                acceptable range"
712            );
713
714            PersistBlockError::OutsideAcceptableRange
715        })?;
716
717        for (index, fork_tip) in state.fork_tips.iter_mut().enumerate() {
718            // Block's parent is no longer a fork tip, remove it
719            if fork_tip.root == header.prefix.parent_root {
720                state.fork_tips.remove(index);
721                break;
722            }
723        }
724
725        let block_root = *header.root();
726        // Insert at position 1, which means the most recent tip, which doesn't correspond to
727        // the best block
728        state.fork_tips.insert(
729            1,
730            ForkTip {
731                number: block_number,
732                root: block_root,
733            },
734        );
735        state.block_roots.insert(block_root, block_number);
736        block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
737            block,
738            block_details,
739        }));
740
741        Self::prune_outdated_fork_tips(block_number, state, &self.inner.options);
742
743        Ok(())
744    }
745}
746
747impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
748where
749    Block: GenericOwnedBlock,
750    StorageBackend: ClientDatabaseStorageBackend,
751{
752    /// Open the existing database.
753    ///
754    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
755    pub async fn open<GBB>(
756        options: ClientDatabaseOptions<GBB, StorageBackend>,
757    ) -> Result<Self, ClientDatabaseError>
758    where
759        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
760    {
761        let ClientDatabaseOptions {
762            write_buffer_size,
763            confirmation_depth_k,
764            soft_confirmation_depth,
765            max_fork_tips,
766            max_fork_tip_distance,
767            genesis_block_builder,
768            storage_backend,
769        } = options;
770        if soft_confirmation_depth >= confirmation_depth_k {
771            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
772        }
773
774        if max_fork_tip_distance > confirmation_depth_k {
775            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
776        }
777
778        let mut state = State {
779            fork_tips: VecDeque::new(),
780            block_roots: HashMap::default(),
781            blocks: VecDeque::new(),
782        };
783
784        let options = ClientDatabaseInnerOptions {
785            confirmation_depth_k,
786            soft_confirmation_depth,
787            max_fork_tips,
788            max_fork_tip_distance,
789        };
790
791        let storage_item_handlers = StorageItemHandlers {
792            permanent: |_arg| {
793                // TODO
794                Ok(())
795            },
796            block: |arg| {
797                let StorageItemHandlerArg {
798                    storage_item,
799                    page_offset,
800                } = arg;
801                #[expect(
802                    clippy::infallible_destructuring_match,
803                    reason = "Only a single variant for now"
804                )]
805                let storage_item_block = match storage_item {
806                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
807                };
808
809                // TODO: It would be nice to not allocate body here since we'll not use it here
810                //  anyway
811                let StorageItemBlockBlock {
812                    header,
813                    body: _,
814                    mmr_with_block,
815                    system_contract_states,
816                } = storage_item_block;
817
818                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
819                    error!(%page_offset, "Failed to decode block header from bytes");
820
821                    ClientDatabaseError::InvalidBlock { page_offset }
822                })?;
823
824                let block_root = *header.header().root();
825                let block_number = header.header().prefix.number;
826
827                state.block_roots.insert(block_root, block_number);
828
829                let maybe_best_number = state
830                    .blocks
831                    .front()
832                    .and_then(|block_forks| block_forks.first())
833                    .map(|best_block| {
834                        // Type inference is not working here for some reason
835                        let header: &Block::Header = best_block.header();
836
837                        header.header().prefix.number
838                    });
839
840                let block_offset = if let Some(best_number) = maybe_best_number {
841                    if block_number <= best_number {
842                        (best_number - block_number).as_u64() as usize
843                    } else {
844                        // The new best block must follow the previous best block
845                        if block_number - best_number != BlockNumber::ONE {
846                            error!(
847                                %page_offset,
848                                %best_number,
849                                %block_number,
850                                "Invalid new best block number, it must be only one block \
851                                higher than the best block"
852                            );
853
854                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
855                        }
856
857                        state.blocks.push_front(SmallVec::new());
858                        // Will insert a new block at the front
859                        0
860                    }
861                } else {
862                    state.blocks.push_front(SmallVec::new());
863                    // Will insert a new block at the front
864                    0
865                };
866
867                let block_forks = match state.blocks.get_mut(block_offset) {
868                    Some(block_forks) => block_forks,
869                    None => {
870                        // Ignore the older block, other blocks at its height were already pruned
871                        // anyway
872
873                        return Ok(());
874                    }
875                };
876
877                // Push a new block to the end of the list, we'll fix it up later
878                block_forks.push(ClientDatabaseBlock::Persisted {
879                    header,
880                    block_details: BlockDetails {
881                        mmr_with_block,
882                        system_contract_states,
883                    },
884                    write_location: WriteLocation { page_offset },
885                });
886
887                // If a new block was inserted, confirm a new canonical block to prune extra
888                // in-memory information
889                if block_offset == 0 && block_forks.len() == 1 {
890                    Self::confirm_canonical_block(block_number, &mut state, &options);
891                }
892
893                Ok(())
894            },
895        };
896
897        let storage_backend_adapter =
898            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, &storage_backend)
899                .await?;
900
901        if let Some(best_block) = state.blocks.front().and_then(|block_forks| {
902            // The best block is last in the list here because that is how it was inserted while
903            // reading from the database
904            block_forks.last()
905        }) {
906            // Type inference is not working here for some reason
907            let header: &Block::Header = best_block.header();
908            let header = header.header();
909            let block_number = header.prefix.number;
910            let block_root = *header.root();
911
912            if !Self::adjust_ancestor_block_forks(&mut state.blocks, block_root) {
913                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
914            }
915
916            // Store the best block as the first and only fork tip
917            state.fork_tips.push_front(ForkTip {
918                number: block_number,
919                root: block_root,
920            });
921        } else {
922            let GenesisBlockBuilderResult {
923                block,
924                system_contract_states,
925            } = genesis_block_builder();
926
927            // If the database is empty, initialize everything with the genesis block
928            let header = block.header().header();
929            let block_number = header.prefix.number;
930            let block_root = *header.root();
931
932            state.fork_tips.push_front(ForkTip {
933                number: block_number,
934                root: block_root,
935            });
936            state.block_roots.insert(block_root, block_number);
937            state
938                .blocks
939                .push_front(smallvec![ClientDatabaseBlock::InMemory(
940                    ClientDatabaseBlockInMemory {
941                        block,
942                        block_details: BlockDetails {
943                            system_contract_states,
944                            mmr_with_block: Arc::new({
945                                let mut mmr = BlockMerkleMountainRange::new();
946                                mmr.add_leaf(&block_root);
947                                mmr
948                            })
949                        },
950                    }
951                )]);
952        }
953
954        let inner = Inner {
955            state: AsyncRwLock::new(state),
956            storage_backend_adapter: AsyncMutex::new(storage_backend_adapter),
957            storage_backend,
958            options,
959        };
960
961        Ok(Self {
962            inner: Arc::new(inner),
963        })
964    }
965
966    /// Format a new database
967    pub async fn format(
968        storage_backend: &StorageBackend,
969        options: ClientDatabaseFormatOptions,
970    ) -> Result<(), ClientDatabaseFormatError> {
971        StorageBackendAdapter::format(storage_backend, options).await
972    }
973
974    fn insert_first_block(state: &mut State<Block>, block: Block, block_details: BlockDetails) {
975        // If the database is empty, initialize everything with the genesis block
976        let header = block.header().header();
977        let block_number = header.prefix.number;
978        let block_root = *header.root();
979
980        state.fork_tips.clear();
981        state.fork_tips.push_front(ForkTip {
982            number: block_number,
983            root: block_root,
984        });
985        state.block_roots.clear();
986        state.block_roots.insert(block_root, block_number);
987        state.blocks.clear();
988        state
989            .blocks
990            .push_front(smallvec![ClientDatabaseBlock::InMemory(
991                ClientDatabaseBlockInMemory {
992                    block,
993                    block_details,
994                }
995            )]);
996    }
997
998    async fn insert_new_best_block(
999        mut state: AsyncRwLockWriteGuard<'_, State<Block>>,
1000        inner: &Inner<Block, StorageBackend>,
1001        block: Block,
1002        block_details: BlockDetails,
1003    ) -> Result<(), PersistBlockError> {
1004        let header = block.header().header();
1005        let block_number = header.prefix.number;
1006        let block_root = *header.root();
1007        let parent_root = header.prefix.parent_root;
1008
1009        // Adjust the relative order of forks to ensure the first index always corresponds to
1010        // ancestors of the new best block
1011        if !Self::adjust_ancestor_block_forks(&mut state.blocks, parent_root) {
1012            return Err(PersistBlockError::MissingParent);
1013        }
1014
1015        // Store new block in the state
1016        {
1017            for (index, fork_tip) in state.fork_tips.iter_mut().enumerate() {
1018                // Block's parent is no longer a fork tip, remove it
1019                if fork_tip.root == parent_root {
1020                    state.fork_tips.remove(index);
1021                    break;
1022                }
1023            }
1024
1025            state.fork_tips.push_front(ForkTip {
1026                number: block_number,
1027                root: block_root,
1028            });
1029            state.block_roots.insert(block_root, block_number);
1030            state
1031                .blocks
1032                .push_front(smallvec![ClientDatabaseBlock::InMemory(
1033                    ClientDatabaseBlockInMemory {
1034                        block,
1035                        block_details: block_details.clone()
1036                    }
1037                )]);
1038        }
1039
1040        let options = &inner.options;
1041
1042        Self::confirm_canonical_block(block_number, &mut state, options);
1043        Self::prune_outdated_fork_tips(block_number, &mut state, options);
1044
1045        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1046        // block modifications
1047        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1048        //  are satisfied. If not, blocking read locks in other places will cause issues.
1049        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1050
1051        let mut blocks_to_persist = Vec::with_capacity(
1052            options
1053                .confirmation_depth_k
1054                .saturating_sub(options.soft_confirmation_depth)
1055                .as_u64() as usize,
1056        );
1057        for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1058            let Some(fork_blocks) = state.blocks.get(block_offset) else {
1059                break;
1060            };
1061
1062            let len_before = blocks_to_persist.len();
1063            fork_blocks
1064                .iter()
1065                .enumerate()
1066                .filter_map(|(fork_offset, client_database_block)| {
1067                    match client_database_block {
1068                        ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1069                            block_offset,
1070                            fork_offset,
1071                            block,
1072                        }),
1073                        ClientDatabaseBlock::Persisted { .. }
1074                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1075                            // Already persisted
1076                            None
1077                        }
1078                    }
1079                })
1080                .collect_into(&mut blocks_to_persist);
1081
1082            if blocks_to_persist.len() == len_before {
1083                break;
1084            }
1085        }
1086
1087        let mut storage_backend_adapter = inner.storage_backend_adapter.lock().await;
1088
1089        // Persist blocks from older to newer
1090        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1091        for block_to_persist in blocks_to_persist.into_iter().rev() {
1092            let BlockToPersist {
1093                block_offset,
1094                fork_offset,
1095                block,
1096            } = block_to_persist;
1097
1098            let write_location = storage_backend_adapter
1099                .write_storage_item(
1100                    &inner.storage_backend,
1101                    StorageItemBlock::Block(StorageItemBlockBlock {
1102                        header: block.block.header().buffer().clone(),
1103                        body: block.block.body().buffer().clone(),
1104                        mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1105                        system_contract_states: StdArc::clone(
1106                            &block.block_details.system_contract_states,
1107                        ),
1108                    }),
1109                )
1110                .await?;
1111
1112            persisted_blocks.push(PersistedBlock {
1113                block_offset,
1114                fork_offset,
1115                write_location,
1116            });
1117        }
1118
1119        // Convert blocks to persisted
1120        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1121        for persisted_block in persisted_blocks {
1122            let PersistedBlock {
1123                block_offset,
1124                fork_offset,
1125                write_location,
1126            } = persisted_block;
1127
1128            let block = state
1129                .blocks
1130                .get_mut(block_offset)
1131                .expect("Still holding the same lock since last check; qed")
1132                .get_mut(fork_offset)
1133                .expect("Still holding the same lock since last check; qed");
1134
1135            replace_with_or_abort(block, |block| {
1136                if let ClientDatabaseBlock::InMemory(in_memory) = block {
1137                    let (header, _body) = in_memory.block.split();
1138
1139                    ClientDatabaseBlock::Persisted {
1140                        header,
1141                        block_details: in_memory.block_details,
1142                        write_location,
1143                    }
1144                } else {
1145                    unreachable!("Still holding the same lock since last check; qed");
1146                }
1147            });
1148        }
1149
1150        // TODO: Prune blocks that are no longer necessary
1151        // TODO: Prune unused page groups here or elsewhere?
1152
1153        Ok(())
1154    }
1155
1156    /// Adjust the relative order of forks to ensure the first index always corresponds to
1157    /// `parent_block_root` and its ancestors.
1158    ///
1159    /// Returns `true` on success and `false` if one of the parents was not found.
1160    #[must_use]
1161    fn adjust_ancestor_block_forks(
1162        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1163        mut parent_block_root: BlockRoot,
1164    ) -> bool {
1165        let mut ancestor_blocks = blocks.iter_mut();
1166
1167        loop {
1168            if ancestor_blocks.len() == 1 {
1169                // Nothing left to adjust with a single fork
1170                break;
1171            }
1172
1173            let Some(parent_blocks) = ancestor_blocks.next() else {
1174                // No more parent headers present
1175                break;
1176            };
1177
1178            let Some(fork_offset_parent_block_root) =
1179                parent_blocks
1180                    .iter()
1181                    .enumerate()
1182                    .find_map(|(fork_offset, fork_block)| {
1183                        let fork_header = fork_block.header().header();
1184                        if *fork_header.root() == parent_block_root {
1185                            Some((fork_offset, fork_header.prefix.parent_root))
1186                        } else {
1187                            None
1188                        }
1189                    })
1190            else {
1191                return false;
1192            };
1193
1194            let fork_offset;
1195            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1196
1197            parent_blocks.swap(0, fork_offset);
1198        }
1199
1200        true
1201    }
1202
1203    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1204    ///
1205    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1206    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1207    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1208    /// they end up being necessary later.
1209    fn prune_outdated_fork_tips(
1210        best_number: BlockNumber,
1211        state: &mut State<Block>,
1212        options: &ClientDatabaseInnerOptions,
1213    ) {
1214        let state = &mut *state;
1215
1216        // These forks are just candidates because they will not be pruned if the reference count is
1217        // not 1, indicating they are still in use by something
1218        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1219
1220        // Prune forks that are too far away from the best block
1221        state.fork_tips.retain(|fork_tip| {
1222            if best_number - fork_tip.number > options.max_fork_tip_distance {
1223                candidate_forks_to_remove.push(*fork_tip);
1224                false
1225            } else {
1226                true
1227            }
1228        });
1229        // Prune forks that exceed the maximum number of forks
1230        if state.fork_tips.len() > options.max_fork_tips.get() {
1231            state
1232                .fork_tips
1233                .drain(options.max_fork_tips.get()..)
1234                .collect_into(&mut candidate_forks_to_remove);
1235        }
1236
1237        // Prune all possible candidates
1238        candidate_forks_to_remove
1239            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1240        // Return those that were not pruned back to the list of tips
1241        state.fork_tips.extend(candidate_forks_to_remove);
1242    }
1243
1244    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
1245    /// the list of fork tips
1246    #[must_use]
1247    fn prune_outdated_fork(
1248        best_number: BlockNumber,
1249        fork_tip: &ForkTip,
1250        state: &mut State<Block>,
1251    ) -> bool {
1252        let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1253
1254        // Prune fork top and all its ancestors that are not used
1255        let mut block_root_to_prune = fork_tip.root;
1256        let mut pruned_tip = false;
1257        for block_offset in block_offset.. {
1258            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1259                if !pruned_tip {
1260                    error!(
1261                        %best_number,
1262                        ?fork_tip,
1263                        block_offset,
1264                        "Block offset was not present in the database, this is an implementation \
1265                        bug #1"
1266                    );
1267                }
1268                // No forks left to prune
1269                break;
1270            };
1271
1272            if fork_blocks.len() == 1 {
1273                if !pruned_tip {
1274                    error!(
1275                        %best_number,
1276                        ?fork_tip,
1277                        block_offset,
1278                        "Block offset was not present in the database, this is an implementation \
1279                        bug #2"
1280                    );
1281                }
1282
1283                // No forks left to prune
1284                break;
1285            }
1286
1287            let Some((fork_offset, block)) = fork_blocks
1288                .iter()
1289                .enumerate()
1290                // Skip ancestor of the best block, it is certainly not a fork to be pruned
1291                .skip(1)
1292                .find(|(_fork_offset, block)| {
1293                    *block.header().header().root() == block_root_to_prune
1294                })
1295            else {
1296                if !pruned_tip {
1297                    error!(
1298                        %best_number,
1299                        ?fork_tip,
1300                        block_offset,
1301                        "Block offset was not present in the database, this is an implementation \
1302                        bug #3"
1303                    );
1304                }
1305
1306                // Nothing left to prune
1307                break;
1308            };
1309
1310            // More than one instance means something somewhere is using or depends on this block
1311            if block.header().ref_count() > 1 {
1312                break;
1313            }
1314
1315            // Blocks that are already persisted
1316            match block {
1317                ClientDatabaseBlock::InMemory(_) => {
1318                    // Prune
1319                }
1320                ClientDatabaseBlock::Persisted { .. }
1321                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1322                    // Already on disk, keep it in memory for later, but prune the tip
1323                    pruned_tip = true;
1324                    break;
1325                }
1326            }
1327
1328            state.block_roots.get_mut(&block_root_to_prune);
1329            block_root_to_prune = block.header().header().prefix.parent_root;
1330            fork_blocks.swap_remove(fork_offset);
1331
1332            pruned_tip = true;
1333        }
1334
1335        pruned_tip
1336    }
1337
1338    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
1339    /// their descendants
1340    fn confirm_canonical_block(
1341        best_number: BlockNumber,
1342        state: &mut State<Block>,
1343        options: &ClientDatabaseInnerOptions,
1344    ) {
1345        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
1346        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
1347        // MMRs, which might be needed for reorgs at the lowest possible depth.
1348        let Some(block_offset) =
1349            best_number.checked_sub(options.confirmation_depth_k + BlockNumber::ONE)
1350        else {
1351            // Nothing to prune yet
1352            return;
1353        };
1354        let block_offset = block_offset.as_u64() as usize;
1355
1356        let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1357            error!(
1358                %best_number,
1359                block_offset,
1360                "Have not found fork blocks to confirm, this is an implementation bug"
1361            );
1362            return;
1363        };
1364
1365        // Mark the canonical block as confirmed
1366        {
1367            let Some(canonical_block) = fork_blocks.first_mut() else {
1368                error!(
1369                    %best_number,
1370                    block_offset,
1371                    "Have not found a canonical block to confirm, this is an implementation bug"
1372                );
1373                return;
1374            };
1375
1376            replace_with_or_abort(canonical_block, |block| match block {
1377                ClientDatabaseBlock::InMemory(_) => {
1378                    error!(
1379                        %best_number,
1380                        block_offset,
1381                        header = ?block.header(),
1382                        "Block to be confirmed must not be in memory, this is an implementation bug"
1383                    );
1384                    block
1385                }
1386                ClientDatabaseBlock::Persisted {
1387                    header,
1388                    block_details: _,
1389                    write_location,
1390                } => ClientDatabaseBlock::PersistedConfirmed {
1391                    header,
1392                    write_location,
1393                },
1394                ClientDatabaseBlock::PersistedConfirmed { .. } => {
1395                    error!(
1396                        %best_number,
1397                        block_offset,
1398                        header = ?block.header(),
1399                        "Block to be confirmed must not be confirmed yet, this is an \
1400                        implementation bug"
1401                    );
1402                    block
1403                }
1404            });
1405        }
1406
1407        // Prune the rest of the blocks and their descendants
1408        let mut block_roots_to_prune = fork_blocks
1409            .drain(1..)
1410            .map(|block| *block.header().header().root())
1411            .collect::<Vec<_>>();
1412        let mut current_block_offset = block_offset;
1413        while !block_roots_to_prune.is_empty() {
1414            // Prune fork tips (if any)
1415            state
1416                .fork_tips
1417                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1418
1419            // Prune removed block roots
1420            for block_root in &block_roots_to_prune {
1421                state.block_roots.remove(block_root);
1422            }
1423
1424            // Block offset for direct descendants
1425            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1426                current_block_offset = next_block_offset;
1427            } else {
1428                // Reached the tip
1429                break;
1430            }
1431
1432            let fork_blocks = state
1433                .blocks
1434                .get_mut(current_block_offset)
1435                .expect("Lower block offset always exists; qed");
1436
1437            // Collect descendants of pruned blocks to prune them next
1438            block_roots_to_prune = fork_blocks
1439                .drain_filter(|block| {
1440                    let header = block.header().header();
1441
1442                    block_roots_to_prune.contains(&header.prefix.parent_root)
1443                })
1444                .map(|block| *block.header().header().root())
1445                .collect();
1446        }
1447    }
1448}