1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 const_convert,
50 const_trait_impl,
51 default_field_values,
52 get_mut_unchecked,
53 iter_collect_into,
54 maybe_uninit_as_bytes,
55 maybe_uninit_fill,
56 try_blocks
57)]
58
59mod page_group;
60pub mod storage_backend;
61mod storage_backend_adapter;
62
63use crate::page_group::block::StorageItemBlock;
64use crate::page_group::block::block::StorageItemBlockBlock;
65use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
66use crate::storage_backend::ClientDatabaseStorageBackend;
67use crate::storage_backend_adapter::{
68 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
69};
70use ab_client_api::{
71 BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
72 PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
73};
74use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
75use ab_core_primitives::block::header::GenericBlockHeader;
76use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
77use ab_core_primitives::block::owned::GenericOwnedBlock;
78use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
79use ab_core_primitives::segments::{LocalSegmentIndex, SegmentHeader};
80use ab_core_primitives::shard::RealShardKind;
81use ab_io_type::trivial_type::TrivialType;
82use async_lock::{
83 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
84};
85use rand::rngs::SysError;
86use rclite::Arc;
87use replace_with::replace_with_or_abort;
88use smallvec::{SmallVec, smallvec};
89use std::collections::{HashMap, VecDeque};
90use std::hash::{BuildHasherDefault, Hasher};
91use std::num::{NonZeroU32, NonZeroUsize};
92use std::ops::Deref;
93use std::sync::Arc as StdArc;
94use std::{fmt, io};
95use tracing::error;
96
/// Identifier of a database, stored as 32 raw bytes.
///
/// NOTE(review): presumably generated randomly when the database is formatted (see
/// [`ClientDatabaseFormatError::FailedToGenerateDatabaseId`]) — confirm.
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
101
102impl Deref for DatabaseId {
103 type Target = [u8; 32];
104
105 #[inline(always)]
106 fn deref(&self) -> &Self::Target {
107 &self.0
108 }
109}
110
111impl AsRef<[u8]> for DatabaseId {
112 #[inline(always)]
113 fn as_ref(&self) -> &[u8] {
114 &self.0
115 }
116}
117
118impl DatabaseId {
119 #[inline(always)]
120 pub const fn new(bytes: [u8; 32]) -> Self {
121 Self(bytes)
122 }
123}
124
125#[derive(Default)]
126struct BlockRootHasher(u64);
127
128impl Hasher for BlockRootHasher {
129 #[inline(always)]
130 fn finish(&self) -> u64 {
131 self.0
132 }
133
134 #[inline(always)]
135 fn write(&mut self, bytes: &[u8]) {
136 let Some(state) = bytes.as_chunks().0.first().copied().map(u64::from_le_bytes) else {
137 return;
138 };
139
140 self.0 = state;
141 }
142}
143
/// Output of the genesis block builder supplied to [`ClientDatabase::open()`].
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contract states stored in the block details of the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
151
/// Options for [`ClientDatabase::open()`].
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size, forwarded to the storage backend adapter when the database is opened.
    ///
    /// NOTE(review): the unit is not visible here (presumably a number of buffered writes) —
    /// confirm.
    pub write_buffer_size: usize = 5,
    /// Confirmation depth `k`.
    ///
    /// Fork blocks whose offset from the best block is this large or larger are rejected when
    /// persisting, and the value is also used to decide which block must include which segment
    /// headers.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth, must be smaller than `confirmation_depth_k`.
    ///
    /// Blocks at this offset from the best block (and deeper) that are still only in memory are
    /// written to the storage backend when a new best block is inserted.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// NOTE(review): presumably the maximum number of fork tips tracked before older ones are
    /// pruned — confirm against `prune_outdated_fork_tips()`.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Must be smaller than or equal to `confirmation_depth_k`.
    ///
    /// NOTE(review): presumably the maximum distance of a fork tip from the best block before it is
    /// pruned — confirm against `prune_outdated_fork_tips()`.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Called once to build the genesis block when no blocks were found in the storage backend
    pub genesis_block_builder: GBB,
    /// Storage backend the database is read from and written to
    pub storage_backend: StorageBackend,
}
211
/// Options for [`ClientDatabase::format()`].
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// Size of a page group.
    ///
    /// NOTE(review): presumably measured in pages and required to be at least two (see
    /// [`ClientDatabaseError::PageGroupSizeTooSmall`]) — confirm.
    pub page_group_size: NonZeroU32,
    /// NOTE(review): presumably allows formatting a storage backend that is already formatted
    /// instead of failing with [`ClientDatabaseFormatError::AlreadyFormatted`] — confirm.
    pub force: bool,
}
237
/// Error returned by [`ClientDatabase::open()`].
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Soft confirmation depth is not smaller than confirmation depth `k`
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Max fork tip distance is larger than confirmation depth `k`
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled a read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version that was found
        database_version: u8,
    },
    /// Page group size is too small
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size that was found
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number that was found
        actual: u64,
        /// Sequence number that was expected
        expected: u64,
        /// Page offset at which the sequence number was found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// Storage item that was found, only available for debug printing
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset at which the storage item was found
        page_offset: u32,
    },
    /// Invalid block (header failed to decode or block number doesn't follow the best block)
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset at which the block was found
        page_offset: u32,
    },
    /// Invalid segment headers (rejected by the in-memory segment headers cache)
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset at which the segment headers were found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks for the best block found in the storage backend
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
310
311#[derive(Debug, thiserror::Error)]
313pub enum ClientDatabaseFormatError {
314 #[error("Storage backend has canceled read request")]
316 ReadRequestCancelled,
317 #[error("Storage backend read error: {error}")]
319 ReadError {
320 error: io::Error,
322 },
323 #[error("Failed to generate database id")]
325 FailedToGenerateDatabaseId {
326 #[from]
328 error: SysError,
329 },
330 #[error("Database is already formatted yet")]
332 AlreadyFormatted,
333 #[error("Storage backend has canceled a writing request")]
335 WriteRequestCancelled,
336 #[error("Storage item write error")]
338 StorageItemWriteError {
339 #[from]
341 error: io::Error,
342 },
343}
344
/// Tip of a fork: the number and root of the last block on that fork.
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
350
/// Block that is held in memory together with its details.
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// Full block (header and body)
    block: Block,
    /// Details that accompany the block
    block_details: BlockDetails,
}
359
/// Borrowed view describing where the full contents of a block can be obtained from.
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is available in memory
    InMemory(&'a Block),
    /// Only the header is in memory, the rest has to be read from the storage backend
    Persisted {
        /// Block header kept in memory
        header: &'a Block::Header,
        /// Location of the storage item that contains the block
        write_location: WriteLocation,
    },
}
370
/// Block tracked by the database in one of three states.
///
/// The states differ in how much information is kept in memory: everything, header plus block
/// details, or the header only.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block and its details are only stored in memory
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block was written to the storage backend, header and block details are kept in memory
    Persisted {
        /// Block header
        header: Block::Header,
        /// Details that accompany the block
        block_details: BlockDetails,
        /// Location of the storage item that contains the block
        write_location: WriteLocation,
    },
    /// Block was written to the storage backend and block details are no longer kept in memory
    PersistedConfirmed {
        /// Block header
        header: Block::Header,
        /// Location of the storage item that contains the block
        write_location: WriteLocation,
    },
}
397
398impl<Block> ClientDatabaseBlock<Block>
399where
400 Block: GenericOwnedBlock,
401{
402 #[inline(always)]
403 fn header(&self) -> &Block::Header {
404 match self {
405 Self::InMemory(in_memory) => in_memory.block.header(),
406 Self::Persisted { header, .. } => header,
407 Self::PersistedConfirmed { header, .. } => header,
408 }
409 }
410
411 #[inline(always)]
412 fn full_block(&self) -> FullBlock<'_, Block> {
413 match self {
414 Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
415 Self::Persisted {
416 header,
417 write_location,
418 ..
419 }
420 | Self::PersistedConfirmed {
421 header,
422 write_location,
423 } => FullBlock::Persisted {
424 header,
425 write_location: *write_location,
426 },
427 }
428 }
429
430 #[inline(always)]
431 fn block_details(&self) -> Option<&BlockDetails> {
432 match self {
433 Self::InMemory(in_memory) => Some(&in_memory.block_details),
434 Self::Persisted { block_details, .. } => Some(block_details),
435 Self::PersistedConfirmed { .. } => None,
436 }
437 }
438}
439
/// In-memory index of the blocks known to the database.
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of all tracked forks, the tip of the best chain is always at the front
    fork_tips: VecDeque<ForkTip>,
    /// Maps a known block root to its block number.
    ///
    /// Together with the best block number this gives the index into `blocks`.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// Blocks indexed by their offset from the best block (the front entry is the height of the
    /// best block), each entry holds all known forks at that height
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
471
/// In-memory cache of segment headers, indexed by local segment index.
#[derive(Debug)]
struct SegmentHeadersCache {
    /// Segment headers in increasing local segment index order, starting from zero
    segment_headers_cache: Vec<SegmentHeader>,
}
476
477impl SegmentHeadersCache {
478 #[inline(always)]
479 fn last_segment_header(&self) -> Option<SegmentHeader> {
480 self.segment_headers_cache.last().cloned()
481 }
482
483 #[inline(always)]
484 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
485 self.segment_headers_cache
486 .last()
487 .map(|segment_header| segment_header.segment_index.as_inner())
488 }
489
490 #[inline(always)]
491 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
492 self.segment_headers_cache
493 .get(u64::from(local_segment_index) as usize)
494 .copied()
495 }
496
497 fn add_segment_headers(
499 &mut self,
500 mut segment_headers: Vec<SegmentHeader>,
501 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
502 self.segment_headers_cache.reserve(segment_headers.len());
503
504 let mut maybe_last_local_segment_index = self.max_local_segment_index();
505
506 if let Some(last_segment_index) = maybe_last_local_segment_index {
507 segment_headers.retain(|segment_header| {
509 segment_header.segment_index.as_inner() > last_segment_index
510 });
511 }
512
513 for segment_header in segment_headers.iter().copied() {
516 let local_segment_index = segment_header.local_segment_index();
517 match maybe_last_local_segment_index {
518 Some(last_local_segment_index) => {
519 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
520 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
521 local_segment_index,
522 last_local_segment_index,
523 });
524 }
525
526 self.segment_headers_cache.push(segment_header);
527 maybe_last_local_segment_index.replace(local_segment_index);
528 }
529 None => {
530 if local_segment_index != LocalSegmentIndex::ZERO {
531 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
532 local_segment_index,
533 });
534 }
535
536 self.segment_headers_cache.push(segment_header);
537 maybe_last_local_segment_index.replace(local_segment_index);
538 }
539 }
540 }
541
542 Ok(segment_headers)
543 }
544}
545
/// Mutable state of the database, protected as a whole by the lock in [`Inner`].
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory index of blocks and fork tips
    data: StateData<Block>,
    /// In-memory cache of segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Adapter used to read and write storage items, it has its own lock so that it can be
    /// locked for writing while the state itself is only locked for (upgradable) reading
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
556
557impl<Block, StorageBackend> State<Block, StorageBackend>
558where
559 Block: GenericOwnedBlock,
560{
561 #[inline(always)]
562 fn best_tip(&self) -> &ForkTip {
563 self.data
564 .fork_tips
565 .front()
566 .expect("The best block is always present; qed")
567 }
568
569 #[inline(always)]
570 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
571 self.data
572 .blocks
573 .front()
574 .expect("The best block is always present; qed")
575 .first()
576 .expect("The best block is always present; qed")
577 }
578}
579
/// In-memory block that is scheduled to be written to the storage backend.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Index into `StateData::blocks` (offset from the best block)
    block_offset: usize,
    /// Index of the block among the forks at `block_offset`
    fork_offset: usize,
    /// The block to write
    block: &'a ClientDatabaseBlockInMemory<Block>,
}
589
/// Result of writing a [`BlockToPersist`] to the storage backend.
#[derive(Debug)]
struct PersistedBlock {
    /// Index into `StateData::blocks` (offset from the best block)
    block_offset: usize,
    /// Index of the block among the forks at `block_offset`
    fork_offset: usize,
    /// Location the block was written to
    write_location: WriteLocation,
}
596
/// Subset of [`ClientDatabaseOptions`] that is retained after the database is opened.
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::confirmation_depth_k`]
    confirmation_depth_k: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}
604
/// Shared internals of [`ClientDatabase`].
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Mutable state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Immutable options the database was opened with
    options: ClientDatabaseInnerOptions,
}
613
/// Client database.
///
/// A cheaply cloneable handle: all clones share the same reference-counted internals.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared state and options
    inner: Arc<Inner<Block, StorageBackend>>,
}
622
623impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
624where
625 Block: GenericOwnedBlock,
626{
627 fn clone(&self) -> Self {
628 Self {
629 inner: self.inner.clone(),
630 }
631 }
632}
633
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Currently does nothing.
    fn drop(&mut self) {
        // NOTE(review): intentionally empty; presumably a placeholder for persisting data that
        // is still only in memory when the database is shut down — confirm.
    }
}
642
643impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
644where
645 Block: GenericOwnedBlock,
646 StorageBackend: ClientDatabaseStorageBackend,
647{
648 #[inline]
649 fn best_root(&self) -> BlockRoot {
650 self.inner.state.read_blocking().best_tip().root
653 }
654
655 #[inline]
656 fn best_header(&self) -> Block::Header {
657 self.inner
660 .state
661 .read_blocking()
662 .best_block()
663 .header()
664 .clone()
665 }
666
667 #[inline]
668 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
669 let state = self.inner.state.read_blocking();
672 let best_block = state.best_block();
673 (
674 best_block.header().clone(),
675 best_block
676 .block_details()
677 .expect("Always present for the best block; qed")
678 .clone(),
679 )
680 }
681
682 #[inline]
684 fn ancestor_header(
685 &self,
686 ancestor_block_number: BlockNumber,
687 descendant_block_root: &BlockRoot,
688 ) -> Option<Block::Header> {
689 let state = self.inner.state.read_blocking();
692 let best_number = state.best_tip().number;
693
694 let ancestor_block_offset =
695 u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
696 let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
697
698 let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
699 if ancestor_block_number > descendant_block_number {
700 return None;
701 }
702 let descendant_block_offset =
703 u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
704
705 let mut blocks_range_iter = state
707 .data
708 .blocks
709 .iter()
710 .enumerate()
711 .skip(descendant_block_offset);
712
713 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
714 let descendant_header = descendant_block_candidates
715 .iter()
716 .find(|block| &*block.header().header().root() == descendant_block_root)?
717 .header()
718 .header();
719
720 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
725 return ancestor_block_candidates
726 .iter()
727 .next()
728 .map(|block| block.header().clone());
729 }
730
731 let mut parent_block_root = &descendant_header.prefix.parent_root;
732
733 for (block_offset, parent_candidates) in blocks_range_iter {
735 let parent_header = parent_candidates
736 .iter()
737 .find(|header| &*header.header().header().root() == parent_block_root)?
738 .header();
739
740 if block_offset == ancestor_block_offset {
742 return Some(parent_header.clone());
743 }
744
745 parent_block_root = &parent_header.header().prefix.parent_root;
746 }
747
748 None
749 }
750
751 #[inline]
752 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
753 let state = self.inner.state.read_blocking();
756 let best_number = state.best_tip().number;
757
758 let block_number = *state.data.block_roots.get(block_root)?;
759 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
760 let block_candidates = state.data.blocks.get(block_offset)?;
761
762 block_candidates.iter().find_map(|block| {
763 let header = block.header();
764
765 if &*header.header().root() == block_root {
766 Some(header.clone())
767 } else {
768 None
769 }
770 })
771 }
772
773 #[inline]
774 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
775 let state = self.inner.state.read_blocking();
778 let best_number = state.best_tip().number;
779
780 let block_number = *state.data.block_roots.get(block_root)?;
781 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
782 let block_candidates = state.data.blocks.get(block_offset)?;
783
784 block_candidates.iter().find_map(|block| {
785 let header = block.header();
786 let block_details = block.block_details().cloned()?;
787
788 if &*header.header().root() == block_root {
789 Some((header.clone(), block_details))
790 } else {
791 None
792 }
793 })
794 }
795
796 #[inline]
797 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
798 let state = self.inner.state.read().await;
799 let best_number = state.best_tip().number;
800
801 let block_number = *state
802 .data
803 .block_roots
804 .get(block_root)
805 .ok_or(ReadBlockError::UnknownBlockRoot)?;
806 let block_offset = u64::from(
807 best_number
808 .checked_sub(block_number)
809 .expect("Known block roots always have valid block offset; qed"),
810 ) as usize;
811 let block_candidates = state
812 .data
813 .blocks
814 .get(block_offset)
815 .expect("Valid block offsets always have block entries; qed");
816
817 for block_candidate in block_candidates {
818 let header = block_candidate.header();
819
820 if &*header.header().root() == block_root {
821 return match block_candidate.full_block() {
822 FullBlock::InMemory(block) => Ok(block.clone()),
823 FullBlock::Persisted {
824 header,
825 write_location,
826 } => {
827 let storage_backend_adapter = state.storage_backend_adapter.read().await;
828
829 let storage_item = storage_backend_adapter
830 .read_storage_item::<StorageItemBlock>(write_location)
831 .await?;
832
833 let storage_item_block = match storage_item {
834 StorageItemBlock::Block(storage_item_block) => storage_item_block,
835 StorageItemBlock::SegmentHeaders(_) => {
836 return Err(ReadBlockError::StorageItemReadError {
837 error: io::Error::other(
838 "Unexpected storage item: SegmentHeaders",
839 ),
840 });
841 }
842 };
843
844 let StorageItemBlockBlock {
845 header: _,
846 body,
847 mmr_with_block: _,
848 system_contract_states: _,
849 } = storage_item_block;
850
851 Block::from_buffers(header.buffer().clone(), body)
852 .ok_or(ReadBlockError::FailedToDecode)
853 }
854 };
855 }
856 }
857
858 unreachable!("Known block root always has block candidate associated with it; qed")
859 }
860
861 #[inline]
862 fn last_segment_header(&self) -> Option<SegmentHeader> {
863 let state = self.inner.state.read_blocking();
866 state.segment_headers_cache.last_segment_header()
867 }
868
869 #[inline]
870 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
871 let state = self.inner.state.read_blocking();
874
875 state.segment_headers_cache.max_local_segment_index()
876 }
877
878 #[inline]
879 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
880 let state = self.inner.state.read_blocking();
883
884 state
885 .segment_headers_cache
886 .get_segment_header(segment_index)
887 }
888
889 #[inline]
890 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
891 let state = self.inner.state.read_blocking();
894
895 let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
896 else {
897 return Vec::new();
899 };
900
901 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
903 && block_number == BlockNumber::ONE
904 {
905 return vec![
908 state
909 .segment_headers_cache
910 .get_segment_header(LocalSegmentIndex::ZERO)
911 .expect("Segment headers are stored in monotonically increasing order; qed"),
912 ];
913 }
914
915 if last_local_segment_index == LocalSegmentIndex::ZERO {
916 return Vec::new();
918 }
919
920 let mut current_local_segment_index = last_local_segment_index;
921 loop {
922 let current_segment_header = state
925 .segment_headers_cache
926 .get_segment_header(current_local_segment_index)
927 .expect("Segment headers are stored in monotonically increasing order; qed");
928
929 let target_block_number = current_segment_header.last_archived_block.number()
931 + BlockNumber::ONE
932 + self.inner.options.confirmation_depth_k;
933 if target_block_number == block_number {
934 let mut headers_for_block = vec![current_segment_header];
935
936 let last_archived_block_number = current_segment_header.last_archived_block.number;
938 let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
939
940 while let Some(segment_header) = state
941 .segment_headers_cache
942 .get_segment_header(local_segment_index)
943 {
944 if segment_header.last_archived_block.number == last_archived_block_number {
945 headers_for_block.insert(0, segment_header);
946 local_segment_index -= LocalSegmentIndex::ONE;
947 } else {
948 break;
949 }
950 }
951
952 return headers_for_block;
953 }
954
955 if target_block_number > block_number {
957 if current_local_segment_index > LocalSegmentIndex::ONE {
959 current_local_segment_index -= LocalSegmentIndex::ONE
960 } else {
961 break;
962 }
963 } else {
964 return Vec::new();
966 }
967 }
968
969 Vec::new()
971 }
972}
973
974impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
975where
976 Block: GenericOwnedBlock,
977 StorageBackend: ClientDatabaseStorageBackend,
978{
979 async fn persist_block(
980 &self,
981 block: Block,
982 block_details: BlockDetails,
983 ) -> Result<(), PersistBlockError> {
984 let mut state = self.inner.state.write().await;
985 let best_number = state.best_tip().number;
986
987 let header = block.header().header();
988
989 let block_number = header.prefix.number;
990
991 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
992 Self::insert_first_block(&mut state.data, block, block_details);
994
995 return Ok(());
996 }
997
998 if block_number == best_number + BlockNumber::ONE {
999 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1000 }
1001
1002 let block_offset = u64::from(
1003 best_number
1004 .checked_sub(block_number)
1005 .ok_or(PersistBlockError::MissingParent)?,
1006 ) as usize;
1007
1008 if block_offset >= u64::from(self.inner.options.confirmation_depth_k) as usize {
1009 return Err(PersistBlockError::OutsideAcceptableRange);
1010 }
1011
1012 let state = &mut *state;
1013
1014 let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1015 error!(
1016 %block_number,
1017 %block_offset,
1018 "Failed to store block fork, header offset is missing despite being within \
1019 acceptable range"
1020 );
1021
1022 PersistBlockError::OutsideAcceptableRange
1023 })?;
1024
1025 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1026 if fork_tip.root == header.prefix.parent_root {
1028 state.data.fork_tips.remove(index);
1029 break;
1030 }
1031 }
1032
1033 let block_root = *header.root();
1034 state.data.fork_tips.insert(
1037 1,
1038 ForkTip {
1039 number: block_number,
1040 root: block_root,
1041 },
1042 );
1043 state.data.block_roots.insert(block_root, block_number);
1044 block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1045 block,
1046 block_details,
1047 }));
1048
1049 Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1050
1051 Ok(())
1052 }
1053
1054 async fn persist_segment_headers(
1055 &self,
1056 segment_headers: Vec<SegmentHeader>,
1057 ) -> Result<(), PersistSegmentHeadersError> {
1058 let mut state = self.inner.state.write().await;
1059
1060 let added_segment_headers = state
1061 .segment_headers_cache
1062 .add_segment_headers(segment_headers)?;
1063
1064 if added_segment_headers.is_empty() {
1065 return Ok(());
1066 }
1067
1068 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1073
1074 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1075
1076 storage_backend_adapter
1077 .write_storage_item(StorageItemBlock::SegmentHeaders(
1078 StorageItemBlockSegmentHeaders {
1079 segment_headers: added_segment_headers,
1080 },
1081 ))
1082 .await?;
1083
1084 Ok(())
1085 }
1086}
1087
1088impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1089where
1090 Block: GenericOwnedBlock,
1091 StorageBackend: ClientDatabaseStorageBackend,
1092{
    /// Opens the database on top of the storage backend provided in `options`.
    ///
    /// The in-memory state is rebuilt from the storage items the storage backend adapter hands to
    /// the handlers defined here. If no blocks were found, the genesis block builder is called and
    /// its block becomes the best block (kept in memory only).
    ///
    /// # Errors
    ///
    /// Returns an error if the options are inconsistent, if opening the storage backend adapter
    /// fails, if a storage item is invalid, or if ancestor block forks of the best block can't be
    /// adjusted.
    pub async fn open<GBB>(
        options: ClientDatabaseOptions<GBB, StorageBackend>,
    ) -> Result<Self, ClientDatabaseError>
    where
        GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
    {
        let ClientDatabaseOptions {
            write_buffer_size,
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
            genesis_block_builder,
            storage_backend,
        } = options;
        // Validate options before touching the storage backend
        if soft_confirmation_depth >= confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
        }

        if max_fork_tip_distance > confirmation_depth_k {
            return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
        }

        // Both are filled in by the storage item handlers below
        let mut state_data = StateData {
            fork_tips: VecDeque::new(),
            block_roots: HashMap::default(),
            blocks: VecDeque::new(),
        };
        let mut segment_headers_cache = SegmentHeadersCache {
            segment_headers_cache: Vec::new(),
        };

        let options = ClientDatabaseInnerOptions {
            confirmation_depth_k,
            soft_confirmation_depth,
            max_fork_tips,
            max_fork_tip_distance,
        };

        let storage_item_handlers = StorageItemHandlers {
            // Permanent storage items are accepted without any processing
            permanent: |_arg| {
                Ok(())
            },
            block: |arg| {
                let StorageItemHandlerArg {
                    storage_item,
                    page_offset,
                    num_pages,
                } = arg;
                let storage_item_block = match storage_item {
                    StorageItemBlock::Block(storage_item_block) => storage_item_block,
                    StorageItemBlock::SegmentHeaders(segment_headers) => {
                        // Segment headers only go into the cache, nothing else to do for them
                        let num_segment_headers = segment_headers.segment_headers.len();
                        return match segment_headers_cache
                            .add_segment_headers(segment_headers.segment_headers)
                        {
                            Ok(_) => Ok(()),
                            Err(error) => {
                                error!(
                                    %page_offset,
                                    %num_segment_headers,
                                    %error,
                                    "Failed to add segment headers from storage item"
                                );

                                Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
                            }
                        };
                    }
                };

                // The body is not kept in memory for blocks read from the storage backend
                let StorageItemBlockBlock {
                    header,
                    body: _,
                    mmr_with_block,
                    system_contract_states,
                } = storage_item_block;

                let header = Block::Header::from_buffer(header).map_err(|_buffer| {
                    error!(%page_offset, "Failed to decode block header from bytes");

                    ClientDatabaseError::InvalidBlock { page_offset }
                })?;

                let block_root = *header.header().root();
                let block_number = header.header().prefix.number;

                state_data.block_roots.insert(block_root, block_number);

                // Number of the best block seen so far, `None` for the very first block
                let maybe_best_number = state_data
                    .blocks
                    .front()
                    .and_then(|block_forks| block_forks.first())
                    .map(|best_block| {
                        let header: &Block::Header = best_block.header();

                        header.header().prefix.number
                    });

                let block_offset = if let Some(best_number) = maybe_best_number {
                    if block_number <= best_number {
                        // A block at an already known height (or below it)
                        u64::from(best_number - block_number) as usize
                    } else {
                        // A new best block must directly follow the current best block
                        if block_number - best_number != BlockNumber::ONE {
                            error!(
                                %page_offset,
                                %best_number,
                                %block_number,
                                "Invalid new best block number, it must be only one block \
                                higher than the best block"
                            );

                            return Err(ClientDatabaseError::InvalidBlock { page_offset });
                        }

                        state_data.blocks.push_front(SmallVec::new());
                        0
                    }
                } else {
                    state_data.blocks.push_front(SmallVec::new());
                    0
                };

                let block_forks = match state_data.blocks.get_mut(block_offset) {
                    Some(block_forks) => block_forks,
                    None => {
                        // There is no entry at this offset, the block is skipped
                        return Ok(());
                    }
                };

                block_forks.push(ClientDatabaseBlock::Persisted {
                    header,
                    block_details: BlockDetails {
                        mmr_with_block,
                        system_contract_states,
                    },
                    write_location: WriteLocation {
                        page_offset,
                        num_pages,
                    },
                });

                // Only when the block is the first one at a new best height
                if block_offset == 0 && block_forks.len() == 1 {
                    Self::confirm_canonical_block(block_number, &mut state_data, &options);
                }

                Ok(())
            },
        };

        let storage_backend_adapter =
            StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
                .await?;

        // The last block stored at the highest known height is used as the best block
        if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
            block_forks.last()
        }) {
            let header: &Block::Header = best_block.header();
            let header = header.header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
                return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
            }

            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
        } else {
            // No blocks were found in the storage backend, start from the genesis block
            let GenesisBlockBuilderResult {
                block,
                system_contract_states,
            } = genesis_block_builder();

            let header = block.header().header();
            let block_number = header.prefix.number;
            let block_root = *header.root();

            state_data.fork_tips.push_front(ForkTip {
                number: block_number,
                root: block_root,
            });
            state_data.block_roots.insert(block_root, block_number);
            state_data
                .blocks
                .push_front(smallvec![ClientDatabaseBlock::InMemory(
                    ClientDatabaseBlockInMemory {
                        block,
                        block_details: BlockDetails {
                            system_contract_states,
                            // MMR that contains the genesis block root as its only leaf
                            mmr_with_block: Arc::new({
                                let mut mmr = BlockMerkleMountainRange::new();
                                mmr.add_leaf(&block_root);
                                mmr
                            })
                        },
                    }
                )]);
        }

        let state = State {
            data: state_data,
            segment_headers_cache,
            storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
        };

        let inner = Inner {
            state: AsyncRwLock::new(state),
            options,
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }
1331
    /// Formats the storage backend so that it can be opened as a database afterward.
    ///
    /// Delegates to the storage backend adapter.
    ///
    /// # Errors
    ///
    /// Returns whatever error the storage backend adapter reports, see
    /// [`ClientDatabaseFormatError`].
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1339
1340 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1341 let header = block.header().header();
1343 let block_number = header.prefix.number;
1344 let block_root = *header.root();
1345
1346 state.fork_tips.clear();
1347 state.fork_tips.push_front(ForkTip {
1348 number: block_number,
1349 root: block_root,
1350 });
1351 state.block_roots.clear();
1352 state.block_roots.insert(block_root, block_number);
1353 state.blocks.clear();
1354 state
1355 .blocks
1356 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1357 ClientDatabaseBlockInMemory {
1358 block,
1359 block_details,
1360 }
1361 )]);
1362 }
1363
1364 async fn insert_new_best_block(
1365 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1366 inner: &Inner<Block, StorageBackend>,
1367 block: Block,
1368 block_details: BlockDetails,
1369 ) -> Result<(), PersistBlockError> {
1370 let header = block.header().header();
1371 let block_number = header.prefix.number;
1372 let block_root = *header.root();
1373 let parent_root = header.prefix.parent_root;
1374
1375 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1378 return Err(PersistBlockError::MissingParent);
1379 }
1380
1381 {
1383 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1384 if fork_tip.root == parent_root {
1386 state.data.fork_tips.remove(index);
1387 break;
1388 }
1389 }
1390
1391 state.data.fork_tips.push_front(ForkTip {
1392 number: block_number,
1393 root: block_root,
1394 });
1395 state.data.block_roots.insert(block_root, block_number);
1396 state
1397 .data
1398 .blocks
1399 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1400 ClientDatabaseBlockInMemory {
1401 block,
1402 block_details: block_details.clone()
1403 }
1404 )]);
1405 }
1406
1407 let options = &inner.options;
1408
1409 Self::confirm_canonical_block(block_number, &mut state.data, options);
1410 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1411
1412 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1417
1418 let mut blocks_to_persist = Vec::new();
1419 for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1420 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1421 break;
1422 };
1423
1424 let len_before = blocks_to_persist.len();
1425 fork_blocks
1426 .iter()
1427 .enumerate()
1428 .filter_map(|(fork_offset, client_database_block)| {
1429 match client_database_block {
1430 ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1431 block_offset,
1432 fork_offset,
1433 block,
1434 }),
1435 ClientDatabaseBlock::Persisted { .. }
1436 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1437 None
1439 }
1440 }
1441 })
1442 .collect_into(&mut blocks_to_persist);
1443
1444 if blocks_to_persist.len() == len_before {
1445 break;
1446 }
1447 }
1448
1449 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1451 {
1452 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1453
1454 for block_to_persist in blocks_to_persist.into_iter().rev() {
1455 let BlockToPersist {
1456 block_offset,
1457 fork_offset,
1458 block,
1459 } = block_to_persist;
1460
1461 let write_location = storage_backend_adapter
1462 .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1463 header: block.block.header().buffer().clone(),
1464 body: block.block.body().buffer().clone(),
1465 mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1466 system_contract_states: StdArc::clone(
1467 &block.block_details.system_contract_states,
1468 ),
1469 }))
1470 .await?;
1471
1472 persisted_blocks.push(PersistedBlock {
1473 block_offset,
1474 fork_offset,
1475 write_location,
1476 });
1477 }
1478 }
1479
1480 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1482 for persisted_block in persisted_blocks {
1483 let PersistedBlock {
1484 block_offset,
1485 fork_offset,
1486 write_location,
1487 } = persisted_block;
1488
1489 let block = state
1490 .data
1491 .blocks
1492 .get_mut(block_offset)
1493 .expect("Still holding the same lock since last check; qed")
1494 .get_mut(fork_offset)
1495 .expect("Still holding the same lock since last check; qed");
1496
1497 replace_with_or_abort(block, |block| {
1498 if let ClientDatabaseBlock::InMemory(in_memory) = block {
1499 let (header, _body) = in_memory.block.split();
1500
1501 ClientDatabaseBlock::Persisted {
1502 header,
1503 block_details: in_memory.block_details,
1504 write_location,
1505 }
1506 } else {
1507 unreachable!("Still holding the same lock since last check; qed");
1508 }
1509 });
1510 }
1511
1512 Ok(())
1516 }
1517
1518 #[must_use]
1523 fn adjust_ancestor_block_forks(
1524 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1525 mut parent_block_root: BlockRoot,
1526 ) -> bool {
1527 let mut ancestor_blocks = blocks.iter_mut();
1528
1529 loop {
1530 if ancestor_blocks.len() == 1 {
1531 break;
1533 }
1534
1535 let Some(parent_blocks) = ancestor_blocks.next() else {
1536 break;
1538 };
1539
1540 let Some(fork_offset_parent_block_root) =
1541 parent_blocks
1542 .iter()
1543 .enumerate()
1544 .find_map(|(fork_offset, fork_block)| {
1545 let fork_header = fork_block.header().header();
1546 if *fork_header.root() == parent_block_root {
1547 Some((fork_offset, fork_header.prefix.parent_root))
1548 } else {
1549 None
1550 }
1551 })
1552 else {
1553 return false;
1554 };
1555
1556 let fork_offset;
1557 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1558
1559 parent_blocks.swap(0, fork_offset);
1560 }
1561
1562 true
1563 }
1564
1565 fn prune_outdated_fork_tips(
1572 best_number: BlockNumber,
1573 state: &mut StateData<Block>,
1574 options: &ClientDatabaseInnerOptions,
1575 ) {
1576 let state = &mut *state;
1577
1578 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1581
1582 state.fork_tips.retain(|fork_tip| {
1584 if best_number - fork_tip.number > options.max_fork_tip_distance {
1585 candidate_forks_to_remove.push(*fork_tip);
1586 false
1587 } else {
1588 true
1589 }
1590 });
1591 if state.fork_tips.len() > options.max_fork_tips.get() {
1593 state
1594 .fork_tips
1595 .drain(options.max_fork_tips.get()..)
1596 .collect_into(&mut candidate_forks_to_remove);
1597 }
1598
1599 candidate_forks_to_remove
1601 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1602 state.fork_tips.extend(candidate_forks_to_remove);
1604 }
1605
1606 #[must_use]
1609 fn prune_outdated_fork(
1610 best_number: BlockNumber,
1611 fork_tip: &ForkTip,
1612 state: &mut StateData<Block>,
1613 ) -> bool {
1614 let block_offset = u64::from(best_number - fork_tip.number) as usize;
1615
1616 let mut block_root_to_prune = fork_tip.root;
1618 let mut pruned_tip = false;
1619 for block_offset in block_offset.. {
1620 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1621 if !pruned_tip {
1622 error!(
1623 %best_number,
1624 ?fork_tip,
1625 block_offset,
1626 "Block offset was not present in the database, this is an implementation \
1627 bug #1"
1628 );
1629 }
1630 break;
1632 };
1633
1634 if fork_blocks.len() == 1 {
1635 if !pruned_tip {
1636 error!(
1637 %best_number,
1638 ?fork_tip,
1639 block_offset,
1640 "Block offset was not present in the database, this is an implementation \
1641 bug #2"
1642 );
1643 }
1644
1645 break;
1647 }
1648
1649 let Some((fork_offset, block)) = fork_blocks
1650 .iter()
1651 .enumerate()
1652 .skip(1)
1654 .find(|(_fork_offset, block)| {
1655 *block.header().header().root() == block_root_to_prune
1656 })
1657 else {
1658 if !pruned_tip {
1659 error!(
1660 %best_number,
1661 ?fork_tip,
1662 block_offset,
1663 "Block offset was not present in the database, this is an implementation \
1664 bug #3"
1665 );
1666 }
1667
1668 break;
1670 };
1671
1672 if block.header().ref_count() > 1 {
1674 break;
1675 }
1676
1677 match block {
1679 ClientDatabaseBlock::InMemory(_) => {
1680 }
1682 ClientDatabaseBlock::Persisted { .. }
1683 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1684 pruned_tip = true;
1686 break;
1687 }
1688 }
1689
1690 state.block_roots.get_mut(&block_root_to_prune);
1691 block_root_to_prune = block.header().header().prefix.parent_root;
1692 fork_blocks.swap_remove(fork_offset);
1693
1694 pruned_tip = true;
1695 }
1696
1697 pruned_tip
1698 }
1699
1700 fn confirm_canonical_block(
1703 best_number: BlockNumber,
1704 state_data: &mut StateData<Block>,
1705 options: &ClientDatabaseInnerOptions,
1706 ) {
1707 let block_offset = u64::from(options.confirmation_depth_k + BlockNumber::ONE) as usize;
1711
1712 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
1713 return;
1715 };
1716
1717 {
1719 let Some(canonical_block) = fork_blocks.first_mut() else {
1720 error!(
1721 %best_number,
1722 block_offset,
1723 "Have not found a canonical block to confirm, this is an implementation bug"
1724 );
1725 return;
1726 };
1727
1728 replace_with_or_abort(canonical_block, |block| match block {
1729 ClientDatabaseBlock::InMemory(_) => {
1730 error!(
1731 %best_number,
1732 block_offset,
1733 header = ?block.header(),
1734 "Block to be confirmed must not be in memory, this is an implementation bug"
1735 );
1736 block
1737 }
1738 ClientDatabaseBlock::Persisted {
1739 header,
1740 block_details: _,
1741 write_location,
1742 } => ClientDatabaseBlock::PersistedConfirmed {
1743 header,
1744 write_location,
1745 },
1746 ClientDatabaseBlock::PersistedConfirmed { .. } => {
1747 error!(
1748 %best_number,
1749 block_offset,
1750 header = ?block.header(),
1751 "Block to be confirmed must not be confirmed yet, this is an \
1752 implementation bug"
1753 );
1754 block
1755 }
1756 });
1757 }
1758
1759 let mut block_roots_to_prune = fork_blocks
1761 .drain(1..)
1762 .map(|block| *block.header().header().root())
1763 .collect::<Vec<_>>();
1764 let mut current_block_offset = block_offset;
1765 while !block_roots_to_prune.is_empty() {
1766 state_data
1768 .fork_tips
1769 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1770
1771 for block_root in &block_roots_to_prune {
1773 state_data.block_roots.remove(block_root);
1774 }
1775
1776 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1778 current_block_offset = next_block_offset;
1779 } else {
1780 break;
1782 }
1783
1784 let fork_blocks = state_data
1785 .blocks
1786 .get_mut(current_block_offset)
1787 .expect("Lower block offset always exists; qed");
1788
1789 block_roots_to_prune = fork_blocks
1791 .drain_filter(|block| {
1792 let header = block.header().header();
1793
1794 block_roots_to_prune.contains(&header.prefix.parent_root)
1795 })
1796 .map(|block| *block.header().header().root())
1797 .collect();
1798 }
1799 }
1800}