1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 default_field_values,
50 get_mut_unchecked,
51 iter_collect_into,
52 maybe_uninit_as_bytes,
53 maybe_uninit_fill,
54 try_blocks
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::storage_backend::ClientDatabaseStorageBackend;
65use crate::storage_backend_adapter::{
66 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
67};
68use ab_client_api::{
69 BlockDetails, BlockMerkleMountainRange, ChainInfo, ChainInfoWrite, ContractSlotState,
70 PersistBlockError, PersistSegmentHeadersError, ReadBlockError,
71};
72use ab_core_primitives::block::body::owned::GenericOwnedBlockBody;
73use ab_core_primitives::block::header::GenericBlockHeader;
74use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
75use ab_core_primitives::block::owned::GenericOwnedBlock;
76use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
77use ab_core_primitives::segments::{LocalSegmentIndex, SegmentHeader};
78use ab_core_primitives::shard::RealShardKind;
79use ab_io_type::trivial_type::TrivialType;
80use async_lock::{
81 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
82};
83use rand::rngs::SysError;
84use rclite::Arc;
85use replace_with::replace_with_or_abort;
86use smallvec::{SmallVec, smallvec};
87use std::collections::{HashMap, VecDeque};
88use std::hash::{BuildHasherDefault, Hasher};
89use std::num::{NonZeroU32, NonZeroUsize};
90use std::ops::Deref;
91use std::sync::Arc as StdArc;
92use std::{fmt, io};
93use tracing::error;
94
/// Identifier of a database, stored as 32 raw bytes.
///
/// The value is opaque to this type: it only wraps the bytes and exposes them through
/// `Deref`/`AsRef`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
99
100impl Deref for DatabaseId {
101 type Target = [u8; 32];
102
103 #[inline(always)]
104 fn deref(&self) -> &Self::Target {
105 &self.0
106 }
107}
108
109impl AsRef<[u8]> for DatabaseId {
110 #[inline(always)]
111 fn as_ref(&self) -> &[u8] {
112 &self.0
113 }
114}
115
116impl DatabaseId {
117 #[inline(always)]
118 pub const fn new(bytes: [u8; 32]) -> Self {
119 Self(bytes)
120 }
121}
122
/// Hasher used for the map keyed by block roots.
///
/// It does not mix its input: the hash is simply the first eight bytes of the most recent
/// sufficiently long `write()` call, interpreted as a little-endian `u64`.
///
/// NOTE(review): this presumably relies on block roots already being uniformly distributed;
/// confirm before reusing it for other key types.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    /// Returns the state captured by the last effective `write()` (zero if there was none).
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Replaces the state with the first eight bytes of `bytes`.
    ///
    /// Inputs shorter than eight bytes are ignored and leave the state untouched.
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
141
/// Output of the genesis block builder supplied through
/// [`ClientDatabaseOptions::genesis_block_builder`].
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// System contract states that are stored in the block details of the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
149
/// Options for [`ClientDatabase::open()`].
///
/// Fields with a default value may be omitted when constructing the options.
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size, forwarded as is to the storage backend adapter when opening.
    ///
    /// NOTE(review): the unit is not visible in this file — confirm against the adapter.
    pub write_buffer_size: usize = 5,
    /// Confirmation depth `k`.
    ///
    /// It bounds the other depth-related options, limits how far below the best block a fork
    /// block may be persisted and is used when deciding which segment headers belong to a block.
    pub confirmation_depth_k: BlockNumber,
    /// Soft confirmation depth, must be smaller than [`Self::confirmation_depth_k`].
    ///
    /// In-memory blocks at this distance from the best block (or deeper) are the ones selected
    /// for writing to the storage backend when a new best block is inserted.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::new(3),
    /// Maximum number of fork tips.
    ///
    /// NOTE(review): consumed by fork tip pruning, which is not visible here — confirm the
    /// exact semantics there.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Maximum fork tip distance, must be smaller than or equal to
    /// [`Self::confirmation_depth_k`].
    ///
    /// NOTE(review): consumed by fork tip pruning, which is not visible here — confirm the
    /// exact semantics there.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::new(5),
    /// Builder of the genesis block, only called when no blocks were found in the storage
    pub genesis_block_builder: GBB,
    /// Storage backend the database is stored in
    pub storage_backend: StorageBackend,
}
209
/// Options for [`ClientDatabase::format()`].
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// Page group size.
    ///
    /// NOTE(review): apparently measured in pages —
    /// [`ClientDatabaseError::PageGroupSizeTooSmall`] requires at least two pages; confirm.
    pub page_group_size: NonZeroU32,
    /// NOTE(review): presumably allows formatting a storage backend that already contains a
    /// formatted database (see [`ClientDatabaseFormatError::AlreadyFormatted`]) — confirm
    /// against the adapter implementation.
    pub force: bool,
}
235
/// Error type returned by [`ClientDatabase::open()`].
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Soft confirmation depth is not smaller than confirmation depth `k`
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Max fork tip distance is larger than confirmation depth `k`
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled a read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend failed to read
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Database was written with a version that is not supported
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version that was found
        database_version: u8,
    },
    /// Page group size is smaller than two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size that was found
        page_group_size: u32,
    },
    /// Sequence number found at a page offset differs from the expected one
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number that was found
        actual: u64,
        /// Sequence number that was expected
        expected: u64,
        /// Page offset at which the sequence number was found
        page_offset: u32,
    },
    /// Storage item found at a page offset was not expected there
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// Debug representation of the offending storage item
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset at which the storage item was found
        page_offset: u32,
    },
    /// Block stored at a page offset could not be decoded or does not follow the best block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset at which the block was found
        page_offset: u32,
    },
    /// Segment headers stored at a page offset could not be added to the segment headers cache
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset at which the segment headers were found
        page_offset: u32,
    },
    /// Ancestors of the best block found in the storage could not be arranged into a chain
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database was not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// The first page group is not a permanent one
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
308
309#[derive(Debug, thiserror::Error)]
311pub enum ClientDatabaseFormatError {
312 #[error("Storage backend has canceled read request")]
314 ReadRequestCancelled,
315 #[error("Storage backend read error: {error}")]
317 ReadError {
318 error: io::Error,
320 },
321 #[error("Failed to generate database id")]
323 FailedToGenerateDatabaseId {
324 #[from]
326 error: SysError,
327 },
328 #[error("Database is already formatted yet")]
330 AlreadyFormatted,
331 #[error("Storage backend has canceled a writing request")]
333 WriteRequestCancelled,
334 #[error("Storage item write error")]
336 StorageItemWriteError {
337 #[from]
339 error: io::Error,
340 },
341}
342
/// The last block of a fork, identified by its number and root
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Number of the tip block
    number: BlockNumber,
    /// Root of the tip block
    root: BlockRoot,
}
348
/// A block that is fully held in memory together with its details
#[derive(Debug)]
struct ClientDatabaseBlockInMemory<Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block (header and body)
    block: Block,
    /// Details that accompany the block
    block_details: BlockDetails,
}
357
/// Borrowed view that tells how the full contents of a block can be obtained
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is available in memory
    InMemory(&'a Block),
    /// Only the header is in memory, the rest has to be read from the storage
    Persisted {
        /// Block header kept in memory
        header: &'a Block::Header,
        /// Location of the block's storage item
        write_location: WriteLocation,
    },
}
368
/// A block tracked by the database in one of its storage states
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block that is only stored in memory
    InMemory(ClientDatabaseBlockInMemory<Block>),
    /// Block that was written to the storage; header and block details remain in memory
    Persisted {
        /// Block header
        header: Block::Header,
        /// Details that accompany the block
        block_details: BlockDetails,
        /// Location of the block's storage item
        write_location: WriteLocation,
    },
    /// Block that was written to the storage and for which only the header remains in memory
    PersistedConfirmed {
        /// Block header
        header: Block::Header,
        /// Location of the block's storage item
        write_location: WriteLocation,
    },
}
395
396impl<Block> ClientDatabaseBlock<Block>
397where
398 Block: GenericOwnedBlock,
399{
400 #[inline(always)]
401 fn header(&self) -> &Block::Header {
402 match self {
403 Self::InMemory(in_memory) => in_memory.block.header(),
404 Self::Persisted { header, .. } => header,
405 Self::PersistedConfirmed { header, .. } => header,
406 }
407 }
408
409 #[inline(always)]
410 fn full_block(&self) -> FullBlock<'_, Block> {
411 match self {
412 Self::InMemory(in_memory) => FullBlock::InMemory(&in_memory.block),
413 Self::Persisted {
414 header,
415 write_location,
416 ..
417 }
418 | Self::PersistedConfirmed {
419 header,
420 write_location,
421 } => FullBlock::Persisted {
422 header,
423 write_location: *write_location,
424 },
425 }
426 }
427
428 #[inline(always)]
429 fn block_details(&self) -> Option<&BlockDetails> {
430 match self {
431 Self::InMemory(in_memory) => Some(&in_memory.block_details),
432 Self::Persisted { block_details, .. } => Some(block_details),
433 Self::PersistedConfirmed { .. } => None,
434 }
435 }
436}
437
/// In-memory index of the blocks tracked by the database
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of the tracked forks.
    ///
    /// The front entry is the tip of the best chain; tips of other forks are inserted after it.
    fork_tips: VecDeque<ForkTip>,
    /// Maps the root of a tracked block to its block number
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// Tracked blocks grouped by block number.
    ///
    /// The front entry corresponds to the number of the best block, the entry at index `i` to the
    /// block number that is `i` lower. Every entry lists the forks known at that number; the
    /// first element of the front entry is treated as the best block.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
469
/// In-memory cache of segment headers
#[derive(Debug)]
struct SegmentHeadersCache {
    /// Segment headers ordered by local segment index, starting with index zero, such that the
    /// position in the vector equals the local segment index
    segment_headers_cache: Vec<SegmentHeader>,
}
474
475impl SegmentHeadersCache {
476 #[inline(always)]
477 fn last_segment_header(&self) -> Option<SegmentHeader> {
478 self.segment_headers_cache.last().cloned()
479 }
480
481 #[inline(always)]
482 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
483 self.segment_headers_cache
484 .last()
485 .map(|segment_header| segment_header.segment_index.as_inner())
486 }
487
488 #[inline(always)]
489 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
490 self.segment_headers_cache
491 .get(u64::from(local_segment_index) as usize)
492 .copied()
493 }
494
495 fn add_segment_headers(
497 &mut self,
498 mut segment_headers: Vec<SegmentHeader>,
499 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
500 self.segment_headers_cache.reserve(segment_headers.len());
501
502 let mut maybe_last_local_segment_index = self.max_local_segment_index();
503
504 if let Some(last_segment_index) = maybe_last_local_segment_index {
505 segment_headers.retain(|segment_header| {
507 segment_header.segment_index.as_inner() > last_segment_index
508 });
509 }
510
511 for segment_header in segment_headers.iter().copied() {
514 let local_segment_index = segment_header.local_segment_index();
515 match maybe_last_local_segment_index {
516 Some(last_local_segment_index) => {
517 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
518 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
519 local_segment_index,
520 last_local_segment_index,
521 });
522 }
523
524 self.segment_headers_cache.push(segment_header);
525 maybe_last_local_segment_index.replace(local_segment_index);
526 }
527 None => {
528 if local_segment_index != LocalSegmentIndex::ZERO {
529 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
530 local_segment_index,
531 });
532 }
533
534 self.segment_headers_cache.push(segment_header);
535 maybe_last_local_segment_index.replace(local_segment_index);
536 }
537 }
538 }
539
540 Ok(segment_headers)
541 }
542}
543
/// Complete state of the database that is guarded by the outer lock in [`Inner`]
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Index of the tracked blocks and fork tips
    data: StateData<Block>,
    /// Cache of segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Adapter used for reading from and writing to the storage backend.
    ///
    /// It has its own lock, so it can be locked for writing while the surrounding state is only
    /// held through a read or upgradable read guard.
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
554
555impl<Block, StorageBackend> State<Block, StorageBackend>
556where
557 Block: GenericOwnedBlock,
558{
559 #[inline(always)]
560 fn best_tip(&self) -> &ForkTip {
561 self.data
562 .fork_tips
563 .front()
564 .expect("The best block is always present; qed")
565 }
566
567 #[inline(always)]
568 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
569 self.data
570 .blocks
571 .front()
572 .expect("The best block is always present; qed")
573 .first()
574 .expect("The best block is always present; qed")
575 }
576}
577
/// An in-memory block selected for writing to the storage backend
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Index of the block's number in [`StateData::blocks`]
    block_offset: usize,
    /// Index of the block among the forks at that block number
    fork_offset: usize,
    /// The block to write
    block: &'a ClientDatabaseBlockInMemory<Block>,
}
587
/// Result of writing a [`BlockToPersist`] to the storage backend
#[derive(Debug)]
struct PersistedBlock {
    /// Index of the block's number in [`StateData::blocks`]
    block_offset: usize,
    /// Index of the block among the forks at that block number
    fork_offset: usize,
    /// Location the block's storage item was written to
    write_location: WriteLocation,
}
594
/// The subset of [`ClientDatabaseOptions`] that is retained after the database was opened
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// See [`ClientDatabaseOptions::confirmation_depth_k`]
    confirmation_depth_k: BlockNumber,
    /// See [`ClientDatabaseOptions::soft_confirmation_depth`]
    soft_confirmation_depth: BlockNumber,
    /// See [`ClientDatabaseOptions::max_fork_tips`]
    max_fork_tips: NonZeroUsize,
    /// See [`ClientDatabaseOptions::max_fork_tip_distance`]
    max_fork_tip_distance: BlockNumber,
}
602
/// Data shared between all clones of a [`ClientDatabase`]
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Mutable state of the database behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Immutable options the database was opened with
    options: ClientDatabaseInnerOptions,
}
611
/// Client database.
///
/// This is a handle to reference-counted shared state: cloning it yields another handle to the
/// same database.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared state and options
    inner: Arc<Inner<Block, StorageBackend>>,
}
620
621impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
622where
623 Block: GenericOwnedBlock,
624{
625 fn clone(&self) -> Self {
626 Self {
627 inner: self.inner.clone(),
628 }
629 }
630}
631
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Currently a no-op: nothing is written to the storage when a handle is dropped.
    ///
    /// NOTE(review): looks like a placeholder for flushing not yet persisted data on shutdown —
    /// confirm the intent.
    fn drop(&mut self) {
    }
}
640
641impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
642where
643 Block: GenericOwnedBlock,
644 StorageBackend: ClientDatabaseStorageBackend,
645{
646 #[inline]
647 fn best_root(&self) -> BlockRoot {
648 self.inner.state.read_blocking().best_tip().root
651 }
652
653 #[inline]
654 fn best_header(&self) -> Block::Header {
655 self.inner
658 .state
659 .read_blocking()
660 .best_block()
661 .header()
662 .clone()
663 }
664
665 #[inline]
666 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
667 let state = self.inner.state.read_blocking();
670 let best_block = state.best_block();
671 (
672 best_block.header().clone(),
673 best_block
674 .block_details()
675 .expect("Always present for the best block; qed")
676 .clone(),
677 )
678 }
679
680 #[inline]
682 fn ancestor_header(
683 &self,
684 ancestor_block_number: BlockNumber,
685 descendant_block_root: &BlockRoot,
686 ) -> Option<Block::Header> {
687 let state = self.inner.state.read_blocking();
690 let best_number = state.best_tip().number;
691
692 let ancestor_block_offset =
693 best_number.checked_sub(ancestor_block_number)?.as_u64() as usize;
694 let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
695
696 let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
697 if ancestor_block_number > descendant_block_number {
698 return None;
699 }
700 let descendant_block_offset =
701 best_number.checked_sub(descendant_block_number)?.as_u64() as usize;
702
703 let mut blocks_range_iter = state
705 .data
706 .blocks
707 .iter()
708 .enumerate()
709 .skip(descendant_block_offset);
710
711 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
712 let descendant_header = descendant_block_candidates
713 .iter()
714 .find(|block| &*block.header().header().root() == descendant_block_root)?
715 .header()
716 .header();
717
718 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
723 return ancestor_block_candidates
724 .iter()
725 .next()
726 .map(|block| block.header().clone());
727 }
728
729 let mut parent_block_root = &descendant_header.prefix.parent_root;
730
731 for (block_offset, parent_candidates) in blocks_range_iter {
733 let parent_header = parent_candidates
734 .iter()
735 .find(|header| &*header.header().header().root() == parent_block_root)?
736 .header();
737
738 if block_offset == ancestor_block_offset {
740 return Some(parent_header.clone());
741 }
742
743 parent_block_root = &parent_header.header().prefix.parent_root;
744 }
745
746 None
747 }
748
749 #[inline]
750 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
751 let state = self.inner.state.read_blocking();
754 let best_number = state.best_tip().number;
755
756 let block_number = *state.data.block_roots.get(block_root)?;
757 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
758 let block_candidates = state.data.blocks.get(block_offset)?;
759
760 block_candidates.iter().find_map(|block| {
761 let header = block.header();
762
763 if &*header.header().root() == block_root {
764 Some(header.clone())
765 } else {
766 None
767 }
768 })
769 }
770
771 #[inline]
772 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
773 let state = self.inner.state.read_blocking();
776 let best_number = state.best_tip().number;
777
778 let block_number = *state.data.block_roots.get(block_root)?;
779 let block_offset = best_number.checked_sub(block_number)?.as_u64() as usize;
780 let block_candidates = state.data.blocks.get(block_offset)?;
781
782 block_candidates.iter().find_map(|block| {
783 let header = block.header();
784 let block_details = block.block_details().cloned()?;
785
786 if &*header.header().root() == block_root {
787 Some((header.clone(), block_details))
788 } else {
789 None
790 }
791 })
792 }
793
794 #[inline]
795 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
796 let state = self.inner.state.read().await;
797 let best_number = state.best_tip().number;
798
799 let block_number = *state
800 .data
801 .block_roots
802 .get(block_root)
803 .ok_or(ReadBlockError::UnknownBlockRoot)?;
804 let block_offset = best_number
805 .checked_sub(block_number)
806 .expect("Known block roots always have valid block offset; qed")
807 .as_u64() as usize;
808 let block_candidates = state
809 .data
810 .blocks
811 .get(block_offset)
812 .expect("Valid block offsets always have block entries; qed");
813
814 for block_candidate in block_candidates {
815 let header = block_candidate.header();
816
817 if &*header.header().root() == block_root {
818 return match block_candidate.full_block() {
819 FullBlock::InMemory(block) => Ok(block.clone()),
820 FullBlock::Persisted {
821 header,
822 write_location,
823 } => {
824 let storage_backend_adapter = state.storage_backend_adapter.read().await;
825
826 let storage_item = storage_backend_adapter
827 .read_storage_item::<StorageItemBlock>(write_location)
828 .await?;
829
830 let storage_item_block = match storage_item {
831 StorageItemBlock::Block(storage_item_block) => storage_item_block,
832 StorageItemBlock::SegmentHeaders(_) => {
833 return Err(ReadBlockError::StorageItemReadError {
834 error: io::Error::other(
835 "Unexpected storage item: SegmentHeaders",
836 ),
837 });
838 }
839 };
840
841 let StorageItemBlockBlock {
842 header: _,
843 body,
844 mmr_with_block: _,
845 system_contract_states: _,
846 } = storage_item_block;
847
848 Block::from_buffers(header.buffer().clone(), body)
849 .ok_or(ReadBlockError::FailedToDecode)
850 }
851 };
852 }
853 }
854
855 unreachable!("Known block root always has block candidate associated with it; qed")
856 }
857
858 #[inline]
859 fn last_segment_header(&self) -> Option<SegmentHeader> {
860 let state = self.inner.state.read_blocking();
863 state.segment_headers_cache.last_segment_header()
864 }
865
866 #[inline]
867 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
868 let state = self.inner.state.read_blocking();
871
872 state.segment_headers_cache.max_local_segment_index()
873 }
874
875 #[inline]
876 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
877 let state = self.inner.state.read_blocking();
880
881 state
882 .segment_headers_cache
883 .get_segment_header(segment_index)
884 }
885
886 #[inline]
887 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
888 let state = self.inner.state.read_blocking();
891
892 let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
893 else {
894 return Vec::new();
896 };
897
898 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
900 && block_number == BlockNumber::ONE
901 {
902 return vec![
905 state
906 .segment_headers_cache
907 .get_segment_header(LocalSegmentIndex::ZERO)
908 .expect("Segment headers are stored in monotonically increasing order; qed"),
909 ];
910 }
911
912 if last_local_segment_index == LocalSegmentIndex::ZERO {
913 return Vec::new();
915 }
916
917 let mut current_local_segment_index = last_local_segment_index;
918 loop {
919 let current_segment_header = state
922 .segment_headers_cache
923 .get_segment_header(current_local_segment_index)
924 .expect("Segment headers are stored in monotonically increasing order; qed");
925
926 let target_block_number = current_segment_header.last_archived_block.number()
928 + BlockNumber::ONE
929 + self.inner.options.confirmation_depth_k;
930 if target_block_number == block_number {
931 let mut headers_for_block = vec![current_segment_header];
932
933 let last_archived_block_number = current_segment_header.last_archived_block.number;
935 let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
936
937 while let Some(segment_header) = state
938 .segment_headers_cache
939 .get_segment_header(local_segment_index)
940 {
941 if segment_header.last_archived_block.number == last_archived_block_number {
942 headers_for_block.insert(0, segment_header);
943 local_segment_index -= LocalSegmentIndex::ONE;
944 } else {
945 break;
946 }
947 }
948
949 return headers_for_block;
950 }
951
952 if target_block_number > block_number {
954 if current_local_segment_index > LocalSegmentIndex::ONE {
956 current_local_segment_index -= LocalSegmentIndex::ONE
957 } else {
958 break;
959 }
960 } else {
961 return Vec::new();
963 }
964 }
965
966 Vec::new()
968 }
969}
970
971impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
972where
973 Block: GenericOwnedBlock,
974 StorageBackend: ClientDatabaseStorageBackend,
975{
976 async fn persist_block(
977 &self,
978 block: Block,
979 block_details: BlockDetails,
980 ) -> Result<(), PersistBlockError> {
981 let mut state = self.inner.state.write().await;
982 let best_number = state.best_tip().number;
983
984 let header = block.header().header();
985
986 let block_number = header.prefix.number;
987
988 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
989 Self::insert_first_block(&mut state.data, block, block_details);
991
992 return Ok(());
993 }
994
995 if block_number == best_number + BlockNumber::ONE {
996 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
997 }
998
999 let block_offset = best_number
1000 .checked_sub(block_number)
1001 .ok_or(PersistBlockError::MissingParent)?
1002 .as_u64() as usize;
1003
1004 if block_offset >= self.inner.options.confirmation_depth_k.as_u64() as usize {
1005 return Err(PersistBlockError::OutsideAcceptableRange);
1006 }
1007
1008 let state = &mut *state;
1009
1010 let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1011 error!(
1012 %block_number,
1013 %block_offset,
1014 "Failed to store block fork, header offset is missing despite being within \
1015 acceptable range"
1016 );
1017
1018 PersistBlockError::OutsideAcceptableRange
1019 })?;
1020
1021 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1022 if fork_tip.root == header.prefix.parent_root {
1024 state.data.fork_tips.remove(index);
1025 break;
1026 }
1027 }
1028
1029 let block_root = *header.root();
1030 state.data.fork_tips.insert(
1033 1,
1034 ForkTip {
1035 number: block_number,
1036 root: block_root,
1037 },
1038 );
1039 state.data.block_roots.insert(block_root, block_number);
1040 block_forks.push(ClientDatabaseBlock::InMemory(ClientDatabaseBlockInMemory {
1041 block,
1042 block_details,
1043 }));
1044
1045 Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1046
1047 Ok(())
1048 }
1049
1050 async fn persist_segment_headers(
1051 &self,
1052 segment_headers: Vec<SegmentHeader>,
1053 ) -> Result<(), PersistSegmentHeadersError> {
1054 let mut state = self.inner.state.write().await;
1055
1056 let added_segment_headers = state
1057 .segment_headers_cache
1058 .add_segment_headers(segment_headers)?;
1059
1060 if added_segment_headers.is_empty() {
1061 return Ok(());
1062 }
1063
1064 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1069
1070 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1071
1072 storage_backend_adapter
1073 .write_storage_item(StorageItemBlock::SegmentHeaders(
1074 StorageItemBlockSegmentHeaders {
1075 segment_headers: added_segment_headers,
1076 },
1077 ))
1078 .await?;
1079
1080 Ok(())
1081 }
1082}
1083
1084impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1085where
1086 Block: GenericOwnedBlock,
1087 StorageBackend: ClientDatabaseStorageBackend,
1088{
1089 pub async fn open<GBB>(
1093 options: ClientDatabaseOptions<GBB, StorageBackend>,
1094 ) -> Result<Self, ClientDatabaseError>
1095 where
1096 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1097 {
1098 let ClientDatabaseOptions {
1099 write_buffer_size,
1100 confirmation_depth_k,
1101 soft_confirmation_depth,
1102 max_fork_tips,
1103 max_fork_tip_distance,
1104 genesis_block_builder,
1105 storage_backend,
1106 } = options;
1107 if soft_confirmation_depth >= confirmation_depth_k {
1108 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1109 }
1110
1111 if max_fork_tip_distance > confirmation_depth_k {
1112 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1113 }
1114
1115 let mut state_data = StateData {
1116 fork_tips: VecDeque::new(),
1117 block_roots: HashMap::default(),
1118 blocks: VecDeque::new(),
1119 };
1120 let mut segment_headers_cache = SegmentHeadersCache {
1121 segment_headers_cache: Vec::new(),
1122 };
1123
1124 let options = ClientDatabaseInnerOptions {
1125 confirmation_depth_k,
1126 soft_confirmation_depth,
1127 max_fork_tips,
1128 max_fork_tip_distance,
1129 };
1130
1131 let storage_item_handlers = StorageItemHandlers {
1132 permanent: |_arg| {
1133 Ok(())
1135 },
1136 block: |arg| {
1137 let StorageItemHandlerArg {
1138 storage_item,
1139 page_offset,
1140 num_pages,
1141 } = arg;
1142 let storage_item_block = match storage_item {
1143 StorageItemBlock::Block(storage_item_block) => storage_item_block,
1144 StorageItemBlock::SegmentHeaders(segment_headers) => {
1145 let num_segment_headers = segment_headers.segment_headers.len();
1146 return match segment_headers_cache
1147 .add_segment_headers(segment_headers.segment_headers)
1148 {
1149 Ok(_) => Ok(()),
1150 Err(error) => {
1151 error!(
1152 %page_offset,
1153 %num_segment_headers,
1154 %error,
1155 "Failed to add segment headers from storage item"
1156 );
1157
1158 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1159 }
1160 };
1161 }
1162 };
1163
1164 let StorageItemBlockBlock {
1167 header,
1168 body: _,
1169 mmr_with_block,
1170 system_contract_states,
1171 } = storage_item_block;
1172
1173 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1174 error!(%page_offset, "Failed to decode block header from bytes");
1175
1176 ClientDatabaseError::InvalidBlock { page_offset }
1177 })?;
1178
1179 let block_root = *header.header().root();
1180 let block_number = header.header().prefix.number;
1181
1182 state_data.block_roots.insert(block_root, block_number);
1183
1184 let maybe_best_number = state_data
1185 .blocks
1186 .front()
1187 .and_then(|block_forks| block_forks.first())
1188 .map(|best_block| {
1189 let header: &Block::Header = best_block.header();
1191
1192 header.header().prefix.number
1193 });
1194
1195 let block_offset = if let Some(best_number) = maybe_best_number {
1196 if block_number <= best_number {
1197 (best_number - block_number).as_u64() as usize
1198 } else {
1199 if block_number - best_number != BlockNumber::ONE {
1201 error!(
1202 %page_offset,
1203 %best_number,
1204 %block_number,
1205 "Invalid new best block number, it must be only one block \
1206 higher than the best block"
1207 );
1208
1209 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1210 }
1211
1212 state_data.blocks.push_front(SmallVec::new());
1213 0
1215 }
1216 } else {
1217 state_data.blocks.push_front(SmallVec::new());
1218 0
1220 };
1221
1222 let block_forks = match state_data.blocks.get_mut(block_offset) {
1223 Some(block_forks) => block_forks,
1224 None => {
1225 return Ok(());
1229 }
1230 };
1231
1232 block_forks.push(ClientDatabaseBlock::Persisted {
1234 header,
1235 block_details: BlockDetails {
1236 mmr_with_block,
1237 system_contract_states,
1238 },
1239 write_location: WriteLocation {
1240 page_offset,
1241 num_pages,
1242 },
1243 });
1244
1245 if block_offset == 0 && block_forks.len() == 1 {
1248 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1249 }
1250
1251 Ok(())
1252 },
1253 };
1254
1255 let storage_backend_adapter =
1256 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1257 .await?;
1258
1259 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1260 block_forks.last()
1263 }) {
1264 let header: &Block::Header = best_block.header();
1266 let header = header.header();
1267 let block_number = header.prefix.number;
1268 let block_root = *header.root();
1269
1270 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1271 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1272 }
1273
1274 state_data.fork_tips.push_front(ForkTip {
1276 number: block_number,
1277 root: block_root,
1278 });
1279 } else {
1280 let GenesisBlockBuilderResult {
1281 block,
1282 system_contract_states,
1283 } = genesis_block_builder();
1284
1285 let header = block.header().header();
1287 let block_number = header.prefix.number;
1288 let block_root = *header.root();
1289
1290 state_data.fork_tips.push_front(ForkTip {
1291 number: block_number,
1292 root: block_root,
1293 });
1294 state_data.block_roots.insert(block_root, block_number);
1295 state_data
1296 .blocks
1297 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1298 ClientDatabaseBlockInMemory {
1299 block,
1300 block_details: BlockDetails {
1301 system_contract_states,
1302 mmr_with_block: Arc::new({
1303 let mut mmr = BlockMerkleMountainRange::new();
1304 mmr.add_leaf(&block_root);
1305 mmr
1306 })
1307 },
1308 }
1309 )]);
1310 }
1311
1312 let state = State {
1313 data: state_data,
1314 segment_headers_cache,
1315 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1316 };
1317
1318 let inner = Inner {
1319 state: AsyncRwLock::new(state),
1320 options,
1321 };
1322
1323 Ok(Self {
1324 inner: Arc::new(inner),
1325 })
1326 }
1327
/// Formats the storage backend so that a database can be opened on it afterwards.
///
/// This delegates to the storage backend adapter without any extra processing.
///
/// # Errors
/// Returns whatever error the storage backend adapter reports.
pub async fn format(
    storage_backend: &StorageBackend,
    options: ClientDatabaseFormatOptions,
) -> Result<(), ClientDatabaseFormatError> {
    StorageBackendAdapter::format(storage_backend, options).await
}
1335
1336 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1337 let header = block.header().header();
1339 let block_number = header.prefix.number;
1340 let block_root = *header.root();
1341
1342 state.fork_tips.clear();
1343 state.fork_tips.push_front(ForkTip {
1344 number: block_number,
1345 root: block_root,
1346 });
1347 state.block_roots.clear();
1348 state.block_roots.insert(block_root, block_number);
1349 state.blocks.clear();
1350 state
1351 .blocks
1352 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1353 ClientDatabaseBlockInMemory {
1354 block,
1355 block_details,
1356 }
1357 )]);
1358 }
1359
1360 async fn insert_new_best_block(
1361 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1362 inner: &Inner<Block, StorageBackend>,
1363 block: Block,
1364 block_details: BlockDetails,
1365 ) -> Result<(), PersistBlockError> {
1366 let header = block.header().header();
1367 let block_number = header.prefix.number;
1368 let block_root = *header.root();
1369 let parent_root = header.prefix.parent_root;
1370
1371 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1374 return Err(PersistBlockError::MissingParent);
1375 }
1376
1377 {
1379 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1380 if fork_tip.root == parent_root {
1382 state.data.fork_tips.remove(index);
1383 break;
1384 }
1385 }
1386
1387 state.data.fork_tips.push_front(ForkTip {
1388 number: block_number,
1389 root: block_root,
1390 });
1391 state.data.block_roots.insert(block_root, block_number);
1392 state
1393 .data
1394 .blocks
1395 .push_front(smallvec![ClientDatabaseBlock::InMemory(
1396 ClientDatabaseBlockInMemory {
1397 block,
1398 block_details: block_details.clone()
1399 }
1400 )]);
1401 }
1402
1403 let options = &inner.options;
1404
1405 Self::confirm_canonical_block(block_number, &mut state.data, options);
1406 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1407
1408 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1413
1414 let mut blocks_to_persist = Vec::new();
1415 for block_offset in options.soft_confirmation_depth.as_u64() as usize.. {
1416 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1417 break;
1418 };
1419
1420 let len_before = blocks_to_persist.len();
1421 fork_blocks
1422 .iter()
1423 .enumerate()
1424 .filter_map(|(fork_offset, client_database_block)| {
1425 match client_database_block {
1426 ClientDatabaseBlock::InMemory(block) => Some(BlockToPersist {
1427 block_offset,
1428 fork_offset,
1429 block,
1430 }),
1431 ClientDatabaseBlock::Persisted { .. }
1432 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1433 None
1435 }
1436 }
1437 })
1438 .collect_into(&mut blocks_to_persist);
1439
1440 if blocks_to_persist.len() == len_before {
1441 break;
1442 }
1443 }
1444
1445 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1447 {
1448 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1449
1450 for block_to_persist in blocks_to_persist.into_iter().rev() {
1451 let BlockToPersist {
1452 block_offset,
1453 fork_offset,
1454 block,
1455 } = block_to_persist;
1456
1457 let write_location = storage_backend_adapter
1458 .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1459 header: block.block.header().buffer().clone(),
1460 body: block.block.body().buffer().clone(),
1461 mmr_with_block: Arc::clone(&block.block_details.mmr_with_block),
1462 system_contract_states: StdArc::clone(
1463 &block.block_details.system_contract_states,
1464 ),
1465 }))
1466 .await?;
1467
1468 persisted_blocks.push(PersistedBlock {
1469 block_offset,
1470 fork_offset,
1471 write_location,
1472 });
1473 }
1474 }
1475
1476 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1478 for persisted_block in persisted_blocks {
1479 let PersistedBlock {
1480 block_offset,
1481 fork_offset,
1482 write_location,
1483 } = persisted_block;
1484
1485 let block = state
1486 .data
1487 .blocks
1488 .get_mut(block_offset)
1489 .expect("Still holding the same lock since last check; qed")
1490 .get_mut(fork_offset)
1491 .expect("Still holding the same lock since last check; qed");
1492
1493 replace_with_or_abort(block, |block| {
1494 if let ClientDatabaseBlock::InMemory(in_memory) = block {
1495 let (header, _body) = in_memory.block.split();
1496
1497 ClientDatabaseBlock::Persisted {
1498 header,
1499 block_details: in_memory.block_details,
1500 write_location,
1501 }
1502 } else {
1503 unreachable!("Still holding the same lock since last check; qed");
1504 }
1505 });
1506 }
1507
1508 Ok(())
1512 }
1513
1514 #[must_use]
1519 fn adjust_ancestor_block_forks(
1520 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1521 mut parent_block_root: BlockRoot,
1522 ) -> bool {
1523 let mut ancestor_blocks = blocks.iter_mut();
1524
1525 loop {
1526 if ancestor_blocks.len() == 1 {
1527 break;
1529 }
1530
1531 let Some(parent_blocks) = ancestor_blocks.next() else {
1532 break;
1534 };
1535
1536 let Some(fork_offset_parent_block_root) =
1537 parent_blocks
1538 .iter()
1539 .enumerate()
1540 .find_map(|(fork_offset, fork_block)| {
1541 let fork_header = fork_block.header().header();
1542 if *fork_header.root() == parent_block_root {
1543 Some((fork_offset, fork_header.prefix.parent_root))
1544 } else {
1545 None
1546 }
1547 })
1548 else {
1549 return false;
1550 };
1551
1552 let fork_offset;
1553 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1554
1555 parent_blocks.swap(0, fork_offset);
1556 }
1557
1558 true
1559 }
1560
1561 fn prune_outdated_fork_tips(
1568 best_number: BlockNumber,
1569 state: &mut StateData<Block>,
1570 options: &ClientDatabaseInnerOptions,
1571 ) {
1572 let state = &mut *state;
1573
1574 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1577
1578 state.fork_tips.retain(|fork_tip| {
1580 if best_number - fork_tip.number > options.max_fork_tip_distance {
1581 candidate_forks_to_remove.push(*fork_tip);
1582 false
1583 } else {
1584 true
1585 }
1586 });
1587 if state.fork_tips.len() > options.max_fork_tips.get() {
1589 state
1590 .fork_tips
1591 .drain(options.max_fork_tips.get()..)
1592 .collect_into(&mut candidate_forks_to_remove);
1593 }
1594
1595 candidate_forks_to_remove
1597 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1598 state.fork_tips.extend(candidate_forks_to_remove);
1600 }
1601
1602 #[must_use]
1605 fn prune_outdated_fork(
1606 best_number: BlockNumber,
1607 fork_tip: &ForkTip,
1608 state: &mut StateData<Block>,
1609 ) -> bool {
1610 let block_offset = (best_number - fork_tip.number).as_u64() as usize;
1611
1612 let mut block_root_to_prune = fork_tip.root;
1614 let mut pruned_tip = false;
1615 for block_offset in block_offset.. {
1616 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1617 if !pruned_tip {
1618 error!(
1619 %best_number,
1620 ?fork_tip,
1621 block_offset,
1622 "Block offset was not present in the database, this is an implementation \
1623 bug #1"
1624 );
1625 }
1626 break;
1628 };
1629
1630 if fork_blocks.len() == 1 {
1631 if !pruned_tip {
1632 error!(
1633 %best_number,
1634 ?fork_tip,
1635 block_offset,
1636 "Block offset was not present in the database, this is an implementation \
1637 bug #2"
1638 );
1639 }
1640
1641 break;
1643 }
1644
1645 let Some((fork_offset, block)) = fork_blocks
1646 .iter()
1647 .enumerate()
1648 .skip(1)
1650 .find(|(_fork_offset, block)| {
1651 *block.header().header().root() == block_root_to_prune
1652 })
1653 else {
1654 if !pruned_tip {
1655 error!(
1656 %best_number,
1657 ?fork_tip,
1658 block_offset,
1659 "Block offset was not present in the database, this is an implementation \
1660 bug #3"
1661 );
1662 }
1663
1664 break;
1666 };
1667
1668 if block.header().ref_count() > 1 {
1670 break;
1671 }
1672
1673 match block {
1675 ClientDatabaseBlock::InMemory(_) => {
1676 }
1678 ClientDatabaseBlock::Persisted { .. }
1679 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1680 pruned_tip = true;
1682 break;
1683 }
1684 }
1685
1686 state.block_roots.get_mut(&block_root_to_prune);
1687 block_root_to_prune = block.header().header().prefix.parent_root;
1688 fork_blocks.swap_remove(fork_offset);
1689
1690 pruned_tip = true;
1691 }
1692
1693 pruned_tip
1694 }
1695
1696 fn confirm_canonical_block(
1699 best_number: BlockNumber,
1700 state_data: &mut StateData<Block>,
1701 options: &ClientDatabaseInnerOptions,
1702 ) {
1703 let block_offset = (options.confirmation_depth_k + BlockNumber::ONE).as_u64() as usize;
1707
1708 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
1709 return;
1711 };
1712
1713 {
1715 let Some(canonical_block) = fork_blocks.first_mut() else {
1716 error!(
1717 %best_number,
1718 block_offset,
1719 "Have not found a canonical block to confirm, this is an implementation bug"
1720 );
1721 return;
1722 };
1723
1724 replace_with_or_abort(canonical_block, |block| match block {
1725 ClientDatabaseBlock::InMemory(_) => {
1726 error!(
1727 %best_number,
1728 block_offset,
1729 header = ?block.header(),
1730 "Block to be confirmed must not be in memory, this is an implementation bug"
1731 );
1732 block
1733 }
1734 ClientDatabaseBlock::Persisted {
1735 header,
1736 block_details: _,
1737 write_location,
1738 } => ClientDatabaseBlock::PersistedConfirmed {
1739 header,
1740 write_location,
1741 },
1742 ClientDatabaseBlock::PersistedConfirmed { .. } => {
1743 error!(
1744 %best_number,
1745 block_offset,
1746 header = ?block.header(),
1747 "Block to be confirmed must not be confirmed yet, this is an \
1748 implementation bug"
1749 );
1750 block
1751 }
1752 });
1753 }
1754
1755 let mut block_roots_to_prune = fork_blocks
1757 .drain(1..)
1758 .map(|block| *block.header().header().root())
1759 .collect::<Vec<_>>();
1760 let mut current_block_offset = block_offset;
1761 while !block_roots_to_prune.is_empty() {
1762 state_data
1764 .fork_tips
1765 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
1766
1767 for block_root in &block_roots_to_prune {
1769 state_data.block_roots.remove(block_root);
1770 }
1771
1772 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
1774 current_block_offset = next_block_offset;
1775 } else {
1776 break;
1778 }
1779
1780 let fork_blocks = state_data
1781 .blocks
1782 .get_mut(current_block_offset)
1783 .expect("Lower block offset always exists; qed");
1784
1785 block_roots_to_prune = fork_blocks
1787 .drain_filter(|block| {
1788 let header = block.header().header();
1789
1790 block_roots_to_prune.contains(&header.prefix.parent_root)
1791 })
1792 .map(|block| *block.header().header().root())
1793 .collect();
1794 }
1795 }
1796}