1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 default_field_values,
49 get_mut_unchecked,
50 iter_collect_into,
51 maybe_uninit_as_bytes,
52 maybe_uninit_fill,
53 maybe_uninit_slice,
54 maybe_uninit_write_slice,
55 push_mut,
56 try_blocks
57)]
58
59mod page_group;
60pub mod storage_backend;
61mod storage_backend_adapter;
62
63use crate::page_group::block::StorageItemBlock;
64use crate::page_group::block::block::StorageItemBlockBlock;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70 BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
71 PersistBlockError,
72};
73use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
74use ab_core_primitives::block::header::GenericBlockHeader;
75use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
76use ab_core_primitives::block::owned::GenericOwnedBlock;
77use ab_core_primitives::block::{BlockNumber, BlockRoot};
78use ab_io_type::trivial_type::TrivialType;
79use async_lock::{
80 Mutex as AsyncMutex, RwLock as AsyncRwLock, RwLockUpgradableReadGuard,
81 RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand_core::OsError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
95#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
97#[repr(C)]
98pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101 type Target = [u8; 32];
102
103 #[inline(always)]
104 fn deref(&self) -> &Self::Target {
105 &self.0
106 }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110 #[inline(always)]
111 fn as_ref(&self) -> &[u8] {
112 &self.0
113 }
114}
115
116impl DatabaseId {
117 #[inline(always)]
118 pub const fn new(bytes: [u8; 32]) -> Self {
119 Self(bytes)
120 }
121}
122
123#[derive(Default)]
124struct BlockRootHasher(u64);
125
126impl Hasher for BlockRootHasher {
127 #[inline(always)]
128 fn finish(&self) -> u64 {
129 self.0
130 }
131
132 #[inline(always)]
133 fn write(&mut self, bytes: &[u8]) {
134 let Some(state) = bytes.as_chunks().0.first().copied().map(u64::from_le_bytes) else {
135 return;
136 };
137
138 self.0 = state;
139 }
140}
141
142#[derive(Debug)]
143pub struct GenesisBlockBuilderResult<Block> {
144 pub block: Block,
146 pub system_contract_states: StdArc<[ContractSlotState]>,
148}
149
150#[derive(Debug, Copy, Clone)]
152pub struct ClientDatabaseOptions<GBB, StorageBackend> {
153 pub write_buffer_size: usize = 5,
161 pub confirmation_depth_k: BlockNumber,
167 pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
181 pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
192 pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
203 pub genesis_block_builder: GBB,
206 pub storage_backend: StorageBackend,
208}
209
210#[derive(Debug, Copy, Clone)]
212pub struct ClientDatabaseFormatOptions {
213 pub page_group_size: NonZeroU32,
230 pub force: bool,
234}
235
236#[derive(Debug, thiserror::Error)]
237pub enum ClientDatabaseError {
238 #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
240 InvalidSoftConfirmationDepth,
241 #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
243 InvalidMaxForkTipDistance,
244 #[error("Storage backend has canceled read request")]
246 ReadRequestCancelled,
247 #[error("Storage backend read error: {error}")]
249 ReadError {
250 error: io::Error,
252 },
253 #[error("Unsupported database version: {database_version}")]
255 UnsupportedDatabaseVersion {
256 database_version: u8,
258 },
259 #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
261 PageGroupSizeTooSmall {
262 page_group_size: u32,
264 },
265 #[error(
267 "Unexpected sequence number {actual} at page offset {page_offset} (expected \
268 {expected})"
269 )]
270 UnexpectedSequenceNumber {
271 actual: u64,
273 expected: u64,
275 page_offset: u32,
277 },
278 #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
280 UnexpectedStorageItem {
281 storage_item: Box<dyn fmt::Debug + Send + Sync>,
283 page_offset: u32,
285 },
286 #[error("Invalid block at offset {page_offset}")]
288 InvalidBlock {
289 page_offset: u32,
291 },
292 #[error("Failed to adjust ancestor block forks")]
294 FailedToAdjustAncestorBlockForks,
295 #[error("Database is not formatted yet")]
297 Unformatted,
298 #[error("Non-permanent first page group")]
300 NonPermanentFirstPageGroup,
301}
302
303#[derive(Debug, thiserror::Error)]
305pub enum ClientDatabaseFormatError {
306 #[error("Storage backend has canceled read request")]
308 ReadRequestCancelled,
309 #[error("Storage backend read error: {error}")]
311 ReadError {
312 error: io::Error,
314 },
315 #[error("Failed to generate database id")]
317 FailedToGenerateDatabaseId {
318 #[from]
320 error: OsError,
321 },
322 #[error("Database is already formatted yet")]
324 AlreadyFormatted,
325 #[error("Storage backend has canceled a writing request")]
327 WriteRequestCancelled,
328 #[error("Storage item write error")]
330 StorageItemWriteError {
331 #[from]
333 error: io::Error,
334 },
335}
336
337#[derive(Debug, Copy, Clone)]
338struct ForkTip {
339 number: BlockNumber,
340 root: BlockRoot,
341}
342
343#[derive(Debug)]
344struct ClientDatabaseBlockInMemory<Block>
345where
346 Block: GenericOwnedBlock,
347{
348 block: Block,
349 block_details: BlockDetails,
350}
351
352#[derive(Debug)]
359enum ClientDatabaseBlock<Block>
360where
361 Block: GenericOwnedBlock,
362{
363 InMemory(ClientDatabaseBlockInMemory<Block>),
365 Persisted {
367 header: Block::Header,
368 block_details: BlockDetails,
369 write_location: WriteLocation,
370 },
371 PersistedConfirmed {
374 header: Block::Header,
375 #[expect(dead_code, reason = "Not used yet")]
376 write_location: WriteLocation,
377 },
378}
379
380impl<Block> ClientDatabaseBlock<Block>
381where
382 Block: GenericOwnedBlock,
383{
384 #[inline(always)]
385 fn header(&self) -> &Block::Header {
386 match self {
387 Self::InMemory(in_memory) => in_memory.block.header(),
388 Self::Persisted { header, .. } => header,
389 Self::PersistedConfirmed { header, .. } => header,
390 }
391 }
392
393 #[inline(always)]
394 fn block_details(&self) -> Option<&BlockDetails> {
395 match self {
396 Self::InMemory(in_memory) => Some(&in_memory.block_details),
397 Self::Persisted { block_details, .. } => Some(block_details),
398 Self::PersistedConfirmed { .. } => None,
399 }
400 }
401}
402
403#[derive(Debug)]
404struct State<Block>
405where
406 Block: GenericOwnedBlock,
407{
408 fork_tips: VecDeque<ForkTip>,
413 block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
418 blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
433}
434
435impl<Block> State<Block>
436where
437 Block: GenericOwnedBlock,
438{
439 #[inline(always)]
440 fn best_tip(&self) -> &ForkTip {
441 self.fork_tips
442 .front()
443 .expect("The best block is always present; qed")
444 }
445
446 #[inline(always)]
447 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
448 self.blocks
449 .front()
450 .expect("The best block is always present; qed")
451 .first()
452 .expect("The best block is always present; qed")
453 }
454}
455
456#[derive(Debug)]
457struct BlockToPersist<'a, Block>
458where
459 Block: GenericOwnedBlock,
460{
461 block_offset: usize,
462 fork_offset: usize,
463 block: &'a ClientDatabaseBlockInMemory<Block>,
464}
465
466#[derive(Debug)]
467struct PersistedBlock {
468 block_offset: usize,
469 fork_offset: usize,
470 write_location: WriteLocation,
471}
472
473#[derive(Debug)]
474struct ClientDatabaseInnerOptions {
475 confirmation_depth_k: BlockNumber,
476 soft_confirmation_depth: BlockNumber,
477 max_fork_tips: NonZeroUsize,
478 max_fork_tip_distance: BlockNumber,
479}
480
481#[derive(Debug)]
482struct Inner<Block, StorageBackend>
483where
484 Block: GenericOwnedBlock,
485{
486 state: AsyncRwLock<State<Block>>,
487 storage_backend_adapter: AsyncMutex<StorageBackendAdapter>,
488 storage_backend: StorageBackend,
489 options: ClientDatabaseInnerOptions,
490}
491
492#[derive(Debug)]
494pub struct ClientDatabase<Block, StorageBackend>
495where
496 Block: GenericOwnedBlock,
497{
498 inner: Arc<Inner<Block, StorageBackend>>,
499}
500
501impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
502where
503 Block: GenericOwnedBlock,
504{
505 fn clone(&self) -> Self {
506 Self {
507 inner: self.inner.clone(),
508 }
509 }
510}
511
512impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
513where
514 Block: GenericOwnedBlock,
515{
516 fn drop(&mut self) {
517 }
519}
520
521impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
522where
523 Block: GenericOwnedBlock,
524 StorageBackend: ClientDatabaseStorageBackend,
525{
526 fn best_root(&self) -> BlockRoot {
527 self.inner.state.read_blocking().best_tip().root
530 }
531
532 fn best_header(&self) -> Block::Header {
533 self.inner
536 .state
537 .read_blocking()
538 .best_block()
539 .header()
540 .clone()
541 }
542
543 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
544 let state = self.inner.state.read_blocking();
547 let best_block = state.best_block();
548 (
549 best_block.header().clone(),
550 best_block
551 .block_details()
552 .expect("Always present for the best block; qed")
553 .clone(),
554 )
555 }
556
557 fn ancestor_header(
558 &self,
559 ancestor_block_number: BlockNumber,
560 descendant_block_root: &BlockRoot,
561 ) -> Option<Block::Header> {
562 let state = self.inner.state.read_blocking();
565 let best_number = state.best_tip().number;
566
567 let ancestor_block_offset =
568 best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
569 let ancestor_block_candidates = state.blocks.get(ancestor_block_offset)?;
570
571 let descendant_block_number = *state.block_roots.get(descendant_block_root)?;
572 if ancestor_block_number >= descendant_block_number {
573 return None;
574 }
575 let descendant_block_offset =
576 best_number.checked_sub(descendant_block_number)?.as_u64() as usize;
577
578 let mut blocks_range_iter = state
580 .blocks
581 .iter()
582 .enumerate()
583 .skip(descendant_block_offset);
584
585 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
586 let descendant_header = descendant_block_candidates
587 .iter()
588 .find(|block| &*block.header().header().root() == descendant_block_root)?
589 .header()
590 .header();
591
592 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
597 return ancestor_block_candidates
598 .iter()
599 .next()
600 .map(|block| block.header().clone());
601 }
602
603 let mut parent_block_root = &descendant_header.prefix.parent_root;
604
605 for (block_offset, parent_candidates) in blocks_range_iter {
607 let parent_header = parent_candidates
608 .iter()
609 .find(|header| &*header.header().header().root() == parent_block_root)?
610 .header();
611
612 if block_offset == ancestor_block_offset {
614 return Some(parent_header.clone());
615 }
616
617 parent_block_root = &parent_header.header().prefix.parent_root;
618 }
619
620 None
621 }
622
623 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
624 let state = self.inner.state.read_blocking();
627 let best_number = state.best_tip().number;
628
629 let block_number = *state.block_roots.get(block_root)?;
630 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
631 let block_candidates = state.blocks.get(block_offset)?;
632
633 block_candidates.iter().find_map(|block| {
634 let header = block.header();
635
636 if &*header.header().root() == block_root {
637 Some(header.clone())
638 } else {
639 None
640 }
641 })
642 }
643
644 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
645 let state = self.inner.state.read_blocking();
648 let best_number = state.best_tip().number;
649
650 let block_number = *state.block_roots.get(block_root)?;
651 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
652 let block_candidates = state.blocks.get(block_offset)?;
653
654 block_candidates.iter().find_map(|block| {
655 let header = block.header();
656 let block_details = block.block_details().cloned()?;
657
658 if &*header.header().root() == block_root {
659 Some((header.clone(), block_details))
660 } else {
661 None
662 }
663 })
664 }
665}
666
667impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
668where
669 Block: GenericOwnedBlock,
670 StorageBackend: ClientDatabaseStorageBackend,
671{
672 async fn persist_block(
673 &self,
674 block: Block,
675 block_details: BlockDetails,
676 ) -> Result<(), PersistBlockError> {
677 let mut state = self.inner.state.write().await;
678 let best_number = state.best_tip().number;
679
680 let header = block.header().header();
681
682 let block_number = header.prefix.number;
683
684 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
685 Self::insert_first_block(&mut state, block, block_details);
687
688 return Ok(());
689 }
690
691 if block_number == best_number + BlockNumber::ONE {
692 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
693 }
694
695 let block_offset = best_number
696 .checked_sub(block_number)
697 .ok_or(PersistBlockError::MissingParent)?
698 .as_u64() as usize;
699
700 if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
701 return Err(PersistBlockError::OutsideAcceptableRange);
702 }
703
704 let state = &mut *state;
705
706 let block_forks = state.blocks.get_mut(block_offset).ok_or_else(|| {
707 error!(
708 %block_number,
709 %block_offset,
710 "Failed to store block fork, header offset is missing despite being within \
711 acceptable range"
712 );
713
714 PersistBlockError::OutsideAcceptableRange
715 })?;
716
717 for (index, fork_tip) in state.fork_tips.iter_mut().enumerate() {
718 if fork_tip.root == header.prefix.parent_root {
720 state.fork_tips.remove(index);
721 break;
722 }
723 }
724
725 let block_root = *header.root();
726 state.fork_tips.insert(
729 1,
730 ForkTip {
731 number: block_number,
732 root: block_root,
733 },
734 );
735 state.block_roots.insert(block_root, block_number);
736 block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
737 block,
738 block_details,
739 }));
740
741 Self::prune_outdated_fork_tips(block_number, state, &self.inner.options);
742
743 Ok(())
744 }
745}
746
747impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
748where
749 Block: GenericOwnedBlock,
750 StorageBackend: ClientDatabaseStorageBackend,
751{
752 pub async fn open<GBB>(
756 options: ClientDatabaseOptions<GBB, StorageBackend>,
757 ) -> Result<Self, ClientDatabaseError>
758 where
759 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
760 {
761 let ClientDatabaseOptions {
762 write_buffer_size,
763 confirmation_depth_k,
764 soft_confirmation_depth,
765 max_fork_tips,
766 max_fork_tip_distance,
767 genesis_block_builder,
768 storage_backend,
769 } = options;
770 if soft_confirmation_depth >= confirmation_depth_k {
771 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
772 }
773
774 if max_fork_tip_distance > confirmation_depth_k {
775 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
776 }
777
778 let mut state = State {
779 fork_tips: VecDeque::new(),
780 block_roots: HashMap::default(),
781 blocks: VecDeque::new(),
782 };
783
784 let options = ClientDatabaseInnerOptions {
785 confirmation_depth_k,
786 soft_confirmation_depth,
787 max_fork_tips,
788 max_fork_tip_distance,
789 };
790
791 let storage_item_handlers = StorageItemHandlers {
792 permanent: |_arg| {
793 Ok(())
795 },
796 block: |arg| {
797 let StorageItemHandlerArg {
798 storage_item,
799 page_offset,
800 } = arg;
801 #[expect(
802 clippy::infallible_destructuring_match,
803 reason = "Only a single variant for now"
804 )]
805 let storage_item_block = match storage_item {
806 StorageItemBlock::Block(storage_item_block) => storage_item_block,
807 };
808
809 let StorageItemBlockBlock {
812 header,
813 body: _,
814 mmr_with_block,
815 system_contract_states,
816 } = storage_item_block;
817
818 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
819 error!(%page_offset, "Failed to decode block header from bytes");
820
821 ClientDatabaseError::InvalidBlock { page_offset }
822 })?;
823
824 let block_root = *header.header().root();
825 let block_number = header.header().prefix.number;
826
827 state.block_roots.insert(block_root, block_number);
828
829 let maybe_best_number = state
830 .blocks
831 .front()
832 .and_then(|block_forks| block_forks.first())
833 .map(|best_block| {
834 let header: &Block::Header = best_block.header();
836
837 header.header().prefix.number
838 });
839
840 let block_offset = if let Some(best_number) = maybe_best_number {
841 if block_number <= best_number {
842 (best_number - block_number).as_u64() as usize
843 } else {
844 if block_number - best_number != BlockNumber::ONE {
846 error!(
847 %page_offset,
848 %best_number,
849 %block_number,
850 "Invalid new best block number, it must be only one block \
851 higher than the best block"
852 );
853
854 return Err(ClientDatabaseError::InvalidBlock { page_offset });
855 }
856
857 state.blocks.push_front(SmallVec::new());
858 0
860 }
861 } else {
862 state.blocks.push_front(SmallVec::new());
863 0
865 };
866
867 let block_forks = match state.blocks.get_mut(block_offset) {
868 Some(block_forks) => block_forks,
869 None => {
870 return Ok(());
874 }
875 };
876
877 block_forks.push(ClientDatabaseBlock::Persisted {
879 header,
880 block_details: BlockDetails {
881 mmr_with_block,
882 system_contract_states,
883 },
884 write_location: WriteLocation { page_offset },
885 });
886
887 if block_offset == 0 && block_forks.len() == 1 {
890 Self::confirm_canonical_block(block_number, &mut state, &options);
891 }
892
893 Ok(())
894 },
895 };
896
897 let storage_backend_adapter =
898 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, &storage_backend)
899 .await?;
900
901 if let Some(best_block) = state.blocks.front().and_then(|block_forks| {
902 block_forks.last()
905 }) {
906 let header: &Block::Header = best_block.header();
908 let header = header.header();
909 let block_number = header.prefix.number;
910 let block_root = *header.root();
911
912 if !Self::adjust_ancestor_block_forks(&mut state.blocks, block_root) {
913 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
914 }
915
916 state.fork_tips.push_front(ForkTip {
918 number: block_number,
919 root: block_root,
920 });
921 } else {
922 let GenesisBlockBuilderResult {
923 block,
924 system_contract_states,
925 } = genesis_block_builder();
926
927 let header = block.header().header();
929 let block_number = header.prefix.number;
930 let block_root = *header.root();
931
932 state.fork_tips.push_front(ForkTip {
933 number: block_number,
934 root: block_root,
935 });
936 state.block_roots.insert(block_root, block_number);
937 state
938 .blocks
939 .push_front(smallvec![ClientDatabaseBlock::InMemory(
940 ClientDatabaseBlockInMemory {
941 block,
942 block_details: BlockDetails {
943 system_contract_states,
944 mmr_with_block: Arc::new({
945 let mut mmr = BlockMerkleMountainRange::new();
946 mmr.add_leaf(&block_root);
947 mmr
948 })
949 },
950 }
951 )]);
952 }
953
954 let inner = Inner {
955 state: AsyncRwLock::new(state),
956 storage_backend_adapter: AsyncMutex::new(storage_backend_adapter),
957 storage_backend,
958 options,
959 };
960
961 Ok(Self {
962 inner: Arc::new(inner),
963 })
964 }
965
966 pub async fn format(
968 storage_backend: &StorageBackend,
969 options: ClientDatabaseFormatOptions,
970 ) -> Result<(), ClientDatabaseFormatError> {
971 StorageBackendAdapter::format(storage_backend, options).await
972 }
973
974 fn insert_first_block(state: &mut State<Block>, block: Block, block_details: BlockDetails) {
975 let header = block.header().header();
977 let block_number = header.prefix.number;
978 let block_root = *header.root();
979
980 state.fork_tips.clear();
981 state.fork_tips.push_front(ForkTip {
982 number: block_number,
983 root: block_root,
984 });
985 state.block_roots.clear();
986 state.block_roots.insert(block_root, block_number);
987 state.blocks.clear();
988 state
989 .blocks
990 .push_front(smallvec![ClientDatabaseBlock::InMemory(
991 ClientDatabaseBlockInMemory {
992 block,
993 block_details,
994 }
995 )]);
996 }
997
998 async fn insert_new_best_block(
999 mut state: AsyncRwLockWriteGuard<'_, State<Block>>,
1000 inner: &Inner<Block, StorageBackend>,
1001 block: Block,
1002 block_details: BlockDetails,
1003 ) -> Result<(), PersistBlockError> {
1004 let header = block.header().header();
1005 let block_number = header.prefix.number;
1006 let block_root = *header.root();
1007 let parent_root = header.prefix.parent_root;
1008
1009 if !Self::adjust_ancestor_block_forks(&mut state.blocks, parent_root) {
1012 return Err(PersistBlockError::MissingParent);
1013 }
1014
1015 {
1017 for (index, fork_tip) in state.fork_tips.iter_mut().enumerate() {
1018 if fork_tip.root == parent_root {
1020 state.fork_tips.remove(index);
1021 break;
1022 }
1023 }
1024
1025 state.fork_tips.push_front(ForkTip {
1026 number: block_number,
1027 root: block_root,
1028 });
1029 state.block_roots.insert(block_root, block_number);
1030 state
1031 .blocks
1032 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1033 ClientDatabaseBlockInMemory {
1034 block,
1035 block_details: block_details.clone()
1036 }
1037 )]);
1038 }
1039
1040 let options = &inner.options;
1041
1042 Self::confirm_canonical_block(block_number, &mut state, options);
1043 Self::prune_outdated_fork_tips(block_number, &mut state, options);
1044
1045 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1050
1051 let mut blocks_to_persist = Vec::with_capacity(
1052 options
1053 .confirmation_depth_k
1054 .saturating_sub(options.soft_confirmation_depth)
1055 .as_u64() as usize,
1056 );
1057 for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1058 let Some(fork_blocks) = state.blocks.get(block_offset) else {
1059 break;
1060 };
1061
1062 let len_before = blocks_to_persist.len();
1063 fork_blocks
1064 .iter()
1065 .enumerate()
1066 .filter_map(|(fork_offset, client_database_block)| {
1067 match client_database_block {
1068 ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1069 block_offset,
1070 fork_offset,
1071 block,
1072 }),
1073 ClientDatabaseBlock::Persisted { .. }
1074 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1075 None
1077 }
1078 }
1079 })
1080 .collect_into(&mut blocks_to_persist);
1081
1082 if blocks_to_persist.len() == len_before {
1083 break;
1084 }
1085 }
1086
1087 let mut storage_backend_adapter = inner.storage_backend_adapter.lock().await;
1088
1089 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1091 for block_to_persist in blocks_to_persist.into_iter().rev() {
1092 let BlockToPersist {
1093 block_offset,
1094 fork_offset,
1095 block,
1096 } = block_to_persist;
1097
1098 let write_location = storage_backend_adapter
1099 .write_storage_item(
1100 &inner.storage_backend,
1101 StorageItemBlock::Block(StorageItemBlockBlock {
1102 header: block.block.header().buffer().clone(),
1103 body: block.block.body().buffer().clone(),
1104 mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1105 system_contract_states: StdArc::clone(
1106 &block.block_details.system_contract_states,
1107 ),
1108 }),
1109 )
1110 .await?;
1111
1112 persisted_blocks.push(PersistedBlock {
1113 block_offset,
1114 fork_offset,
1115 write_location,
1116 });
1117 }
1118
1119 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1121 for persisted_block in persisted_blocks {
1122 let PersistedBlock {
1123 block_offset,
1124 fork_offset,
1125 write_location,
1126 } = persisted_block;
1127
1128 let block = state
1129 .blocks
1130 .get_mut(block_offset)
1131 .expect("Still holding the same lock since last check; qed")
1132 .get_mut(fork_offset)
1133 .expect("Still holding the same lock since last check; qed");
1134
1135 replace_with_or_abort(block, |block| {
1136 if let ClientDatabaseBlock::InMemory(in_memory) = block {
1137 let (header, _body) = in_memory.block.split();
1138
1139 ClientDatabaseBlock::Persisted {
1140 header,
1141 block_details: in_memory.block_details,
1142 write_location,
1143 }
1144 } else {
1145 unreachable!("Still holding the same lock since last check; qed");
1146 }
1147 });
1148 }
1149
1150 Ok(())
1154 }
1155
1156 #[must_use]
1161 fn adjust_ancestor_block_forks(
1162 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1163 mut parent_block_root: BlockRoot,
1164 ) -> bool {
1165 let mut ancestor_blocks = blocks.iter_mut();
1166
1167 loop {
1168 if ancestor_blocks.len() == 1 {
1169 break;
1171 }
1172
1173 let Some(parent_blocks) = ancestor_blocks.next() else {
1174 break;
1176 };
1177
1178 let Some(fork_offset_parent_block_root) =
1179 parent_blocks
1180 .iter()
1181 .enumerate()
1182 .find_map(|(fork_offset, fork_block)| {
1183 let fork_header = fork_block.header().header();
1184 if *fork_header.root() == parent_block_root {
1185 Some((fork_offset, fork_header.prefix.parent_root))
1186 } else {
1187 None
1188 }
1189 })
1190 else {
1191 return false;
1192 };
1193
1194 let fork_offset;
1195 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1196
1197 parent_blocks.swap(0, fork_offset);
1198 }
1199
1200 true
1201 }
1202
1203 fn prune_outdated_fork_tips(
1210 best_number: BlockNumber,
1211 state: &mut State<Block>,
1212 options: &ClientDatabaseInnerOptions,
1213 ) {
1214 let state = &mut *state;
1215
1216 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1219
1220 state.fork_tips.retain(|fork_tip| {
1222 if best_number - fork_tip.number > options.max_fork_tip_distance {
1223 candidate_forks_to_remove.push(*fork_tip);
1224 false
1225 } else {
1226 true
1227 }
1228 });
1229 if state.fork_tips.len() > options.max_fork_tips.get() {
1231 state
1232 .fork_tips
1233 .drain(options.max_fork_tips.get()..)
1234 .collect_into(&mut candidate_forks_to_remove);
1235 }
1236
1237 candidate_forks_to_remove
1239 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1240 state.fork_tips.extend(candidate_forks_to_remove);
1242 }
1243
1244 #[must_use]
1247 fn prune_outdated_fork(
1248 best_number: BlockNumber,
1249 fork_tip: &ForkTip,
1250 state: &mut State<Block>,
1251 ) -> bool {
1252 let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1253
1254 let mut block_root_to_prune = fork_tip.root;
1256 let mut pruned_tip = false;
1257 for block_offset in block_offset.. {
1258 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1259 if !pruned_tip {
1260 error!(
1261 %best_number,
1262 ?fork_tip,
1263 block_offset,
1264 "Block offset was not present in the database, this is an implementation \
1265 bug #1"
1266 );
1267 }
1268 break;
1270 };
1271
1272 if fork_blocks.len() == 1 {
1273 if !pruned_tip {
1274 error!(
1275 %best_number,
1276 ?fork_tip,
1277 block_offset,
1278 "Block offset was not present in the database, this is an implementation \
1279 bug #2"
1280 );
1281 }
1282
1283 break;
1285 }
1286
1287 let Some((fork_offset, block)) = fork_blocks
1288 .iter()
1289 .enumerate()
1290 .skip(1)
1292 .find(|(_fork_offset, block)| {
1293 *block.header().header().root() == block_root_to_prune
1294 })
1295 else {
1296 if !pruned_tip {
1297 error!(
1298 %best_number,
1299 ?fork_tip,
1300 block_offset,
1301 "Block offset was not present in the database, this is an implementation \
1302 bug #3"
1303 );
1304 }
1305
1306 break;
1308 };
1309
1310 if block.header().ref_count() > 1 {
1312 break;
1313 }
1314
1315 match block {
1317 ClientDatabaseBlock::InMemory(_) => {
1318 }
1320 ClientDatabaseBlock::Persisted { .. }
1321 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1322 pruned_tip = true;
1324 break;
1325 }
1326 }
1327
1328 state.block_roots.get_mut(&block_root_to_prune);
1329 block_root_to_prune = block.header().header().prefix.parent_root;
1330 fork_blocks.swap_remove(fork_offset);
1331
1332 pruned_tip = true;
1333 }
1334
1335 pruned_tip
1336 }
1337
1338 fn confirm_canonical_block(
1341 best_number: BlockNumber,
1342 state: &mut State<Block>,
1343 options: &ClientDatabaseInnerOptions,
1344 ) {
1345 let Some(block_offset) =
1349 best_number.checked_sub(options.confirmation_depth_k + BlockNumber::ONE)
1350 else {
1351 return;
1353 };
1354 let block_offset = block_offset.as_u64() as usize;
1355
1356 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1357 error!(
1358 %best_number,
1359 block_offset,
1360 "Have not found fork blocks to confirm, this is an implementation bug"
1361 );
1362 return;
1363 };
1364
1365 {
1367 let Some(canonical_block) = fork_blocks.first_mut() else {
1368 error!(
1369 %best_number,
1370 block_offset,
1371 "Have not found a canonical block to confirm, this is an implementation bug"
1372 );
1373 return;
1374 };
1375
1376 replace_with_or_abort(canonical_block, |block| match block {
1377 ClientDatabaseBlock::InMemory(_) => {
1378 error!(
1379 %best_number,
1380 block_offset,
1381 header = ?block.header(),
1382 "Block to be confirmed must not be in memory, this is an implementation bug"
1383 );
1384 block
1385 }
1386 ClientDatabaseBlock::Persisted {
1387 header,
1388 block_details: _,
1389 write_location,
1390 } => ClientDatabaseBlock::PersistedConfirmed {
1391 header,
1392 write_location,
1393 },
1394 ClientDatabaseBlock::PersistedConfirmed { .. } => {
1395 error!(
1396 %best_number,
1397 block_offset,
1398 header = ?block.header(),
1399 "Block to be confirmed must not be confirmed yet, this is an \
1400 implementation bug"
1401 );
1402 block
1403 }
1404 });
1405 }
1406
1407 let mut block_roots_to_prune = fork_blocks
1409 .drain(1..)
1410 .map(|block| *block.header().header().root())
1411 .collect::<Vec<_>>();
1412 let mut current_block_offset = block_offset;
1413 while !block_roots_to_prune.is_empty() {
1414 state
1416 .fork_tips
1417 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1418
1419 for block_root in &block_roots_to_prune {
1421 state.block_roots.remove(block_root);
1422 }
1423
1424 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1426 current_block_offset = next_block_offset;
1427 } else {
1428 break;
1430 }
1431
1432 let fork_blocks = state
1433 .blocks
1434 .get_mut(current_block_offset)
1435 .expect("Lower block offset always exists; qed");
1436
1437 block_roots_to_prune = fork_blocks
1439 .drain_filter(|block| {
1440 let header = block.header().header();
1441
1442 block_roots_to_prune.contains(&header.prefix.parent_root)
1443 })
1444 .map(|block| *block.header().header().root())
1445 .collect();
1446 }
1447 }
1448}