1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 default_field_values,
49 get_mut_unchecked,
50 iter_collect_into,
51 maybe_uninit_as_bytes,
52 maybe_uninit_fill,
53 push_mut,
54 try_blocks
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::storage_backend::ClientDatabaseStorageBackend;
65use crate::storage_backend_adapter::{
66 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
67};
68use ab_client_api::{
69 BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
70 PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
71};
72use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
73use ab_core_primitives::block::header::GenericBlockHeader;
74use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
75use ab_core_primitives::block::owned::GenericOwnedBlock;
76use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
77use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
78use ab_core_primitives::shard::RealShardKind;
79use ab_io_type::trivial_type::TrivialType;
80use async_lock::{
81 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand::rngs::OsError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
95#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
97#[repr(C)]
98pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101 type Target = [u8; 32];
102
103 #[inline(always)]
104 fn deref(&self) -> &Self::Target {
105 &self.0
106 }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110 #[inline(always)]
111 fn as_ref(&self) -> &[u8] {
112 &self.0
113 }
114}
115
116impl DatabaseId {
117 #[inline(always)]
118 pub const fn new(bytes: [u8; 32]) -> Self {
119 Self(bytes)
120 }
121}
122
123#[derive(Default)]
124struct BlockRootHasher(u64);
125
126impl Hasher for BlockRootHasher {
127 #[inline(always)]
128 fn finish(&self) -> u64 {
129 self.0
130 }
131
132 #[inline(always)]
133 fn write(&mut self, bytes: &[u8]) {
134 let Some(state) = bytes.as_chunks().0.first().copied().map(u64::from_le_bytes) else {
135 return;
136 };
137
138 self.0 = state;
139 }
140}
141
142#[derive(Debug)]
143pub struct GenesisBlockBuilderResult<Block> {
144 pub block: Block,
146 pub system_contract_states: StdArc<[ContractSlotState]>,
148}
149
150#[derive(Debug, Copy, Clone)]
152pub struct ClientDatabaseOptions<GBB, StorageBackend> {
153 pub write_buffer_size: usize = 5,
161 pub confirmation_depth_k: BlockNumber,
167 pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
181 pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
192 pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
203 pub genesis_block_builder: GBB,
206 pub storage_backend: StorageBackend,
208}
209
210#[derive(Debug, Copy, Clone)]
212pub struct ClientDatabaseFormatOptions {
213 pub page_group_size: NonZeroU32,
230 pub force: bool,
234}
235
236#[derive(Debug, thiserror::Error)]
237pub enum ClientDatabaseError {
238 #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
240 InvalidSoftConfirmationDepth,
241 #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
243 InvalidMaxForkTipDistance,
244 #[error("Storage backend has canceled read request")]
246 ReadRequestCancelled,
247 #[error("Storage backend read error: {error}")]
249 ReadError {
250 error: io::Error,
252 },
253 #[error("Unsupported database version: {database_version}")]
255 UnsupportedDatabaseVersion {
256 database_version: u8,
258 },
259 #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
261 PageGroupSizeTooSmall {
262 page_group_size: u32,
264 },
265 #[error(
267 "Unexpected sequence number {actual} at page offset {page_offset} (expected \
268 {expected})"
269 )]
270 UnexpectedSequenceNumber {
271 actual: u64,
273 expected: u64,
275 page_offset: u32,
277 },
278 #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
280 UnexpectedStorageItem {
281 storage_item: Box<dyn fmt::Debug + Send + Sync>,
283 page_offset: u32,
285 },
286 #[error("Invalid block at offset {page_offset}")]
288 InvalidBlock {
289 page_offset: u32,
291 },
292 #[error("Invalid segment headers at offset {page_offset}")]
294 InvalidSegmentHeaders {
295 page_offset: u32,
297 },
298 #[error("Failed to adjust ancestor block forks")]
300 FailedToAdjustAncestorBlockForks,
301 #[error("Database is not formatted yet")]
303 Unformatted,
304 #[error("Non-permanent first page group")]
306 NonPermanentFirstPageGroup,
307}
308
309#[derive(Debug, thiserror::Error)]
311pub enum ClientDatabaseFormatError {
312 #[error("Storage backend has canceled read request")]
314 ReadRequestCancelled,
315 #[error("Storage backend read error: {error}")]
317 ReadError {
318 error: io::Error,
320 },
321 #[error("Failed to generate database id")]
323 FailedToGenerateDatabaseId {
324 #[from]
326 error: OsError,
327 },
328 #[error("Database is already formatted yet")]
330 AlreadyFormatted,
331 #[error("Storage backend has canceled a writing request")]
333 WriteRequestCancelled,
334 #[error("Storage item write error")]
336 StorageItemWriteError {
337 #[from]
339 error: io::Error,
340 },
341}
342
343#[derive(Debug, Copy, Clone)]
344struct ForkTip {
345 number: BlockNumber,
346 root: BlockRoot,
347}
348
349#[derive(Debug)]
350struct ClientDatabaseBlockInMemory<Block>
351where
352 Block: GenericOwnedBlock,
353{
354 block: Block,
355 block_details: BlockDetails,
356}
357
358enum FullBlock<'a, Block>
359where
360 Block: GenericOwnedBlock,
361{
362 InMemory(&'a Block),
363 Persisted {
364 header: &'a Block::Header,
365 write_location: WriteLocation,
366 },
367}
368
369#[derive(Debug)]
376enum ClientDatabaseBlock<Block>
377where
378 Block: GenericOwnedBlock,
379{
380 InMemory(ClientDatabaseBlockInMemory<Block>),
382 Persisted {
384 header: Block::Header,
385 block_details: BlockDetails,
386 write_location: WriteLocation,
387 },
388 PersistedConfirmed {
391 header: Block::Header,
392 write_location: WriteLocation,
393 },
394}
395
396impl<Block> ClientDatabaseBlock<Block>
397where
398 Block: GenericOwnedBlock,
399{
400 #[inline(always)]
401 fn header(&self) -> &Block::Header {
402 match self {
403 Self::InMemory(in_memory) => in_memory.block.header(),
404 Self::Persisted { header, .. } => header,
405 Self::PersistedConfirmed { header, .. } => header,
406 }
407 }
408
409 #[inline(always)]
410 fn full_block(&self) -> FullBlock<'_, Block> {
411 match self {
412 Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
413 Self::Persisted {
414 header,
415 write_location,
416 ..
417 }
418 | Self::PersistedConfirmed {
419 header,
420 write_location,
421 } => FullBlock::Persisted {
422 header,
423 write_location: *write_location,
424 },
425 }
426 }
427
428 #[inline(always)]
429 fn block_details(&self) -> Option<&BlockDetails> {
430 match self {
431 Self::InMemory(in_memory) => Some(&in_memory.block_details),
432 Self::Persisted { block_details, .. } => Some(block_details),
433 Self::PersistedConfirmed { .. } => None,
434 }
435 }
436}
437
438#[derive(Debug)]
439struct StateData<Block>
440where
441 Block: GenericOwnedBlock,
442{
443 fork_tips: VecDeque<ForkTip>,
448 block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
453 blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
468}
469
470#[derive(Debug)]
471struct SegmentHeadersCache {
472 segment_headers_cache: Vec<SegmentHeader>,
473}
474
475impl SegmentHeadersCache {
476 #[inline(always)]
477 fn last_segment_header(&self) -> Option<SegmentHeader> {
478 self.segment_headers_cache.last().cloned()
479 }
480
481 #[inline(always)]
482 fn max_segment_index(&self) -> Option<SegmentIndex> {
483 self.segment_headers_cache
484 .last()
485 .map(|segment_header| segment_header.segment_index.as_inner())
486 }
487
488 #[inline(always)]
489 fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
490 self.segment_headers_cache
491 .get(u64::from(segment_index) as usize)
492 .copied()
493 }
494
495 fn add_segment_headers(
497 &mut self,
498 mut segment_headers: Vec<SegmentHeader>,
499 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
500 self.segment_headers_cache.reserve(segment_headers.len());
501
502 let mut maybe_last_segment_index = self.max_segment_index();
503
504 if let Some(last_segment_index) = maybe_last_segment_index {
505 segment_headers.retain(|segment_header| {
507 segment_header.segment_index.as_inner() > last_segment_index
508 });
509 }
510
511 for segment_header in segment_headers.iter().copied() {
514 let segment_index = segment_header.segment_index();
515 match maybe_last_segment_index {
516 Some(last_segment_index) => {
517 if segment_index != last_segment_index + SegmentIndex::ONE {
518 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
519 segment_index,
520 last_segment_index,
521 });
522 }
523
524 self.segment_headers_cache.push(segment_header);
525 maybe_last_segment_index.replace(segment_index);
526 }
527 None => {
528 if segment_index != SegmentIndex::ZERO {
529 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
530 segment_index,
531 });
532 }
533
534 self.segment_headers_cache.push(segment_header);
535 maybe_last_segment_index.replace(segment_index);
536 }
537 }
538 }
539
540 Ok(segment_headers)
541 }
542}
543
544#[derive(Debug)]
546struct State<Block, StorageBackend>
547where
548 Block: GenericOwnedBlock,
549{
550 data: StateData<Block>,
551 segment_headers_cache: SegmentHeadersCache,
552 storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
553}
554
555impl<Block, StorageBackend> State<Block, StorageBackend>
556where
557 Block: GenericOwnedBlock,
558{
559 #[inline(always)]
560 fn best_tip(&self) -> &ForkTip {
561 self.data
562 .fork_tips
563 .front()
564 .expect("The best block is always present; qed")
565 }
566
567 #[inline(always)]
568 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
569 self.data
570 .blocks
571 .front()
572 .expect("The best block is always present; qed")
573 .first()
574 .expect("The best block is always present; qed")
575 }
576}
577
578#[derive(Debug)]
579struct BlockToPersist<'a, Block>
580where
581 Block: GenericOwnedBlock,
582{
583 block_offset: usize,
584 fork_offset: usize,
585 block: &'a ClientDatabaseBlockInMemory<Block>,
586}
587
588#[derive(Debug)]
589struct PersistedBlock {
590 block_offset: usize,
591 fork_offset: usize,
592 write_location: WriteLocation,
593}
594
595#[derive(Debug)]
596struct ClientDatabaseInnerOptions {
597 confirmation_depth_k: BlockNumber,
598 soft_confirmation_depth: BlockNumber,
599 max_fork_tips: NonZeroUsize,
600 max_fork_tip_distance: BlockNumber,
601}
602
603#[derive(Debug)]
604struct Inner<Block, StorageBackend>
605where
606 Block: GenericOwnedBlock,
607{
608 state: AsyncRwLock<State<Block, StorageBackend>>,
609 options: ClientDatabaseInnerOptions,
610}
611
612#[derive(Debug)]
614pub struct ClientDatabase<Block, StorageBackend>
615where
616 Block: GenericOwnedBlock,
617{
618 inner: Arc<Inner<Block, StorageBackend>>,
619}
620
621impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
622where
623 Block: GenericOwnedBlock,
624{
625 fn clone(&self) -> Self {
626 Self {
627 inner: self.inner.clone(),
628 }
629 }
630}
631
632impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
633where
634 Block: GenericOwnedBlock,
635{
636 fn drop(&mut self) {
637 }
639}
640
641impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
642where
643 Block: GenericOwnedBlock,
644 StorageBackend: ClientDatabaseStorageBackend,
645{
646 #[inline]
647 fn best_root(&self) -> BlockRoot {
648 self.inner.state.read_blocking().best_tip().root
651 }
652
653 #[inline]
654 fn best_header(&self) -> Block::Header {
655 self.inner
658 .state
659 .read_blocking()
660 .best_block()
661 .header()
662 .clone()
663 }
664
665 #[inline]
666 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
667 let state = self.inner.state.read_blocking();
670 let best_block = state.best_block();
671 (
672 best_block.header().clone(),
673 best_block
674 .block_details()
675 .expect("Always present for the best block; qed")
676 .clone(),
677 )
678 }
679
680 #[inline]
682 fn ancestor_header(
683 &self,
684 ancestor_block_number: BlockNumber,
685 descendant_block_root: &BlockRoot,
686 ) -> Option<Block::Header> {
687 let state = self.inner.state.read_blocking();
690 let best_number = state.best_tip().number;
691
692 let ancestor_block_offset =
693 best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
694 let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
695
696 let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
697 if ancestor_block_number > descendant_block_number {
698 return None;
699 }
700 let descendant_block_offset =
701 best_number.checked_sub(descendant_block_number)?.as_u64() as usize;
702
703 let mut blocks_range_iter = state
705 .data
706 .blocks
707 .iter()
708 .enumerate()
709 .skip(descendant_block_offset);
710
711 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
712 let descendant_header = descendant_block_candidates
713 .iter()
714 .find(|block| &*block.header().header().root() == descendant_block_root)?
715 .header()
716 .header();
717
718 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
723 return ancestor_block_candidates
724 .iter()
725 .next()
726 .map(|block| block.header().clone());
727 }
728
729 let mut parent_block_root = &descendant_header.prefix.parent_root;
730
731 for (block_offset, parent_candidates) in blocks_range_iter {
733 let parent_header = parent_candidates
734 .iter()
735 .find(|header| &*header.header().header().root() == parent_block_root)?
736 .header();
737
738 if block_offset == ancestor_block_offset {
740 return Some(parent_header.clone());
741 }
742
743 parent_block_root = &parent_header.header().prefix.parent_root;
744 }
745
746 None
747 }
748
749 #[inline]
750 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
751 let state = self.inner.state.read_blocking();
754 let best_number = state.best_tip().number;
755
756 let block_number = *state.data.block_roots.get(block_root)?;
757 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
758 let block_candidates = state.data.blocks.get(block_offset)?;
759
760 block_candidates.iter().find_map(|block| {
761 let header = block.header();
762
763 if &*header.header().root() == block_root {
764 Some(header.clone())
765 } else {
766 None
767 }
768 })
769 }
770
771 #[inline]
772 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
773 let state = self.inner.state.read_blocking();
776 let best_number = state.best_tip().number;
777
778 let block_number = *state.data.block_roots.get(block_root)?;
779 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
780 let block_candidates = state.data.blocks.get(block_offset)?;
781
782 block_candidates.iter().find_map(|block| {
783 let header = block.header();
784 let block_details = block.block_details().cloned()?;
785
786 if &*header.header().root() == block_root {
787 Some((header.clone(), block_details))
788 } else {
789 None
790 }
791 })
792 }
793
794 #[inline]
795 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
796 let state = self.inner.state.read().await;
797 let best_number = state.best_tip().number;
798
799 let block_number = *state
800 .data
801 .block_roots
802 .get(block_root)
803 .ok_or(ReadBlockError::UnknownBlockRoot)?;
804 let block_offset = best_number
805 .checked_sub(block_number)
806 .expect("Known block roots always have valid block offset; qed")
807 .as_u64() as usize;
808 let block_candidates = state
809 .data
810 .blocks
811 .get(block_offset)
812 .expect("Valid block offsets always have block entries; qed");
813
814 for block_candidate in block_candidates {
815 let header = block_candidate.header();
816
817 if &*header.header().root() == block_root {
818 return match block_candidate.full_block() {
819 FullBlock::InMemory(block) => Ok(block.clone()),
820 FullBlock::Persisted {
821 header,
822 write_location,
823 } => {
824 let storage_backend_adapter = state.storage_backend_adapter.read().await;
825
826 let storage_item = storage_backend_adapter
827 .read_storage_item::<StorageItemBlock>(write_location)
828 .await?;
829
830 let storage_item_block = match storage_item {
831 StorageItemBlock::Block(storage_item_block) => storage_item_block,
832 StorageItemBlock::SegmentHeaders(_) => {
833 return Err(ReadBlockError::StorageItemReadError {
834 error: io::Error::other(
835 "Unexpected storage item: SegmentHeaders",
836 ),
837 });
838 }
839 };
840
841 let StorageItemBlockBlock {
842 header: _,
843 body,
844 mmr_with_block: _,
845 system_contract_states: _,
846 } = storage_item_block;
847
848 Block::from_buffers(header.buffer().clone(), body)
849 .ok_or(ReadBlockError::FailedToDecode)
850 }
851 };
852 }
853 }
854
855 unreachable!("Known block root always has block candidate associated with it; qed")
856 }
857
858 #[inline]
859 fn last_segment_header(&self) -> Option<SegmentHeader> {
860 let state = self.inner.state.read_blocking();
863 state.segment_headers_cache.last_segment_header()
864 }
865
866 #[inline]
867 fn max_segment_index(&self) -> Option<SegmentIndex> {
868 let state = self.inner.state.read_blocking();
871
872 state.segment_headers_cache.max_segment_index()
873 }
874
875 #[inline]
876 fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
877 let state = self.inner.state.read_blocking();
880
881 state
882 .segment_headers_cache
883 .get_segment_header(segment_index)
884 }
885
886 #[inline]
887 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
888 let state = self.inner.state.read_blocking();
891
892 let Some(last_segment_index) = state.segment_headers_cache.max_segment_index() else {
893 return Vec::new();
895 };
896
897 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
899 && block_number == BlockNumber::ONE
900 {
901 return vec![
904 state
905 .segment_headers_cache
906 .get_segment_header(SegmentIndex::ZERO)
907 .expect("Segment headers are stored in monotonically increasing order; qed"),
908 ];
909 }
910
911 if last_segment_index == SegmentIndex::ZERO {
912 return Vec::new();
914 }
915
916 let mut current_segment_index = last_segment_index;
917 loop {
918 let current_segment_header = state
921 .segment_headers_cache
922 .get_segment_header(current_segment_index)
923 .expect("Segment headers are stored in monotonically increasing order; qed");
924
925 let target_block_number = current_segment_header.last_archived_block.number()
927 + BlockNumber::ONE
928 + self.inner.options.confirmation_depth_k;
929 if target_block_number == block_number {
930 let mut headers_for_block = vec![current_segment_header];
931
932 let last_archived_block_number = current_segment_header.last_archived_block.number;
934 let mut segment_index = current_segment_index - SegmentIndex::ONE;
935
936 while let Some(segment_header) = state
937 .segment_headers_cache
938 .get_segment_header(segment_index)
939 {
940 if segment_header.last_archived_block.number == last_archived_block_number {
941 headers_for_block.insert(0, segment_header);
942 segment_index -= SegmentIndex::ONE;
943 } else {
944 break;
945 }
946 }
947
948 return headers_for_block;
949 }
950
951 if target_block_number > block_number {
953 if current_segment_index > SegmentIndex::ONE {
955 current_segment_index -= SegmentIndex::ONE
956 } else {
957 break;
958 }
959 } else {
960 return Vec::new();
962 }
963 }
964
965 Vec::new()
967 }
968}
969
970impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
971where
972 Block: GenericOwnedBlock,
973 StorageBackend: ClientDatabaseStorageBackend,
974{
975 async fn persist_block(
976 &self,
977 block: Block,
978 block_details: BlockDetails,
979 ) -> Result<(), PersistBlockError> {
980 let mut state = self.inner.state.write().await;
981 let best_number = state.best_tip().number;
982
983 let header = block.header().header();
984
985 let block_number = header.prefix.number;
986
987 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
988 Self::insert_first_block(&mut state.data, block, block_details);
990
991 return Ok(());
992 }
993
994 if block_number == best_number + BlockNumber::ONE {
995 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
996 }
997
998 let block_offset = best_number
999 .checked_sub(block_number)
1000 .ok_or(PersistBlockError::MissingParent)?
1001 .as_u64() as usize;
1002
1003 if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
1004 return Err(PersistBlockError::OutsideAcceptableRange);
1005 }
1006
1007 let state = &mut *state;
1008
1009 let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1010 error!(
1011 %block_number,
1012 %block_offset,
1013 "Failed to store block fork, header offset is missing despite being within \
1014 acceptable range"
1015 );
1016
1017 PersistBlockError::OutsideAcceptableRange
1018 })?;
1019
1020 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1021 if fork_tip.root == header.prefix.parent_root {
1023 state.data.fork_tips.remove(index);
1024 break;
1025 }
1026 }
1027
1028 let block_root = *header.root();
1029 state.data.fork_tips.insert(
1032 1,
1033 ForkTip {
1034 number: block_number,
1035 root: block_root,
1036 },
1037 );
1038 state.data.block_roots.insert(block_root, block_number);
1039 block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1040 block,
1041 block_details,
1042 }));
1043
1044 Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1045
1046 Ok(())
1047 }
1048
1049 async fn persist_segment_headers(
1050 &self,
1051 segment_headers: Vec<SegmentHeader>,
1052 ) -> Result<(), PersistSegmentHeadersError> {
1053 let mut state = self.inner.state.write().await;
1054
1055 let added_segment_headers = state
1056 .segment_headers_cache
1057 .add_segment_headers(segment_headers)?;
1058
1059 if added_segment_headers.is_empty() {
1060 return Ok(());
1061 }
1062
1063 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1068
1069 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1070
1071 storage_backend_adapter
1072 .write_storage_item(StorageItemBlock::SegmentHeaders(
1073 StorageItemBlockSegmentHeaders {
1074 segment_headers: added_segment_headers,
1075 },
1076 ))
1077 .await?;
1078
1079 Ok(())
1080 }
1081}
1082
1083impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1084where
1085 Block: GenericOwnedBlock,
1086 StorageBackend: ClientDatabaseStorageBackend,
1087{
1088 pub async fn open<GBB>(
1092 options: ClientDatabaseOptions<GBB, StorageBackend>,
1093 ) -> Result<Self, ClientDatabaseError>
1094 where
1095 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1096 {
1097 let ClientDatabaseOptions {
1098 write_buffer_size,
1099 confirmation_depth_k,
1100 soft_confirmation_depth,
1101 max_fork_tips,
1102 max_fork_tip_distance,
1103 genesis_block_builder,
1104 storage_backend,
1105 } = options;
1106 if soft_confirmation_depth >= confirmation_depth_k {
1107 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1108 }
1109
1110 if max_fork_tip_distance > confirmation_depth_k {
1111 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1112 }
1113
1114 let mut state_data = StateData {
1115 fork_tips: VecDeque::new(),
1116 block_roots: HashMap::default(),
1117 blocks: VecDeque::new(),
1118 };
1119 let mut segment_headers_cache = SegmentHeadersCache {
1120 segment_headers_cache: Vec::new(),
1121 };
1122
1123 let options = ClientDatabaseInnerOptions {
1124 confirmation_depth_k,
1125 soft_confirmation_depth,
1126 max_fork_tips,
1127 max_fork_tip_distance,
1128 };
1129
1130 let storage_item_handlers = StorageItemHandlers {
1131 permanent: |_arg| {
1132 Ok(())
1134 },
1135 block: |arg| {
1136 let StorageItemHandlerArg {
1137 storage_item,
1138 page_offset,
1139 num_pages,
1140 } = arg;
1141 let storage_item_block = match storage_item {
1142 StorageItemBlock::Block(storage_item_block) => storage_item_block,
1143 StorageItemBlock::SegmentHeaders(segment_headers) => {
1144 let num_segment_headers = segment_headers.segment_headers.len();
1145 return match segment_headers_cache
1146 .add_segment_headers(segment_headers.segment_headers)
1147 {
1148 Ok(_) => Ok(()),
1149 Err(error) => {
1150 error!(
1151 %page_offset,
1152 %num_segment_headers,
1153 %error,
1154 "Failed to add segment headers from storage item"
1155 );
1156
1157 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1158 }
1159 };
1160 }
1161 };
1162
1163 let StorageItemBlockBlock {
1166 header,
1167 body: _,
1168 mmr_with_block,
1169 system_contract_states,
1170 } = storage_item_block;
1171
1172 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1173 error!(%page_offset, "Failed to decode block header from bytes");
1174
1175 ClientDatabaseError::InvalidBlock { page_offset }
1176 })?;
1177
1178 let block_root = *header.header().root();
1179 let block_number = header.header().prefix.number;
1180
1181 state_data.block_roots.insert(block_root, block_number);
1182
1183 let maybe_best_number = state_data
1184 .blocks
1185 .front()
1186 .and_then(|block_forks| block_forks.first())
1187 .map(|best_block| {
1188 let header: &Block::Header = best_block.header();
1190
1191 header.header().prefix.number
1192 });
1193
1194 let block_offset = if let Some(best_number) = maybe_best_number {
1195 if block_number <= best_number {
1196 (best_number - block_number).as_u64() as usize
1197 } else {
1198 if block_number - best_number != BlockNumber::ONE {
1200 error!(
1201 %page_offset,
1202 %best_number,
1203 %block_number,
1204 "Invalid new best block number, it must be only one block \
1205 higher than the best block"
1206 );
1207
1208 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1209 }
1210
1211 state_data.blocks.push_front(SmallVec::new());
1212 0
1214 }
1215 } else {
1216 state_data.blocks.push_front(SmallVec::new());
1217 0
1219 };
1220
1221 let block_forks = match state_data.blocks.get_mut(block_offset) {
1222 Some(block_forks) => block_forks,
1223 None => {
1224 return Ok(());
1228 }
1229 };
1230
1231 block_forks.push(ClientDatabaseBlock::Persisted {
1233 header,
1234 block_details: BlockDetails {
1235 mmr_with_block,
1236 system_contract_states,
1237 },
1238 write_location: WriteLocation {
1239 page_offset,
1240 num_pages,
1241 },
1242 });
1243
1244 if block_offset == 0 && block_forks.len() == 1 {
1247 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1248 }
1249
1250 Ok(())
1251 },
1252 };
1253
1254 let storage_backend_adapter =
1255 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1256 .await?;
1257
1258 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1259 block_forks.last()
1262 }) {
1263 let header: &Block::Header = best_block.header();
1265 let header = header.header();
1266 let block_number = header.prefix.number;
1267 let block_root = *header.root();
1268
1269 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1270 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1271 }
1272
1273 state_data.fork_tips.push_front(ForkTip {
1275 number: block_number,
1276 root: block_root,
1277 });
1278 } else {
1279 let GenesisBlockBuilderResult {
1280 block,
1281 system_contract_states,
1282 } = genesis_block_builder();
1283
1284 let header = block.header().header();
1286 let block_number = header.prefix.number;
1287 let block_root = *header.root();
1288
1289 state_data.fork_tips.push_front(ForkTip {
1290 number: block_number,
1291 root: block_root,
1292 });
1293 state_data.block_roots.insert(block_root, block_number);
1294 state_data
1295 .blocks
1296 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1297 ClientDatabaseBlockInMemory {
1298 block,
1299 block_details: BlockDetails {
1300 system_contract_states,
1301 mmr_with_block: Arc::new({
1302 let mut mmr = BlockMerkleMountainRange::new();
1303 mmr.add_leaf(&block_root);
1304 mmr
1305 })
1306 },
1307 }
1308 )]);
1309 }
1310
1311 let state = State {
1312 data: state_data,
1313 segment_headers_cache,
1314 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1315 };
1316
1317 let inner = Inner {
1318 state: AsyncRwLock::new(state),
1319 options,
1320 };
1321
1322 Ok(Self {
1323 inner: Arc::new(inner),
1324 })
1325 }
1326
1327 pub async fn format(
1329 storage_backend: &StorageBackend,
1330 options: ClientDatabaseFormatOptions,
1331 ) -> Result<(), ClientDatabaseFormatError> {
1332 StorageBackendAdapter::format(storage_backend, options).await
1333 }
1334
1335 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1336 let header = block.header().header();
1338 let block_number = header.prefix.number;
1339 let block_root = *header.root();
1340
1341 state.fork_tips.clear();
1342 state.fork_tips.push_front(ForkTip {
1343 number: block_number,
1344 root: block_root,
1345 });
1346 state.block_roots.clear();
1347 state.block_roots.insert(block_root, block_number);
1348 state.blocks.clear();
1349 state
1350 .blocks
1351 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1352 ClientDatabaseBlockInMemory {
1353 block,
1354 block_details,
1355 }
1356 )]);
1357 }
1358
1359 async fn insert_new_best_block(
1360 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1361 inner: &Inner<Block, StorageBackend>,
1362 block: Block,
1363 block_details: BlockDetails,
1364 ) -> Result<(), PersistBlockError> {
1365 let header = block.header().header();
1366 let block_number = header.prefix.number;
1367 let block_root = *header.root();
1368 let parent_root = header.prefix.parent_root;
1369
1370 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1373 return Err(PersistBlockError::MissingParent);
1374 }
1375
1376 {
1378 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1379 if fork_tip.root == parent_root {
1381 state.data.fork_tips.remove(index);
1382 break;
1383 }
1384 }
1385
1386 state.data.fork_tips.push_front(ForkTip {
1387 number: block_number,
1388 root: block_root,
1389 });
1390 state.data.block_roots.insert(block_root, block_number);
1391 state
1392 .data
1393 .blocks
1394 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1395 ClientDatabaseBlockInMemory {
1396 block,
1397 block_details: block_details.clone()
1398 }
1399 )]);
1400 }
1401
1402 let options = &inner.options;
1403
1404 Self::confirm_canonical_block(block_number, &mut state.data, options);
1405 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1406
1407 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1412
1413 let mut blocks_to_persist = Vec::new();
1414 for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1415 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1416 break;
1417 };
1418
1419 let len_before = blocks_to_persist.len();
1420 fork_blocks
1421 .iter()
1422 .enumerate()
1423 .filter_map(|(fork_offset, client_database_block)| {
1424 match client_database_block {
1425 ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1426 block_offset,
1427 fork_offset,
1428 block,
1429 }),
1430 ClientDatabaseBlock::Persisted { .. }
1431 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1432 None
1434 }
1435 }
1436 })
1437 .collect_into(&mut blocks_to_persist);
1438
1439 if blocks_to_persist.len() == len_before {
1440 break;
1441 }
1442 }
1443
1444 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1446 {
1447 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1448
1449 for block_to_persist in blocks_to_persist.into_iter().rev() {
1450 let BlockToPersist {
1451 block_offset,
1452 fork_offset,
1453 block,
1454 } = block_to_persist;
1455
1456 let write_location = storage_backend_adapter
1457 .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1458 header: block.block.header().buffer().clone(),
1459 body: block.block.body().buffer().clone(),
1460 mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1461 system_contract_states: StdArc::clone(
1462 &block.block_details.system_contract_states,
1463 ),
1464 }))
1465 .await?;
1466
1467 persisted_blocks.push(PersistedBlock {
1468 block_offset,
1469 fork_offset,
1470 write_location,
1471 });
1472 }
1473 }
1474
1475 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1477 for persisted_block in persisted_blocks {
1478 let PersistedBlock {
1479 block_offset,
1480 fork_offset,
1481 write_location,
1482 } = persisted_block;
1483
1484 let block = state
1485 .data
1486 .blocks
1487 .get_mut(block_offset)
1488 .expect("Still holding the same lock since last check; qed")
1489 .get_mut(fork_offset)
1490 .expect("Still holding the same lock since last check; qed");
1491
1492 replace_with_or_abort(block, |block| {
1493 if let ClientDatabaseBlock::InMemory(in_memory) = block {
1494 let (header, _body) = in_memory.block.split();
1495
1496 ClientDatabaseBlock::Persisted {
1497 header,
1498 block_details: in_memory.block_details,
1499 write_location,
1500 }
1501 } else {
1502 unreachable!("Still holding the same lock since last check; qed");
1503 }
1504 });
1505 }
1506
1507 Ok(())
1511 }
1512
1513 #[must_use]
1518 fn adjust_ancestor_block_forks(
1519 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1520 mut parent_block_root: BlockRoot,
1521 ) -> bool {
1522 let mut ancestor_blocks = blocks.iter_mut();
1523
1524 loop {
1525 if ancestor_blocks.len() == 1 {
1526 break;
1528 }
1529
1530 let Some(parent_blocks) = ancestor_blocks.next() else {
1531 break;
1533 };
1534
1535 let Some(fork_offset_parent_block_root) =
1536 parent_blocks
1537 .iter()
1538 .enumerate()
1539 .find_map(|(fork_offset, fork_block)| {
1540 let fork_header = fork_block.header().header();
1541 if *fork_header.root() == parent_block_root {
1542 Some((fork_offset, fork_header.prefix.parent_root))
1543 } else {
1544 None
1545 }
1546 })
1547 else {
1548 return false;
1549 };
1550
1551 let fork_offset;
1552 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1553
1554 parent_blocks.swap(0, fork_offset);
1555 }
1556
1557 true
1558 }
1559
1560 fn prune_outdated_fork_tips(
1567 best_number: BlockNumber,
1568 state: &mut StateData<Block>,
1569 options: &ClientDatabaseInnerOptions,
1570 ) {
1571 let state = &mut *state;
1572
1573 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1576
1577 state.fork_tips.retain(|fork_tip| {
1579 if best_number - fork_tip.number > options.max_fork_tip_distance {
1580 candidate_forks_to_remove.push(*fork_tip);
1581 false
1582 } else {
1583 true
1584 }
1585 });
1586 if state.fork_tips.len() > options.max_fork_tips.get() {
1588 state
1589 .fork_tips
1590 .drain(options.max_fork_tips.get()..)
1591 .collect_into(&mut candidate_forks_to_remove);
1592 }
1593
1594 candidate_forks_to_remove
1596 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1597 state.fork_tips.extend(candidate_forks_to_remove);
1599 }
1600
1601 #[must_use]
1604 fn prune_outdated_fork(
1605 best_number: BlockNumber,
1606 fork_tip: &ForkTip,
1607 state: &mut StateData<Block>,
1608 ) -> bool {
1609 let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1610
1611 let mut block_root_to_prune = fork_tip.root;
1613 let mut pruned_tip = false;
1614 for block_offset in block_offset.. {
1615 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1616 if !pruned_tip {
1617 error!(
1618 %best_number,
1619 ?fork_tip,
1620 block_offset,
1621 "Block offset was not present in the database, this is an implementation \
1622 bug #1"
1623 );
1624 }
1625 break;
1627 };
1628
1629 if fork_blocks.len() == 1 {
1630 if !pruned_tip {
1631 error!(
1632 %best_number,
1633 ?fork_tip,
1634 block_offset,
1635 "Block offset was not present in the database, this is an implementation \
1636 bug #2"
1637 );
1638 }
1639
1640 break;
1642 }
1643
1644 let Some((fork_offset, block)) = fork_blocks
1645 .iter()
1646 .enumerate()
1647 .skip(1)
1649 .find(|(_fork_offset, block)| {
1650 *block.header().header().root() == block_root_to_prune
1651 })
1652 else {
1653 if !pruned_tip {
1654 error!(
1655 %best_number,
1656 ?fork_tip,
1657 block_offset,
1658 "Block offset was not present in the database, this is an implementation \
1659 bug #3"
1660 );
1661 }
1662
1663 break;
1665 };
1666
1667 if block.header().ref_count() > 1 {
1669 break;
1670 }
1671
1672 match block {
1674 ClientDatabaseBlock::InMemory(_) => {
1675 }
1677 ClientDatabaseBlock::Persisted { .. }
1678 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1679 pruned_tip = true;
1681 break;
1682 }
1683 }
1684
1685 state.block_roots.get_mut(&block_root_to_prune);
1686 block_root_to_prune = block.header().header().prefix.parent_root;
1687 fork_blocks.swap_remove(fork_offset);
1688
1689 pruned_tip = true;
1690 }
1691
1692 pruned_tip
1693 }
1694
1695 fn confirm_canonical_block(
1698 best_number: BlockNumber,
1699 state_data: &mut StateData<Block>,
1700 options: &ClientDatabaseInnerOptions,
1701 ) {
1702 let block_offset = (options.confirmation_depth_k + BlockNumber::ONE).as_u64() as usize;
1706
1707 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
1708 return;
1710 };
1711
1712 {
1714 let Some(canonical_block) = fork_blocks.first_mut() else {
1715 error!(
1716 %best_number,
1717 block_offset,
1718 "Have not found a canonical block to confirm, this is an implementation bug"
1719 );
1720 return;
1721 };
1722
1723 replace_with_or_abort(canonical_block, |block| match block {
1724 ClientDatabaseBlock::InMemory(_) => {
1725 error!(
1726 %best_number,
1727 block_offset,
1728 header = ?block.header(),
1729 "Block to be confirmed must not be in memory, this is an implementation bug"
1730 );
1731 block
1732 }
1733 ClientDatabaseBlock::Persisted {
1734 header,
1735 block_details: _,
1736 write_location,
1737 } => ClientDatabaseBlock::PersistedConfirmed {
1738 header,
1739 write_location,
1740 },
1741 ClientDatabaseBlock::PersistedConfirmed { .. } => {
1742 error!(
1743 %best_number,
1744 block_offset,
1745 header = ?block.header(),
1746 "Block to be confirmed must not be confirmed yet, this is an \
1747 implementation bug"
1748 );
1749 block
1750 }
1751 });
1752 }
1753
1754 let mut block_roots_to_prune = fork_blocks
1756 .drain(1..)
1757 .map(|block| *block.header().header().root())
1758 .collect::<Vec<_>>();
1759 let mut current_block_offset = block_offset;
1760 while !block_roots_to_prune.is_empty() {
1761 state_data
1763 .fork_tips
1764 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1765
1766 for block_root in &block_roots_to_prune {
1768 state_data.block_roots.remove(block_root);
1769 }
1770
1771 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1773 current_block_offset = next_block_offset;
1774 } else {
1775 break;
1777 }
1778
1779 let fork_blocks = state_data
1780 .blocks
1781 .get_mut(current_block_offset)
1782 .expect("Lower block offset always exists; qed");
1783
1784 block_roots_to_prune = fork_blocks
1786 .drain_filter(|block| {
1787 let header = block.header().header();
1788
1789 block_roots_to_prune.contains(&header.prefix.parent_root)
1790 })
1791 .map(|block| *block.header().header().root())
1792 .collect();
1793 }
1794 }
1795}