Skip to main content

ab_client_database/
lib.rs

1//! Client database.
2//!
3//! ## High-level architecture overview
4//!
5//! The database operates on [`ClientDatabaseStorageBackend`], which is backed by [`AlignedPage`]s
6//! that can be read or written. Pages contain `StorageItem`s, one storage item can occupy one or
7//! more pages, but pages always belong to a single storage item. Pages are the smallest unit and
8//! align nicely with the hardware architecture of modern SSDs. Each page starts with a prefix that
9//! describes the contents of the page. `StorageItem` always starts at the multiple of the
10//! `u128`/16 bytes, allowing for direct memory mapping onto target data structures.
11//!
12//! [`AlignedPage`]: crate::storage_backend::AlignedPage
13//!
14//! Individual pages are grouped into page groups (configurable via [`ClientDatabaseOptions`]). Page
15//! groups can be permanent and ephemeral. Permanent page groups store information that is never
16//! going to be deleted, like segment headers. Ephemeral page groups store the majority of the
17//! information about blocks, blockchain state and other things that are being created all the time.
18//! Once information in an ephemeral page group is too old and no longer needed, it can be
19//! repurposed for a new permanent or ephemeral page group. There are different kinds of page groups
20//! defined in `PageGroupKind`, and each variant has independent sequence numbers.
21//!
22//! Page groups are append-only, there is only one active permanent and one ephemeral page group.
23//! They are appended with more pages containing storage items until there is no space to add a
24//! complete storage item, after which the next page group is started.
25//!
26//! Ephemeral page groups can be freed only when they contain 100% outdated storage items.
27//! Individual pages can't be freed.
28//!
29//! Each storage item has a sequence number and checksums that help to define the global ordering
30//! and check whether a storage item was written fully. Upon restart, the page group containing the
31//! latest storage items is found, and the latest fully written storage item is identified to
32//! reconstruct the database state.
33//!
34//! Each page group starts with a `StorageItemPageGroupHeader` storage item for easier
35//! identification.
36//!
//! The database is typically contained in a single file (though in principle could be contained in
//! multiple if necessary). Before the database can be used, it needs to be formatted with a
//! specific size (it is possible to increase the size afterward). It is expected (but depends on
//! the storage backend) that the whole file size is pre-allocated on disk and no writes will fail
//! due to lack of disk space (which could be the case with a sparse file).
42
43#![expect(incomplete_features, reason = "generic_const_exprs")]
44// TODO: This feature is not actually used in this crate, but is added as a workaround for
45//  https://github.com/rust-lang/rust/issues/141492
46#![feature(generic_const_exprs)]
47#![feature(
48    const_block_items,
49    const_convert,
50    const_trait_impl,
51    default_field_values,
52    get_mut_unchecked,
53    iter_collect_into,
54    maybe_uninit_as_bytes,
55    maybe_uninit_fill,
56    try_blocks
57)]
58
59mod page_group;
60pub mod storage_backend;
61mod storage_backend_adapter;
62
63use crate::page_group::block::StorageItemBlock;
64use crate::page_group::block::block::StorageItemBlockBlock;
65use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
66use crate::storage_backend::ClientDatabaseStorageBackend;
67use crate::storage_backend_adapter::{
68    StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
69};
70use ab_client_api::{
71    BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
72    PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
73};
74use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
75use ab_core_primitives::block::header::GenericBlockHeader;
76use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
77use ab_core_primitives::block::owned::GenericOwnedBlock;
78use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
79use ab_core_primitives::segments::{LocalSegmentIndex, SegmentHeader};
80use ab_core_primitives::shard::RealShardKind;
81use ab_io_type::trivial_type::TrivialType;
82use async_lock::{
83    RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
84};
85use rand::rngs::SysError;
86use rclite::Arc;
87use replace_with::replace_with_or_abort;
88use smallvec::{SmallVec, smallvec};
89use std::collections::{HashMap, VecDeque};
90use std::hash::{BuildHasherDefault, Hasher};
91use std::num::{NonZeroU32, NonZeroUsize};
92use std::ops::Deref;
93use std::sync::Arc as StdArc;
94use std::{fmt, io};
95use tracing::error;
96
/// Unique identifier for a database.
///
/// A 32-byte opaque value; generated when the database is formatted (see
/// [`ClientDatabaseFormatError::FailedToGenerateDatabaseId`]).
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
101
102impl Deref for DatabaseId {
103    type Target = [u8; 32];
104
105    #[inline(always)]
106    fn deref(&self) -> &Self::Target {
107        &self.0
108    }
109}
110
111impl AsRef<[u8]> for DatabaseId {
112    #[inline(always)]
113    fn as_ref(&self) -> &[u8] {
114        &self.0
115    }
116}
117
impl DatabaseId {
    /// Create a database ID from raw bytes
    #[inline(always)]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }
}
124
/// A trivial [`Hasher`] that interprets the first 8 input bytes as a little-endian `u64`.
///
/// Inputs shorter than 8 bytes leave the state untouched. Used as the hasher for
/// block-root-keyed maps (see [`StateData::block_roots`]), where keys are already
/// well-distributed, so no real hashing is necessary.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Take the first complete 8-byte chunk, if there is one; shorter inputs are ignored
        if let Some(chunk) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*chunk);
        }
    }
}
143
/// Result of building the genesis block, produced by the genesis block builder
/// (see [`ClientDatabaseOptions::genesis_block_builder`])
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contracts state in the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
151
/// Options for [`ClientDatabase`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size.
    ///
    /// Larger buffer allows buffering more async writes for improved responsiveness but requires
    /// more RAM. Zero buffer size means all writes must be completed before returning from the
    /// operation that triggered it. Non-zero buffer means writes can happen in the background.
    ///
    /// The recommended value is 5.
    pub write_buffer_size: usize = 5,
    /// Blocks at this depth are considered to be "confirmed" and irreversible from the consensus
    /// perspective.
    ///
    /// This parameter allows establishing a final canonical order of blocks and eliminating any
    /// potential forks at a specified depth and beyond.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth for blocks.
    ///
    /// Doesn't prevent forking on the consensus level but makes it extremely unlikely.
    ///
    /// This parameter determines how many blocks are retained in memory before being written to
    /// disk. Writing discarded blocks to disk is a waste of resources, so they are retained in
    /// memory before being soft-confirmed and written to disk for longer-term storage.
    ///
    /// A smaller number reduces memory usage while increasing the probability of unnecessary disk
    /// writes. A larger number increases memory usage, while avoiding unnecessary disk writes, but
    /// also increases the chance of recent blocks not being retained on disk in case of a crash.
    ///
    /// The recommended value is 3 blocks.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Defines how many fork tips should be maintained in total.
    ///
    /// As natural forks occur, there may be more than one tip in existence, with only one of them
    /// being considered "canonical". This parameter defines how many of these tips to maintain in a
    /// sort of LRU style cache. Tips beyond this limit that were not extended for a long time will
    /// be pruned automatically.
    ///
    /// A larger number results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 3 tips.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Max distance between fork tip and the best block.
    ///
    /// When forks are this deep, they will be pruned, even without reaching the `max_fork_tips`
    /// limit. This essentially means the tip was not extended for some time, and while it is
    /// theoretically possible for the chain to continue from this tip, the probability is so small
    /// that it is not worth storing it.
    ///
    /// A larger value results in higher memory usage and higher complexity of pruning algorithms.
    ///
    /// The recommended value is 5 blocks.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Genesis block builder is responsible to create genesis block and corresponding state for
    /// bootstrapping purposes.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for storing and retrieving storage items
    pub storage_backend: StorageBackend,
}
211
/// Options for [`ClientDatabase::format()`]
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// The number of [`AlignedPage`]s in a single page group.
    ///
    /// [`AlignedPage`]: crate::storage_backend::AlignedPage
    ///
    /// Each group always has a set of storage items with monotonically increasing sequence
    /// numbers. The database only frees page groups for reuse when all storage items there are
    /// no longer in use.
    ///
    /// A smaller number means storage can be reclaimed for reuse more quickly and higher
    /// concurrency during restart, but must not be too small that no storage item fits within a
    /// page group anymore. A larger number allows finding the range of sequence numbers that are
    /// already used and where potential write interruption happened on restart more efficiently,
    /// but will use more RAM in the process.
    ///
    /// The recommended size is 256 MiB unless a tiny database is used for testing purposes, where
    /// a smaller value might work too.
    pub page_group_size: NonZeroU32,
    /// By default, formatting will be aborted if the database appears to be already formatted.
    ///
    /// Setting this option to `true` skips the check and formats the database anyway.
    pub force: bool,
}
237
/// Error type for [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size in pages
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number in the database
        actual: u64,
        /// Expected sequence number
        expected: u64,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// First storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset where storage item is found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
310
311/// Error for [`ClientDatabase::format()`]
312#[derive(Debug, thiserror::Error)]
313pub enum ClientDatabaseFormatError {
314    /// Storage backend has canceled read request
315    #[error("Storage backend has canceled read request")]
316    ReadRequestCancelled,
317    /// Storage backend read error
318    #[error("Storage backend read error: {error}")]
319    ReadError {
320        /// Low-level error
321        error: io::Error,
322    },
323    /// Failed to generate database id
324    #[error("Failed to generate database id")]
325    FailedToGenerateDatabaseId {
326        /// Low-level error
327        #[from]
328        error: SysError,
329    },
330    /// Database is already formatted yet
331    #[error("Database is already formatted yet")]
332    AlreadyFormatted,
333    /// Storage backend has canceled a writing request
334    #[error("Storage backend has canceled a writing request")]
335    WriteRequestCancelled,
336    /// Storage item write error
337    #[error("Storage item write error")]
338    StorageItemWriteError {
339        /// Low-level error
340        #[from]
341        error: io::Error,
342    },
343}
344
/// Tip of a fork: the most recent block of a chain that has no descendants
/// (see [`StateData::fork_tips`])
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
350
/// A block that is still stored in memory and wasn't persisted yet
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// The owned block itself
    block: Block,
    /// Details about the block
    block_details: BlockDetails,
}
359
/// Borrowed view of a full block, which is either still in memory or was already persisted
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is available in memory
    InMemory(&'a Block),
    /// Only the header is retained in memory; the rest of the block can be read back from the
    /// storage backend using `write_location`
    Persisted {
        header: &'a Block::Header,
        write_location: WriteLocation,
    },
}
370
/// Client database block contains details about the block state in the database.
///
/// Originally all blocks are stored in memory. Once a block is soft-confirmed (see
/// [`ClientDatabaseOptions::soft_confirmation_depth`]), it is persisted (likely on disk). Later,
/// when it is "confirmed" fully (see [`ClientDatabaseOptions::confirmation_depth_k`]), it
/// becomes irreversible.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block is stored in memory and wasn't persisted yet
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block was persisted (likely on disk)
    Persisted {
        header: Block::Header,
        block_details: BlockDetails,
        write_location: WriteLocation,
    },
    /// Block was persisted (likely on disk) and is irreversibly "confirmed" from the consensus
    /// perspective
    PersistedConfirmed {
        header: Block::Header,
        write_location: WriteLocation,
    },
}
397
398impl<Block> ClientDatabaseBlock<Block>
399where
400    Block: GenericOwnedBlock,
401{
402    #[inline(always)]
403    fn header(&self) -> &Block::Header {
404        match self {
405            Self::InMemory(in_memory) => in_memory.block.header(),
406            Self::Persisted { header, .. } => header,
407            Self::PersistedConfirmed { header, .. } => header,
408        }
409    }
410
411    #[inline(always)]
412    fn full_block(&self) -> FullBlock<'_, Block> {
413        match self {
414            Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
415            Self::Persisted {
416                header,
417                write_location,
418                ..
419            }
420            | Self::PersistedConfirmed {
421                header,
422                write_location,
423            } => FullBlock::Persisted {
424                header,
425                write_location: *write_location,
426            },
427        }
428    }
429
430    #[inline(always)]
431    fn block_details(&self) -> Option<&BlockDetails> {
432        match self {
433            Self::InMemory(in_memory) => Some(&in_memory.block_details),
434            Self::Persisted { block_details, .. } => Some(block_details),
435            Self::PersistedConfirmed { .. } => None,
436        }
437    }
438}
439
/// In-memory portion of the database state: recent blocks and fork tips
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks that have no descendants.
    ///
    /// The current best block is at the front, the rest are in the order from most recently
    /// updated towards the front to least recently at the back.
    fork_tips: VecDeque<ForkTip>,
    /// Map from block root to block number.
    ///
    /// Is meant to be used in conjunction with `headers` and `blocks` fields, which are indexed by
    /// block numbers.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// List of blocks with the newest at the front.
    ///
    /// The first element of the first entry corresponds to the best block.
    ///
    /// It is expected that in most block numbers there will be exactly one block, some two,
    /// anything more than that will be very rare. The list of forks for a block number is
    /// organized in such a way that the first entry at every block number corresponds to the
    /// canonical version of the blockchain at any point in time.
    ///
    /// A position within this data structure is called "block offset". This is an ephemeral value
    /// and changes as new best blocks are added. Blocks at the same height are collectively called
    /// "block forks" and the position of the block within the same block height is called
    /// "fork offset". While fork offset `0` always corresponds to the canonical version of the
    /// blockchain, other offsets are not guaranteed to follow any particular ordering rules.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
471
/// In-memory cache of all known segment headers
#[derive(Debug)]
struct SegmentHeadersCache {
    // Position in the vector matches the local segment index (contiguity is enforced when adding)
    segment_headers_cache: Vec<SegmentHeader>,
}
476
477impl SegmentHeadersCache {
478    #[inline(always)]
479    fn last_segment_header(&self) -> Option<SegmentHeader> {
480        self.segment_headers_cache.last().cloned()
481    }
482
483    #[inline(always)]
484    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
485        self.segment_headers_cache
486            .last()
487            .map(|segment_header| segment_header.segment_index.as_inner())
488    }
489
490    #[inline(always)]
491    fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
492        self.segment_headers_cache
493            .get(u64::from(local_segment_index) as usize)
494            .copied()
495    }
496
497    /// Returns actually added segments (some might have been skipped)
498    fn add_segment_headers(
499        &mut self,
500        mut segment_headers: Vec<SegmentHeader>,
501    ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
502        self.segment_headers_cache.reserve(segment_headers.len());
503
504        let mut maybe_last_local_segment_index = self.max_local_segment_index();
505
506        if let Some(last_segment_index) = maybe_last_local_segment_index {
507            // Skip already stored segment headers
508            segment_headers.retain(|segment_header| {
509                segment_header.segment_index.as_inner() > last_segment_index
510            });
511        }
512
513        // Check all input segment headers to see which ones are not stored yet and verifying that
514        // segment indices are monotonically increasing
515        for segment_header in segment_headers.iter().copied() {
516            let local_segment_index = segment_header.local_segment_index();
517            match maybe_last_local_segment_index {
518                Some(last_local_segment_index) => {
519                    if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
520                        return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
521                            local_segment_index,
522                            last_local_segment_index,
523                        });
524                    }
525
526                    self.segment_headers_cache.push(segment_header);
527                    maybe_last_local_segment_index.replace(local_segment_index);
528                }
529                None => {
530                    if local_segment_index != LocalSegmentIndex::ZERO {
531                        return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
532                            local_segment_index,
533                        });
534                    }
535
536                    self.segment_headers_cache.push(segment_header);
537                    maybe_last_local_segment_index.replace(local_segment_index);
538                }
539            }
540        }
541
542        Ok(segment_headers)
543    }
544}
545
// TODO: Hide implementation details
/// Shared state of the client database (kept behind a lock in [`Inner`])
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    // In-memory view of recent blocks and fork tips
    data: StateData<Block>,
    // Cache of all known segment headers
    segment_headers_cache: SegmentHeadersCache,
    // Adapter for reading/writing storage items through the storage backend
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
556
557impl<Block, StorageBackend> State<Block, StorageBackend>
558where
559    Block: GenericOwnedBlock,
560{
561    #[inline(always)]
562    fn best_tip(&self) -> &ForkTip {
563        self.data
564            .fork_tips
565            .front()
566            .expect("The best block is always present; qed")
567    }
568
569    #[inline(always)]
570    fn best_block(&self) -> &ClientDatabaseBlock<Block> {
571        self.data
572            .blocks
573            .front()
574            .expect("The best block is always present; qed")
575            .first()
576            .expect("The best block is always present; qed")
577    }
578}
579
/// An in-memory block selected for persistence, together with its position in
/// [`StateData::blocks`]
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    // Position in `StateData::blocks` (distance from the best block)
    block_offset: usize,
    // Position of the block among forks at the same block number
    fork_offset: usize,
    // The in-memory block to persist
    block: &'a ClientDatabaseBlockInMemory<Block>,
}
589
/// Where a persisted block was written, together with its position in [`StateData::blocks`]
#[derive(Debug)]
struct PersistedBlock {
    // Position in `StateData::blocks` (distance from the best block)
    block_offset: usize,
    // Position of the block among forks at the same block number
    fork_offset: usize,
    // Location where the block was written by the storage backend adapter
    write_location: WriteLocation,
}
596
/// Subset of [`ClientDatabaseOptions`] retained for the lifetime of the database.
///
/// See the fields of the same name in [`ClientDatabaseOptions`] for details.
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    confirmation_depth_k: BlockNumber,
    soft_confirmation_depth: BlockNumber,
    max_fork_tips: NonZeroUsize,
    max_fork_tip_distance: BlockNumber,
}
604
/// Inner state of [`ClientDatabase`], shared between clones of the database handle
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    // Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    // Options retained for the lifetime of the database
    options: ClientDatabaseInnerOptions,
}
613
/// Client database.
///
/// Cheap to clone: clones share the same inner state. See the module documentation for a
/// high-level architecture overview.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    // Reference-counted shared state
    inner: Arc<Inner<Block, StorageBackend>>,
}
622
623impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
624where
625    Block: GenericOwnedBlock,
626{
627    fn clone(&self) -> Self {
628        Self {
629            inner: self.inner.clone(),
630        }
631    }
632}
633
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    fn drop(&mut self) {
        // Currently a no-op
        // TODO: Persist things that were not persisted yet to reduce the data loss on shutdown
    }
}
642
643impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
644where
645    Block: GenericOwnedBlock,
646    StorageBackend: ClientDatabaseStorageBackend,
647{
648    #[inline]
649    fn best_root(&self) -> BlockRoot {
650        // Blocking read lock is fine because where a write lock is only taken for a short time and
651        // all other locks are read locks
652        self.inner.state.read_blocking().best_tip().root
653    }
654
655    #[inline]
656    fn best_header(&self) -> Block::Header {
657        // Blocking read lock is fine because where a write lock is only taken for a short time and
658        // all other locks are read locks
659        self.inner
660            .state
661            .read_blocking()
662            .best_block()
663            .header()
664            .clone()
665    }
666
667    #[inline]
668    fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
669        // Blocking read lock is fine because where a write lock is only taken for a short time and
670        // all other locks are read locks
671        let state = self.inner.state.read_blocking();
672        let best_block = state.best_block();
673        (
674            best_block.header().clone(),
675            best_block
676                .block_details()
677                .expect("Always present for the best block; qed")
678                .clone(),
679        )
680    }
681
    // TODO: Add fast path when `descendant_block_root` is the best block
    #[inline]
    fn ancestor_header(
        &self,
        ancestor_block_number: BlockNumber,
        descendant_block_root: &BlockRoot,
    ) -> Option<Block::Header> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // Block offsets are distances from the best block (see `StateData::blocks`)
        let ancestor_block_offset =
            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;

        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
        // An ancestor can't be at a higher block number than its descendant
        if ancestor_block_number > descendant_block_number {
            return None;
        }
        let descendant_block_offset =
            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;

        // Range of blocks where the first item is expected to contain a descendant
        let mut blocks_range_iter = state
            .data
            .blocks
            .iter()
            .enumerate()
            .skip(descendant_block_offset);

        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
        let descendant_header = descendant_block_candidates
            .iter()
            .find(|block| &*block.header().header().root() == descendant_block_root)?
            .header()
            .header();

        // If there are no forks at this level, then this is the canonical chain and ancestor
        // block number we're looking for is the first block at the corresponding block number.
        // Similarly, if there is just a single ancestor candidate and descendant exists, it must be
        // the one we care about.
        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
            return ancestor_block_candidates
                .iter()
                .next()
                .map(|block| block.header().clone());
        }

        let mut parent_block_root = &descendant_header.prefix.parent_root;

        // Iterate over the blocks following descendant until ancestor is reached, following
        // parent roots to stay on the descendant's fork
        for (block_offset, parent_candidates) in blocks_range_iter {
            let parent_header = parent_candidates
                .iter()
                .find(|header| &*header.header().header().root() == parent_block_root)?
                .header();

            // When header offset matches, we found the header
            if block_offset == ancestor_block_offset {
                return Some(parent_header.clone());
            }

            parent_block_root = &parent_header.header().prefix.parent_root;
        }

        None
    }
750
751    #[inline]
752    fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
753        // Blocking read lock is fine because where a write lock is only taken for a short time and
754        // all other locks are read locks
755        let state = self.inner.state.read_blocking();
756        let best_number = state.best_tip().number;
757
758        let block_number = *state.data.block_roots.get(block_root)?;
759        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
760        let block_candidates = state.data.blocks.get(block_offset)?;
761
762        block_candidates.iter().find_map(|block| {
763            let header = block.header();
764
765            if &*header.header().root() == block_root {
766                Some(header.clone())
767            } else {
768                None
769            }
770        })
771    }
772
773    #[inline]
774    fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
775        // Blocking read lock is fine because where a write lock is only taken for a short time and
776        // all other locks are read locks
777        let state = self.inner.state.read_blocking();
778        let best_number = state.best_tip().number;
779
780        let block_number = *state.data.block_roots.get(block_root)?;
781        let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
782        let block_candidates = state.data.blocks.get(block_offset)?;
783
784        block_candidates.iter().find_map(|block| {
785            let header = block.header();
786            let block_details = block.block_details().cloned()?;
787
788            if &*header.header().root() == block_root {
789                Some((header.clone(), block_details))
790            } else {
791                None
792            }
793        })
794    }
795
    #[inline]
    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
        // Resolve the root to a block number, then to an offset from the best tip, then find the
        // matching fork candidate at that height
        let state = self.inner.state.read().await;
        let best_number = state.best_tip().number;

        let block_number = *state
            .data
            .block_roots
            .get(block_root)
            .ok_or(ReadBlockError::UnknownBlockRoot)?;
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .expect("Known block roots always have valid block offset; qed"),
        ) as usize;
        let block_candidates = state
            .data
            .blocks
            .get(block_offset)
            .expect("Valid block offsets always have block entries; qed");

        for block_candidate in block_candidates {
            let header = block_candidate.header();

            if &*header.header().root() == block_root {
                return match block_candidate.full_block() {
                    // Block is still fully in memory, just clone it
                    FullBlock::InMemory(block) => Ok(block.clone()),
                    // Only the header is in memory; re-read the body from the storage backend and
                    // reassemble the block
                    FullBlock::Persisted {
                        header,
                        write_location,
                    } => {
                        let storage_backend_adapter = state.storage_backend_adapter.read().await;

                        let storage_item = storage_backend_adapter
                            .read_storage_item::<StorageItemBlock>(write_location)
                            .await?;

                        // A block write location must contain a block storage item; anything else
                        // indicates corruption or a bookkeeping bug
                        let storage_item_block = match storage_item {
                            StorageItemBlock::Block(storage_item_block) => storage_item_block,
                            StorageItemBlock::SegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: SegmentHeaders",
                                    ),
                                });
                            }
                        };

                        // Only the body is needed here, the in-memory header is reused
                        let StorageItemBlockBlock {
                            header: _,
                            body,
                            mmr_with_block: _,
                            system_contract_states: _,
                        } = storage_item_block;

                        Block::from_buffers(header.buffer().clone(), body)
                            .ok_or(ReadBlockError::FailedToDecode)
                    }
                };
            }
        }

        unreachable!("Known block root always has block candidate associated with it; qed")
    }
860
861    #[inline]
862    fn last_segment_header(&self) -> Option<SegmentHeader> {
863        // Blocking read lock is fine because where a write lock is only taken for a short time and
864        // all other locks are read locks
865        let state = self.inner.state.read_blocking();
866        state.segment_headers_cache.last_segment_header()
867    }
868
869    #[inline]
870    fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
871        // Blocking read lock is fine because where a write lock is only taken for a short time and
872        // all other locks are read locks
873        let state = self.inner.state.read_blocking();
874
875        state.segment_headers_cache.max_local_segment_index()
876    }
877
878    #[inline]
879    fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
880        // Blocking read lock is fine because where a write lock is only taken for a short time and
881        // all other locks are read locks
882        let state = self.inner.state.read_blocking();
883
884        state
885            .segment_headers_cache
886            .get_segment_header(segment_index)
887    }
888
    #[inline]
    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        // Blocking read lock is fine because a write lock is only taken for a short time and
        // all other locks are read locks
        let state = self.inner.state.read_blocking();

        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
        else {
            // Not initialized
            return Vec::new();
        };

        // Special case for the initial segment (for beacon chain genesis block)
        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
            && block_number == BlockNumber::ONE
        {
            // If there is a segment index present, and we store monotonically increasing segment
            // headers, then the first header exists
            return vec![
                state
                    .segment_headers_cache
                    .get_segment_header(LocalSegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_local_segment_index == LocalSegmentIndex::ZERO {
            // Genesis segment already included in block #1
            return Vec::new();
        }

        // Walk segments from newest to oldest until the segment whose headers must be included in
        // `block_number` is found (or until it is clear none are required)
        let mut current_local_segment_index = last_local_segment_index;
        loop {
            // If the current segment index is present, and we store monotonically increasing
            // segment headers, then the current segment header exists as well.
            let current_segment_header = state
                .segment_headers_cache
                .get_segment_header(current_local_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // The block immediately after the archived segment adding the confirmation depth
            let target_block_number = current_segment_header.last_archived_block.number()
                + BlockNumber::ONE
                + self.inner.options.confirmation_depth_k;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Check block spanning multiple segments: walk backwards collecting every earlier
                // segment that archived the same last block
                let last_archived_block_number = current_segment_header.last_archived_block.number;
                // `current_local_segment_index >= ONE` here (guarded by the decrement condition
                // below and the early return for ZERO above), so this subtraction is safe
                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;

                while let Some(segment_header) = state
                    .segment_headers_cache
                    .get_segment_header(local_segment_index)
                {
                    if segment_header.last_archived_block.number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        // NOTE(review): if the segment at index ZERO can also share this last
                        // archived block number, this subtraction underflows — confirm the genesis
                        // segment can never match here
                        local_segment_index -= LocalSegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            // iterate segments further
            if target_block_number > block_number {
                // no need to check the initial segment
                if current_local_segment_index > LocalSegmentIndex::ONE {
                    current_local_segment_index -= LocalSegmentIndex::ONE
                } else {
                    break;
                }
            } else {
                // No segment headers required
                return Vec::new();
            }
        }

        // No segment headers required
        Vec::new()
    }
972}
973
974impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
975where
976    Block: GenericOwnedBlock,
977    StorageBackend: ClientDatabaseStorageBackend,
978{
979    async fn persist_block(
980        &self,
981        block: Block,
982        block_details: BlockDetails,
983    ) -> Result<(), PersistBlockError> {
984        let mut state = self.inner.state.write().await;
985        let best_number = state.best_tip().number;
986
987        let header = block.header().header();
988
989        let block_number = header.prefix.number;
990
991        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
992            // Special case when syncing on top of the fresh database
993            Self::insert_first_block(&mut state.data, block, block_details);
994
995            return Ok(());
996        }
997
998        if block_number == best_number + BlockNumber::ONE {
999            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1000        }
1001
1002        let block_offset = u64::from(
1003            best_number
1004                .checked_sub(block_number)
1005                .ok_or(PersistBlockError::MissingParent)?,
1006        ) as usize;
1007
1008        if block_offset >= u64::from(self.inner.options.confirmation_depth_k) as usize {
1009            return Err(PersistBlockError::OutsideAcceptableRange);
1010        }
1011
1012        let state = &mut *state;
1013
1014        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1015            error!(
1016                %block_number,
1017                %block_offset,
1018                "Failed to store block fork, header offset is missing despite being within \
1019                acceptable range"
1020            );
1021
1022            PersistBlockError::OutsideAcceptableRange
1023        })?;
1024
1025        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1026            // Block's parent is no longer a fork tip, remove it
1027            if fork_tip.root == header.prefix.parent_root {
1028                state.data.fork_tips.remove(index);
1029                break;
1030            }
1031        }
1032
1033        let block_root = *header.root();
1034        // Insert at position 1, which means the most recent tip, which doesn't correspond to
1035        // the best block
1036        state.data.fork_tips.insert(
1037            1,
1038            ForkTip {
1039                number: block_number,
1040                root: block_root,
1041            },
1042        );
1043        state.data.block_roots.insert(block_root, block_number);
1044        block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1045            block,
1046            block_details,
1047        }));
1048
1049        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1050
1051        Ok(())
1052    }
1053
1054    async fn persist_segment_headers(
1055        &self,
1056        segment_headers: Vec<SegmentHeader>,
1057    ) -> Result<(), PersistSegmentHeadersError> {
1058        let mut state = self.inner.state.write().await;
1059
1060        let added_segment_headers = state
1061            .segment_headers_cache
1062            .add_segment_headers(segment_headers)?;
1063
1064        if added_segment_headers.is_empty() {
1065            return Ok(());
1066        }
1067
1068        // Convert write lock into upgradable read lock to allow reads, while preventing segment
1069        // headers modifications
1070        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1071        //  are satisfied. If not, blocking read locks in other places will cause issues.
1072        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1073
1074        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1075
1076        storage_backend_adapter
1077            .write_storage_item(StorageItemBlock::SegmentHeaders(
1078                StorageItemBlockSegmentHeaders {
1079                    segment_headers: added_segment_headers,
1080                },
1081            ))
1082            .await?;
1083
1084        Ok(())
1085    }
1086}
1087
1088impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1089where
1090    Block: GenericOwnedBlock,
1091    StorageBackend: ClientDatabaseStorageBackend,
1092{
    /// Open the existing database.
    ///
    /// Replays all storage items found by the storage backend adapter to rebuild the in-memory
    /// state (blocks, fork tips, segment headers). If no blocks are found, the database is
    /// initialized with the genesis block produced by `genesis_block_builder`.
    ///
    /// NOTE: The database needs to be formatted with [`Self::format()`] before it can be used.
    pub async fn open<GBB>(
        options: ClientDatabaseOptions<GBB, StorageBackend>,
    ) -> Result<Self, ClientDatabaseError>
    where
        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
    {
        let ClientDatabaseOptions {
            write_buffer_size,
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
            genesis_block_builder,
            storage_backend,
        } = options;
        // Soft confirmation must happen strictly before full confirmation
        if soft_confirmation_depth >= confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
        }

        // Fork tips deeper than the confirmation depth could never be switched to
        if max_fork_tip_distance > confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
        }

        // In-memory state rebuilt below by replaying storage items from disk
        let mut state_data = StateData {
            fork_tips: VecDeque::new(),
            block_roots: HashMap::default(),
            blocks: VecDeque::new(),
        };
        let mut segment_headers_cache = SegmentHeadersCache {
            segment_headers_cache: Vec::new(),
        };

        let options = ClientDatabaseInnerOptions {
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
        };

        // Handlers invoked by the storage backend adapter for every storage item it finds while
        // scanning page groups; they mutably capture the state being rebuilt
        let storage_item_handlers = StorageItemHandlers {
            permanent: |_arg| {
                // TODO
                Ok(())
            },
            block: |arg| {
                let StorageItemHandlerArg {
                    storage_item,
                    page_offset,
                    num_pages,
                } = arg;
                let storage_item_block = match storage_item {
                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
                    // Segment headers live in the same page group kind; feed them into the cache
                    // and finish
                    StorageItemBlock::SegmentHeaders(segment_headers) => {
                        let num_segment_headers = segment_headers.segment_headers.len();
                        return match segment_headers_cache
                            .add_segment_headers(segment_headers.segment_headers)
                        {
                            Ok(_) => Ok(()),
                            Err(error) => {
                                error!(
                                    %page_offset,
                                    %num_segment_headers,
                                    %error,
                                    "Failed to add segment headers from storage item"
                                );

                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
                            }
                        };
                    }
                };

                // TODO: It would be nice to not allocate body here since we'll not use it here
                //  anyway
                let StorageItemBlockBlock {
                    header,
                    body: _,
                    mmr_with_block,
                    system_contract_states,
                } = storage_item_block;

                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
                    error!(%page_offset, "Failed to decode block header from bytes");

                    ClientDatabaseError::InvalidBlock { page_offset }
                })?;

                let block_root = *header.header().root();
                let block_number = header.header().prefix.number;

                state_data.block_roots.insert(block_root, block_number);

                // Current best block number, if any blocks were replayed so far (front of the
                // deque is the best height)
                let maybe_best_number = state_data
                    .blocks
                    .front()
                    .and_then(|block_forks| block_forks.first())
                    .map(|best_block| {
                        // Type inference is not working here for some reason
                        let header: &Block::Header = best_block.header();

                        header.header().prefix.number
                    });

                let block_offset = if let Some(best_number) = maybe_best_number {
                    if block_number <= best_number {
                        // A fork at or below the current best height
                        u64::from(best_number - block_number) as usize
                    } else {
                        // The new best block must follow the previous best block
                        if block_number - best_number != BlockNumber::ONE {
                            error!(
                                %page_offset,
                                %best_number,
                                %block_number,
                                "Invalid new best block number, it must be only one block \
                                higher than the best block"
                            );

                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
                        }

                        state_data.blocks.push_front(SmallVec::new());
                        // Will insert a new block at the front
                        0
                    }
                } else {
                    state_data.blocks.push_front(SmallVec::new());
                    // Will insert a new block at the front
                    0
                };

                let block_forks = match state_data.blocks.get_mut(block_offset) {
                    Some(block_forks) => block_forks,
                    None => {
                        // Ignore the older block, other blocks at its height were already pruned
                        // anyway

                        return Ok(());
                    }
                };

                // Push a new block to the end of the list, we'll fix it up later
                block_forks.push(ClientDatabaseBlock::Persisted {
                    header,
                    block_details: BlockDetails {
                        mmr_with_block,
                        system_contract_states,
                    },
                    write_location: WriteLocation {
                        page_offset,
                        num_pages,
                    },
                });

                // If a new block was inserted, confirm a new canonical block to prune extra
                // in-memory information
                if block_offset == 0 && block_forks.len() == 1 {
                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
                }

                Ok(())
            },
        };

        // Replays all stored items through the handlers above
        let storage_backend_adapter =
            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
                .await?;

        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
            // The best block is last in the list here because that is how it was inserted while
            // reading from the database
            block_forks.last()
        }) {
            // Type inference is not working here for some reason
            let header: &Block::Header = best_block.header();
            let header = header.header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            // Make the best block's ancestors occupy index 0 at every height
            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
            }

            // Store the best block as the first and only fork tip
            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
        } else {
            let GenesisBlockBuilderResult {
                block,
                system_contract_states,
            } = genesis_block_builder();

            // If the database is empty, initialize everything with the genesis block
            let header = block.header().header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
            state_data.block_roots.insert(block_root, block_number);
            state_data
                .blocks
                .push_front(smallvec![ClientDatabaseBlock::InMemory(
                    ClientDatabaseBlockInMemory {
                        block,
                        block_details: BlockDetails {
                            system_contract_states,
                            // Fresh MMR containing only the genesis block root
                            mmr_with_block: Arc::new({
                                let mut mmr = BlockMerkleMountainRange::new();
                                mmr.add_leaf(&block_root);
                                mmr
                            })
                        },
                    }
                )]);
        }

        let state = State {
            data: state_data,
            segment_headers_cache,
            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
        };

        let inner = Inner {
            state: AsyncRwLock::new(state),
            options,
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }
1331
1332    /// Format a new database
1333    pub async fn format(
1334        storage_backend: &StorageBackend,
1335        options: ClientDatabaseFormatOptions,
1336    ) -> Result<(), ClientDatabaseFormatError> {
1337        StorageBackendAdapter::format(storage_backend, options).await
1338    }
1339
1340    fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1341        // If the database is empty, initialize everything with the genesis block
1342        let header = block.header().header();
1343        let block_number = header.prefix.number;
1344        let block_root = *header.root();
1345
1346        state.fork_tips.clear();
1347        state.fork_tips.push_front(ForkTip {
1348            number: block_number,
1349            root: block_root,
1350        });
1351        state.block_roots.clear();
1352        state.block_roots.insert(block_root, block_number);
1353        state.blocks.clear();
1354        state
1355            .blocks
1356            .push_front(smallvec![ClientDatabaseBlock::InMemory(
1357                ClientDatabaseBlockInMemory {
1358                    block,
1359                    block_details,
1360                }
1361            )]);
1362    }
1363
1364    async fn insert_new_best_block(
1365        mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1366        inner: &Inner<Block, StorageBackend>,
1367        block: Block,
1368        block_details: BlockDetails,
1369    ) -> Result<(), PersistBlockError> {
1370        let header = block.header().header();
1371        let block_number = header.prefix.number;
1372        let block_root = *header.root();
1373        let parent_root = header.prefix.parent_root;
1374
1375        // Adjust the relative order of forks to ensure the first index always corresponds to
1376        // ancestors of the new best block
1377        if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1378            return Err(PersistBlockError::MissingParent);
1379        }
1380
1381        // Store new block in the state
1382        {
1383            for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1384                // Block's parent is no longer a fork tip, remove it
1385                if fork_tip.root == parent_root {
1386                    state.data.fork_tips.remove(index);
1387                    break;
1388                }
1389            }
1390
1391            state.data.fork_tips.push_front(ForkTip {
1392                number: block_number,
1393                root: block_root,
1394            });
1395            state.data.block_roots.insert(block_root, block_number);
1396            state
1397                .data
1398                .blocks
1399                .push_front(smallvec![ClientDatabaseBlock::InMemory(
1400                    ClientDatabaseBlockInMemory {
1401                        block,
1402                        block_details: block_details.clone()
1403                    }
1404                )]);
1405        }
1406
1407        let options = &inner.options;
1408
1409        Self::confirm_canonical_block(block_number, &mut state.data, options);
1410        Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1411
1412        // Convert write lock into upgradable read lock to allow reads, while preventing concurrent
1413        // block modifications
1414        // TODO: This assumes both guarantees in https://github.com/smol-rs/async-lock/issues/100
1415        //  are satisfied. If not, blocking read locks in other places will cause issues.
1416        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1417
1418        let mut blocks_to_persist = Vec::new();
1419        for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1420            let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1421                break;
1422            };
1423
1424            let len_before = blocks_to_persist.len();
1425            fork_blocks
1426                .iter()
1427                .enumerate()
1428                .filter_map(|(fork_offset, client_database_block)| {
1429                    match client_database_block {
1430                        ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1431                            block_offset,
1432                            fork_offset,
1433                            block,
1434                        }),
1435                        ClientDatabaseBlock::Persisted { .. }
1436                        | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1437                            // Already persisted
1438                            None
1439                        }
1440                    }
1441                })
1442                .collect_into(&mut blocks_to_persist);
1443
1444            if blocks_to_persist.len() == len_before {
1445                break;
1446            }
1447        }
1448
1449        // Persist blocks from older to newer
1450        let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1451        {
1452            let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1453
1454            for block_to_persist in blocks_to_persist.into_iter().rev() {
1455                let BlockToPersist {
1456                    block_offset,
1457                    fork_offset,
1458                    block,
1459                } = block_to_persist;
1460
1461                let write_location = storage_backend_adapter
1462                    .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1463                        header: block.block.header().buffer().clone(),
1464                        body: block.block.body().buffer().clone(),
1465                        mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1466                        system_contract_states: StdArc::clone(
1467                            &block.block_details.system_contract_states,
1468                        ),
1469                    }))
1470                    .await?;
1471
1472                persisted_blocks.push(PersistedBlock {
1473                    block_offset,
1474                    fork_offset,
1475                    write_location,
1476                });
1477            }
1478        }
1479
1480        // Convert blocks to persisted
1481        let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1482        for persisted_block in persisted_blocks {
1483            let PersistedBlock {
1484                block_offset,
1485                fork_offset,
1486                write_location,
1487            } = persisted_block;
1488
1489            let block = state
1490                .data
1491                .blocks
1492                .get_mut(block_offset)
1493                .expect("Still holding the same lock since last check; qed")
1494                .get_mut(fork_offset)
1495                .expect("Still holding the same lock since last check; qed");
1496
1497            replace_with_or_abort(block, |block| {
1498                if let ClientDatabaseBlock::InMemory(in_memory) = block {
1499                    let (header, _body) = in_memory.block.split();
1500
1501                    ClientDatabaseBlock::Persisted {
1502                        header,
1503                        block_details: in_memory.block_details,
1504                        write_location,
1505                    }
1506                } else {
1507                    unreachable!("Still holding the same lock since last check; qed");
1508                }
1509            });
1510        }
1511
1512        // TODO: Prune blocks that are no longer necessary
1513        // TODO: Prune unused page groups here or elsewhere?
1514
1515        Ok(())
1516    }
1517
1518    /// Adjust the relative order of forks to ensure the first index always corresponds to
1519    /// `parent_block_root` and its ancestors.
1520    ///
1521    /// Returns `true` on success and `false` if one of the parents was not found.
1522    #[must_use]
1523    fn adjust_ancestor_block_forks(
1524        blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1525        mut parent_block_root: BlockRoot,
1526    ) -> bool {
1527        let mut ancestor_blocks = blocks.iter_mut();
1528
1529        loop {
1530            if ancestor_blocks.len() == 1 {
1531                // Nothing left to adjust with a single fork
1532                break;
1533            }
1534
1535            let Some(parent_blocks) = ancestor_blocks.next() else {
1536                // No more parent headers present
1537                break;
1538            };
1539
1540            let Some(fork_offset_parent_block_root) =
1541                parent_blocks
1542                    .iter()
1543                    .enumerate()
1544                    .find_map(|(fork_offset, fork_block)| {
1545                        let fork_header = fork_block.header().header();
1546                        if *fork_header.root() == parent_block_root {
1547                            Some((fork_offset, fork_header.prefix.parent_root))
1548                        } else {
1549                            None
1550                        }
1551                    })
1552            else {
1553                return false;
1554            };
1555
1556            let fork_offset;
1557            (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1558
1559            parent_blocks.swap(0, fork_offset);
1560        }
1561
1562        true
1563    }
1564
1565    /// Prune outdated fork tips that are too deep and have not been updated for a long time.
1566    ///
1567    /// Note that actual headers, blocks and MMRs could remain if they are currently used by
1568    /// something or were already persisted on disk. With persisted blocks specifically, RAM usage
1569    /// implications are minimal, and we wouldn't want to re-download already stored blocks in case
1570    /// they end up being necessary later.
1571    fn prune_outdated_fork_tips(
1572        best_number: BlockNumber,
1573        state: &mut StateData<Block>,
1574        options: &ClientDatabaseInnerOptions,
1575    ) {
1576        let state = &mut *state;
1577
1578        // These forks are just candidates because they will not be pruned if the reference count is
1579        // not 1, indicating they are still in use by something
1580        let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1581
1582        // Prune forks that are too far away from the best block
1583        state.fork_tips.retain(|fork_tip| {
1584            if best_number - fork_tip.number > options.max_fork_tip_distance {
1585                candidate_forks_to_remove.push(*fork_tip);
1586                false
1587            } else {
1588                true
1589            }
1590        });
1591        // Prune forks that exceed the maximum number of forks
1592        if state.fork_tips.len() > options.max_fork_tips.get() {
1593            state
1594                .fork_tips
1595                .drain(options.max_fork_tips.get()..)
1596                .collect_into(&mut candidate_forks_to_remove);
1597        }
1598
1599        // Prune all possible candidates
1600        candidate_forks_to_remove
1601            .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1602        // Return those that were not pruned back to the list of tips
1603        state.fork_tips.extend(candidate_forks_to_remove);
1604    }
1605
1606    /// Returns `true` if the tip was pruned successfully and `false` if it should be returned to
1607    /// the list of fork tips
1608    #[must_use]
1609    fn prune_outdated_fork(
1610        best_number: BlockNumber,
1611        fork_tip: &ForkTip,
1612        state: &mut StateData<Block>,
1613    ) -> bool {
1614        let block_offset = u64::from(best_number - fork_tip.number) as usize;
1615
1616        // Prune fork top and all its ancestors that are not used
1617        let mut block_root_to_prune = fork_tip.root;
1618        let mut pruned_tip = false;
1619        for block_offset in block_offset.. {
1620            let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1621                if !pruned_tip {
1622                    error!(
1623                        %best_number,
1624                        ?fork_tip,
1625                        block_offset,
1626                        "Block offset was not present in the database, this is an implementation \
1627                        bug #1"
1628                    );
1629                }
1630                // No forks left to prune
1631                break;
1632            };
1633
1634            if fork_blocks.len() == 1 {
1635                if !pruned_tip {
1636                    error!(
1637                        %best_number,
1638                        ?fork_tip,
1639                        block_offset,
1640                        "Block offset was not present in the database, this is an implementation \
1641                        bug #2"
1642                    );
1643                }
1644
1645                // No forks left to prune
1646                break;
1647            }
1648
1649            let Some((fork_offset, block)) = fork_blocks
1650                .iter()
1651                .enumerate()
1652                // Skip ancestor of the best block, it is certainly not a fork to be pruned
1653                .skip(1)
1654                .find(|(_fork_offset, block)| {
1655                    *block.header().header().root() == block_root_to_prune
1656                })
1657            else {
1658                if !pruned_tip {
1659                    error!(
1660                        %best_number,
1661                        ?fork_tip,
1662                        block_offset,
1663                        "Block offset was not present in the database, this is an implementation \
1664                        bug #3"
1665                    );
1666                }
1667
1668                // Nothing left to prune
1669                break;
1670            };
1671
1672            // More than one instance means something somewhere is using or depends on this block
1673            if block.header().ref_count() > 1 {
1674                break;
1675            }
1676
1677            // Blocks that are already persisted
1678            match block {
1679                ClientDatabaseBlock::InMemory(_) => {
1680                    // Prune
1681                }
1682                ClientDatabaseBlock::Persisted { .. }
1683                | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1684                    // Already on disk, keep it in memory for later, but prune the tip
1685                    pruned_tip = true;
1686                    break;
1687                }
1688            }
1689
1690            state.block_roots.get_mut(&block_root_to_prune);
1691            block_root_to_prune = block.header().header().prefix.parent_root;
1692            fork_blocks.swap_remove(fork_offset);
1693
1694            pruned_tip = true;
1695        }
1696
1697        pruned_tip
1698    }
1699
1700    /// Confirm a block at confirmation depth k and prune any other blocks at the same depth with
1701    /// their descendants
1702    fn confirm_canonical_block(
1703        best_number: BlockNumber,
1704        state_data: &mut StateData<Block>,
1705        options: &ClientDatabaseInnerOptions,
1706    ) {
1707        // `+1` means it effectively confirms parent blocks instead. This is done to keep the parent
1708        // of the confirmed block with its MMR in memory due to confirmed blocks not storing their
1709        // MMRs, which might be needed for reorgs at the lowest possible depth.
1710        let block_offset = u64::from(options.confirmation_depth_k + BlockNumber::ONE) as usize;
1711
1712        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
1713            // Nothing to confirm yet
1714            return;
1715        };
1716
1717        // Mark the canonical block as confirmed
1718        {
1719            let Some(canonical_block) = fork_blocks.first_mut() else {
1720                error!(
1721                    %best_number,
1722                    block_offset,
1723                    "Have not found a canonical block to confirm, this is an implementation bug"
1724                );
1725                return;
1726            };
1727
1728            replace_with_or_abort(canonical_block, |block| match block {
1729                ClientDatabaseBlock::InMemory(_) => {
1730                    error!(
1731                        %best_number,
1732                        block_offset,
1733                        header = ?block.header(),
1734                        "Block to be confirmed must not be in memory, this is an implementation bug"
1735                    );
1736                    block
1737                }
1738                ClientDatabaseBlock::Persisted {
1739                    header,
1740                    block_details: _,
1741                    write_location,
1742                } => ClientDatabaseBlock::PersistedConfirmed {
1743                    header,
1744                    write_location,
1745                },
1746                ClientDatabaseBlock::PersistedConfirmed { .. } => {
1747                    error!(
1748                        %best_number,
1749                        block_offset,
1750                        header = ?block.header(),
1751                        "Block to be confirmed must not be confirmed yet, this is an \
1752                        implementation bug"
1753                    );
1754                    block
1755                }
1756            });
1757        }
1758
1759        // Prune the rest of the blocks and their descendants
1760        let mut block_roots_to_prune = fork_blocks
1761            .drain(1..)
1762            .map(|block| *block.header().header().root())
1763            .collect::<Vec<_>>();
1764        let mut current_block_offset = block_offset;
1765        while !block_roots_to_prune.is_empty() {
1766            // Prune fork tips (if any)
1767            state_data
1768                .fork_tips
1769                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1770
1771            // Prune removed block roots
1772            for block_root in &block_roots_to_prune {
1773                state_data.block_roots.remove(block_root);
1774            }
1775
1776            // Block offset for direct descendants
1777            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1778                current_block_offset = next_block_offset;
1779            } else {
1780                // Reached the tip
1781                break;
1782            }
1783
1784            let fork_blocks = state_data
1785                .blocks
1786                .get_mut(current_block_offset)
1787                .expect("Lower block offset always exists; qed");
1788
1789            // Collect descendants of pruned blocks to prune them next
1790            block_roots_to_prune = fork_blocks
1791                .drain_filter(|block| {
1792                    let header = block.header().header();
1793
1794                    block_roots_to_prune.contains(&header.prefix.parent_root)
1795                })
1796                .map(|block| *block.header().header().root())
1797                .collect();
1798        }
1799    }
1800}