1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 const_convert,
50 const_trait_impl,
51 default_field_values,
52 get_mut_unchecked,
53 iter_collect_into,
54 maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70 BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71 ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72 PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81 LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
/// Opaque 32-byte identifier of a database.
///
/// NOTE(review): presumably generated when the database is formatted (see
/// [`ClientDatabaseFormatError::FailedToGenerateDatabaseId`]) — confirm against the formatting
/// code.
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107 type Target = [u8; 32];
108
109 #[inline(always)]
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116 #[inline(always)]
117 fn as_ref(&self) -> &[u8] {
118 &self.0
119 }
120}
121
122impl DatabaseId {
123 #[inline(always)]
124 pub const fn new(bytes: [u8; 32]) -> Self {
125 Self(bytes)
126 }
127}
128
/// Trivial hasher that uses the first 8 bytes of the written data as the hash value.
///
/// Every call to [`Hasher::write()`] with at least 8 bytes replaces the state, writes shorter
/// than 8 bytes are ignored. The state starts at zero.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    /// Returns the state captured by the most recent sufficiently long write.
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Interprets the first 8 bytes of `bytes` as a little-endian `u64` and stores it as the new
    /// state; does nothing if fewer than 8 bytes are provided.
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
147
/// Result produced by a genesis block builder.
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// The genesis block itself
    pub block: Block,
    /// Contract slot states of system contracts
    ///
    /// NOTE(review): presumably the state as of the genesis block — confirm with the builder.
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
155
/// Options for opening the client database.
///
/// Fields with a default value can be omitted when constructing the struct.
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size (defaults to `5`)
    ///
    /// NOTE(review): the unit is not visible here, presumably a number of buffered writes —
    /// confirm.
    pub write_buffer_size: usize = 5,
    /// Block confirmation depth `k`
    ///
    /// Blocks whose distance from the best block is at least this value are not accepted as new
    /// forks when persisting blocks.
    pub block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth (defaults to `3`)
    ///
    /// Per [`ClientDatabaseError::InvalidSoftConfirmationDepth`], it must be smaller than the
    /// confirmation depth `k`.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Maximum number of fork tips to track (defaults to `3`)
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Maximum distance of a fork tip (defaults to `5`)
    ///
    /// Per [`ClientDatabaseError::InvalidMaxForkTipDistance`], it must be smaller or equal to the
    /// confirmation depth `k`.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Genesis block builder
    ///
    /// NOTE(review): presumably invoked when the database has no blocks yet — confirm.
    pub genesis_block_builder: GBB,
    /// Storage backend to use for reading and writing database contents
    pub storage_backend: StorageBackend,
}
215
/// Options for formatting the client database.
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// Size of a page group
    ///
    /// NOTE(review): presumably measured in pages, see
    /// [`ClientDatabaseError::PageGroupSizeTooSmall`] — confirm.
    pub page_group_size: NonZeroU32,
    /// Force formatting
    ///
    /// NOTE(review): presumably allows formatting a database that is already formatted, see
    /// [`ClientDatabaseFormatError::AlreadyFormatted`] — confirm.
    pub force: bool,
}
241
/// Errors that can happen when working with the client database.
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Invalid soft confirmation depth, it must be smaller than confirmation depth k
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Invalid max fork tip distance, it must be smaller or equal to confirmation depth k
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version that was found
        database_version: u8,
    },
    /// Page group size is too small, must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size that was found
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number that was found
        actual: u64,
        /// Sequence number that was expected
        expected: u64,
        /// Page offset at which the sequence number was found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// Debug representation of the storage item that was found
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset at which the storage item was found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset at which the block was found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset at which the segment headers were found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// Non-permanent first page group
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
314
315#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318 #[error("Storage backend has canceled read request")]
320 ReadRequestCancelled,
321 #[error("Storage backend read error: {error}")]
323 ReadError {
324 error: io::Error,
326 },
327 #[error("Failed to generate database id")]
329 FailedToGenerateDatabaseId {
330 #[from]
332 error: SysError,
333 },
334 #[error("Database is already formatted yet")]
336 AlreadyFormatted,
337 #[error("Storage backend has canceled a writing request")]
339 WriteRequestCancelled,
340 #[error("Storage item write error")]
342 StorageItemWriteError {
343 #[from]
345 error: io::Error,
346 },
347}
348
/// Tip of a fork: a block identified by its number and root.
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
354
/// Borrowed view of a block that tells where its full contents can be obtained from.
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is available in memory
    InMemory(&'a Block),
    /// Only the header is in memory, the rest must be read from the storage backend
    Persisted {
        /// Block header
        header: &'a Block::Header,
        /// Location in the storage backend where the block was written
        write_location: WriteLocation,
    },
}
365
/// Extra details that are tracked for beacon chain blocks only.
#[derive(Debug)]
struct BeaconChainBlockDetails {
    /// Segment roots of shards that are referenced by the block body
    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
}
371
372impl BeaconChainBlockDetails {
373 fn from_body(body: &BeaconChainBody<'_>) -> Self {
374 let shard_segment_roots = body
375 .intermediate_shard_blocks()
376 .iter()
377 .flat_map(|intermediate_shard_block_info| {
378 let own_segments = intermediate_shard_block_info
379 .own_segments
380 .into_iter()
381 .flat_map({
382 let shard_index = intermediate_shard_block_info.header.prefix.shard_index;
383
384 move |own_segments| {
385 (own_segments.first_local_segment_index..)
386 .zip(own_segments.segment_roots)
387 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
388 shard_index,
389 segment_index,
390 segment_root,
391 })
392 }
393 });
394 let child_shard_segment_roots = intermediate_shard_block_info
395 .leaf_shards_segments()
396 .flat_map(move |(shard_index, own_segments)| {
397 (own_segments.first_local_segment_index..)
398 .zip(own_segments.segment_roots)
399 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
400 shard_index,
401 segment_index,
402 segment_root,
403 })
404 });
405
406 own_segments.chain(child_shard_segment_roots)
407 })
408 .collect();
409
410 Self {
411 shard_segment_roots,
412 }
413 }
414}
415
/// Block as tracked by the client database.
///
/// The variants differ in how much of the block is kept in memory.
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is stored in memory
    InMemory {
        /// The block itself
        block: Block,
        /// Block details
        block_details: BlockDetails,
        /// Beacon chain-specific details, `None` when the block is not a beacon chain block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
    },
    /// Only the header and details are in memory, the block was written to the storage backend
    Persisted {
        /// Block header
        header: Block::Header,
        /// Block details
        block_details: BlockDetails,
        /// Beacon chain-specific details, `None` when the block is not a beacon chain block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        /// Location in the storage backend where the block was written
        write_location: WriteLocation,
    },
    /// Like [`Self::Persisted`], but block details are not retained
    ///
    /// NOTE(review): judging by the name, this is for blocks that were confirmed — verify against
    /// the code that creates this variant.
    PersistedConfirmed {
        /// Block header
        header: Block::Header,
        /// Beacon chain-specific details, `None` when the block is not a beacon chain block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        /// Location in the storage backend where the block was written
        write_location: WriteLocation,
    },
}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454 Block: GenericOwnedBlock,
455{
456 #[inline(always)]
457 fn header(&self) -> &Block::Header {
458 match self {
459 Self::InMemory { block, .. } => block.header(),
460 Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461 }
462 }
463
464 #[inline(always)]
465 fn full_block(&self) -> FullBlock<'_, Block> {
466 match self {
467 Self::InMemory { block, .. } => FullBlock::InMemory(block),
468 Self::Persisted {
469 header,
470 write_location,
471 ..
472 }
473 | Self::PersistedConfirmed {
474 header,
475 write_location,
476 ..
477 } => FullBlock::Persisted {
478 header,
479 write_location: *write_location,
480 },
481 }
482 }
483
484 #[inline(always)]
485 fn block_details(&self) -> Option<&BlockDetails> {
486 match self {
487 Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488 Some(block_details)
489 }
490 Self::PersistedConfirmed { .. } => None,
491 }
492 }
493
494 #[inline(always)]
495 fn beacon_chain_block_details(&self) -> &Option<BeaconChainBlockDetails> {
496 match self {
497 Self::InMemory {
498 beacon_chain_block_details,
499 ..
500 }
501 | Self::Persisted {
502 beacon_chain_block_details,
503 ..
504 }
505 | Self::PersistedConfirmed {
506 beacon_chain_block_details,
507 ..
508 } => beacon_chain_block_details,
509 }
510 }
511}
512
/// In-memory index of blocks and fork tips.
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of forks
    ///
    /// The tip of the best block is at the front. Tips of newly persisted forks are inserted
    /// right after it.
    fork_tips: VecDeque<ForkTip>,
    /// Map from the block root to the block number
    ///
    /// Used to find the position of a block in `blocks` by its root.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// Blocks, indexed by the distance from the best block
    ///
    /// The entry at the front corresponds to the best block number, the entry at index `n`
    /// corresponds to `best_number - n`. Every entry contains all known block candidates (forks)
    /// with that block number, the first candidate at the front is the best block.
    ///
    /// NOTE(review): presumably the first candidate at every other index is the canonical one as
    /// well — confirm against the code that reorders candidates.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
544
/// In-memory cache of segment headers.
#[derive(Debug)]
struct SegmentHeadersCache {
    /// Segment headers, stored such that the position in the vector matches the local segment
    /// index of the header (starts with index zero, each next index is larger by one)
    segment_headers_cache: Vec<SegmentHeader>,
}
549
550impl SegmentHeadersCache {
551 #[inline(always)]
552 fn last_segment_header(&self) -> Option<SegmentHeader> {
553 self.segment_headers_cache.last().cloned()
554 }
555
556 #[inline(always)]
557 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558 self.segment_headers_cache
559 .last()
560 .map(|segment_header| segment_header.index.as_inner())
561 }
562
563 #[inline(always)]
564 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565 self.segment_headers_cache
566 .get(u64::from(local_segment_index) as usize)
567 .copied()
568 }
569
570 fn add_segment_headers(
572 &mut self,
573 mut segment_headers: Vec<SegmentHeader>,
574 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575 self.segment_headers_cache.reserve(segment_headers.len());
576
577 let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579 if let Some(last_segment_index) = maybe_last_local_segment_index {
580 segment_headers
582 .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583 }
584
585 for segment_header in segment_headers.iter().copied() {
588 let local_segment_index = segment_header.index.as_inner();
589 match maybe_last_local_segment_index {
590 Some(last_local_segment_index) => {
591 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
592 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
593 local_segment_index,
594 last_local_segment_index,
595 });
596 }
597
598 self.segment_headers_cache.push(segment_header);
599 maybe_last_local_segment_index.replace(local_segment_index);
600 }
601 None => {
602 if local_segment_index != LocalSegmentIndex::ZERO {
603 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
604 local_segment_index,
605 });
606 }
607
608 self.segment_headers_cache.push(segment_header);
609 maybe_last_local_segment_index.replace(local_segment_index);
610 }
611 }
612 }
613
614 Ok(segment_headers)
615 }
616}
617
/// In-memory cache of super segment headers.
#[derive(Debug)]
struct SuperSegmentHeadersCache {
    /// Super segment headers, stored such that the position in the vector matches the super
    /// segment index of the header (starts with index zero, each next index is larger by one)
    super_segment_headers_cache: Vec<SuperSegmentHeader>,
}
622
623impl SuperSegmentHeadersCache {
624 #[inline(always)]
625 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
626 self.super_segment_headers_cache.last().cloned()
627 }
628
629 #[inline]
630 fn previous_super_segment_header(
631 &self,
632 target_block_number: BlockNumber,
633 ) -> Option<SuperSegmentHeader> {
634 let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
635 let index = match self.super_segment_headers_cache.binary_search_by_key(
636 &block_number,
637 |super_segment_header| {
638 super_segment_header
639 .target_beacon_chain_block_number
640 .as_inner()
641 },
642 ) {
643 Ok(found_index) => found_index,
644 Err(insert_index) => insert_index.checked_sub(1)?,
645 };
646
647 self.super_segment_headers_cache.get(index).copied()
648 }
649
650 #[inline(always)]
651 fn get_super_segment_header(
652 &self,
653 local_segment_index: SuperSegmentIndex,
654 ) -> Option<SuperSegmentHeader> {
655 self.super_segment_headers_cache
656 .get(u64::from(local_segment_index) as usize)
657 .copied()
658 }
659
660 #[inline(always)]
661 fn get_super_segment_header_for_segment_index(
662 &self,
663 segment_index: SegmentIndex,
664 ) -> Option<SuperSegmentHeader> {
665 let index = self
666 .super_segment_headers_cache
667 .binary_search_by_key(&segment_index, |super_segment_header| {
668 super_segment_header.max_segment_index.as_inner()
669 })
670 .unwrap_or_else(|insert_index| insert_index);
671
672 let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
673
674 let max_segment_index = super_segment_header.max_segment_index.as_inner();
675 let first_segment_index = max_segment_index
676 - SegmentIndex::from(u64::from(super_segment_header.num_segments))
677 + SegmentIndex::ONE;
678
679 (first_segment_index..=max_segment_index)
680 .contains(&segment_index)
681 .then_some(super_segment_header)
682 }
683
684 fn add_super_segment_headers(
686 &mut self,
687 mut super_segment_headers: Vec<SuperSegmentHeader>,
688 ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
689 self.super_segment_headers_cache
690 .reserve(super_segment_headers.len());
691
692 let mut maybe_last_super_segment_index = self
693 .super_segment_headers_cache
694 .last()
695 .map(|header| header.index.as_inner());
696
697 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
698 super_segment_headers.retain(|super_segment_header| {
700 super_segment_header.index.as_inner() > last_super_segment_index
701 });
702 }
703
704 for super_segment_header in super_segment_headers.iter().copied() {
707 let super_segment_index = super_segment_header.index.as_inner();
708 match maybe_last_super_segment_index {
709 Some(last_super_segment_index) => {
710 if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
711 return Err(
712 PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
713 super_segment_index,
714 last_super_segment_index,
715 },
716 );
717 }
718
719 self.super_segment_headers_cache.push(super_segment_header);
720 maybe_last_super_segment_index.replace(super_segment_index);
721 }
722 None => {
723 if super_segment_index != SuperSegmentIndex::ZERO {
724 return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
725 super_segment_index,
726 });
727 }
728
729 self.super_segment_headers_cache.push(super_segment_header);
730 maybe_last_super_segment_index.replace(super_segment_index);
731 }
732 }
733 }
734
735 Ok(super_segment_headers)
736 }
737}
738
/// The whole state of the client database.
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// In-memory index of blocks and fork tips
    data: StateData<Block>,
    /// In-memory cache of segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// In-memory cache of super segment headers
    super_segment_headers_cache: SuperSegmentHeadersCache,
    /// Adapter used for reading and writing storage items, behind its own lock
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
750
751impl<Block, StorageBackend> State<Block, StorageBackend>
752where
753 Block: GenericOwnedBlock,
754{
755 #[inline(always)]
756 fn best_tip(&self) -> &ForkTip {
757 self.data
758 .fork_tips
759 .front()
760 .expect("The best block is always present; qed")
761 }
762
763 #[inline(always)]
764 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
765 self.data
766 .blocks
767 .front()
768 .expect("The best block is always present; qed")
769 .first()
770 .expect("The best block is always present; qed")
771 }
772}
773
/// Description of a block that is to be written to the storage backend.
///
/// NOTE(review): the producer and consumer are not visible in this part of the file; field
/// descriptions are inferred from the layout of `StateData::blocks` — confirm.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Presumably the index of the entry in `StateData::blocks`
    block_offset: usize,
    /// Presumably the index of the block candidate within that entry
    fork_offset: usize,
    /// The block itself
    block: &'a Block,
    /// Block details
    block_details: &'a BlockDetails,
}
784
/// Description of a block that was written to the storage backend.
///
/// NOTE(review): the producer and consumer are not visible in this part of the file; field
/// descriptions are inferred from the layout of `StateData::blocks` — confirm.
#[derive(Debug)]
struct PersistedBlock {
    /// Presumably the index of the entry in `StateData::blocks`
    block_offset: usize,
    /// Presumably the index of the block candidate within that entry
    fork_offset: usize,
    /// Location in the storage backend where the block was written
    write_location: WriteLocation,
}
791
/// Subset of database options that is retained after the database is opened.
///
/// Field names mirror those of [`ClientDatabaseOptions`].
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// Block confirmation depth `k`
    block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth
    soft_confirmation_depth: BlockNumber,
    /// Maximum number of fork tips to track
    max_fork_tips: NonZeroUsize,
    /// Maximum distance of a fork tip
    max_fork_tip_distance: BlockNumber,
}
799
/// Shared internals of [`ClientDatabase`].
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind a read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was opened with
    options: ClientDatabaseInnerOptions,
}
808
/// Client database.
///
/// Cloning is cheap: all clones share the same internals through a reference-counted pointer.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Internals shared between all clones
    inner: Arc<Inner<Block, StorageBackend>>,
}
817
818impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
819where
820 Block: GenericOwnedBlock,
821{
822 fn clone(&self) -> Self {
823 Self {
824 inner: self.inner.clone(),
825 }
826 }
827}
828
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Currently does nothing.
    fn drop(&mut self) {
        // NOTE(review): the body is empty, presumably a placeholder for persisting data that was
        // not written yet — confirm. Note that this runs for every handle, not just the last one.
    }
}
837
838impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
839where
840 Block: GenericOwnedBlock,
841 StorageBackend: ClientDatabaseStorageBackend,
842{
843 #[inline]
844 fn best_root(&self) -> BlockRoot {
845 self.inner.state.read_blocking().best_tip().root
848 }
849
850 #[inline]
851 fn best_header(&self) -> Block::Header {
852 self.inner
855 .state
856 .read_blocking()
857 .best_block()
858 .header()
859 .clone()
860 }
861
862 #[inline]
863 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
864 let state = self.inner.state.read_blocking();
867 let best_block = state.best_block();
868 (
869 best_block.header().clone(),
870 best_block
871 .block_details()
872 .expect("Always present for the best block; qed")
873 .clone(),
874 )
875 }
876
877 #[inline]
879 fn ancestor_header(
880 &self,
881 ancestor_block_number: BlockNumber,
882 descendant_block_root: &BlockRoot,
883 ) -> Option<Block::Header> {
884 let state = self.inner.state.read_blocking();
887 let best_number = state.best_tip().number;
888
889 let ancestor_block_offset =
890 u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
891 let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
892
893 let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
894 if ancestor_block_number > descendant_block_number {
895 return None;
896 }
897 let descendant_block_offset =
898 u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
899
900 let mut blocks_range_iter = state
902 .data
903 .blocks
904 .iter()
905 .enumerate()
906 .skip(descendant_block_offset);
907
908 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
909 let descendant_header = descendant_block_candidates
910 .iter()
911 .find(|block| &*block.header().header().root() == descendant_block_root)?
912 .header()
913 .header();
914
915 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
920 return ancestor_block_candidates
921 .iter()
922 .next()
923 .map(|block| block.header().clone());
924 }
925
926 let mut parent_block_root = &descendant_header.prefix.parent_root;
927
928 for (block_offset, parent_candidates) in blocks_range_iter {
930 let parent_header = parent_candidates
931 .iter()
932 .find(|header| &*header.header().header().root() == parent_block_root)?
933 .header();
934
935 if block_offset == ancestor_block_offset {
937 return Some(parent_header.clone());
938 }
939
940 parent_block_root = &parent_header.header().prefix.parent_root;
941 }
942
943 None
944 }
945
946 #[inline]
947 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
948 let state = self.inner.state.read_blocking();
951 let best_number = state.best_tip().number;
952
953 let block_number = *state.data.block_roots.get(block_root)?;
954 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
955 let block_candidates = state.data.blocks.get(block_offset)?;
956
957 block_candidates.iter().find_map(|block| {
958 let header = block.header();
959
960 if &*header.header().root() == block_root {
961 Some(header.clone())
962 } else {
963 None
964 }
965 })
966 }
967
968 #[inline]
969 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
970 let state = self.inner.state.read_blocking();
973 let best_number = state.best_tip().number;
974
975 let block_number = *state.data.block_roots.get(block_root)?;
976 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
977 let block_candidates = state.data.blocks.get(block_offset)?;
978
979 block_candidates.iter().find_map(|block| {
980 let header = block.header();
981 let block_details = block.block_details().cloned()?;
982
983 if &*header.header().root() == block_root {
984 Some((header.clone(), block_details))
985 } else {
986 None
987 }
988 })
989 }
990
991 #[inline]
992 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
993 let state = self.inner.state.read().await;
994 let best_number = state.best_tip().number;
995
996 let block_number = *state
997 .data
998 .block_roots
999 .get(block_root)
1000 .ok_or(ReadBlockError::UnknownBlockRoot)?;
1001 let block_offset = u64::from(
1002 best_number
1003 .checked_sub(block_number)
1004 .expect("Known block roots always have valid block offset; qed"),
1005 ) as usize;
1006 let block_candidates = state
1007 .data
1008 .blocks
1009 .get(block_offset)
1010 .expect("Valid block offsets always have block entries; qed");
1011
1012 for block_candidate in block_candidates {
1013 let header = block_candidate.header();
1014
1015 if &*header.header().root() == block_root {
1016 return match block_candidate.full_block() {
1017 FullBlock::InMemory(block) => Ok(block.clone()),
1018 FullBlock::Persisted {
1019 header,
1020 write_location,
1021 } => {
1022 let storage_backend_adapter = state.storage_backend_adapter.read().await;
1023
1024 let storage_item = storage_backend_adapter
1025 .read_storage_item::<StorageItemTemporary>(write_location)
1026 .await?;
1027
1028 let storage_item_block = match storage_item {
1029 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1030 StorageItemTemporary::SegmentHeaders(_) => {
1031 return Err(ReadBlockError::StorageItemReadError {
1032 error: io::Error::other(
1033 "Unexpected storage item: `SegmentHeaders`",
1034 ),
1035 });
1036 }
1037 StorageItemTemporary::SuperSegmentHeaders(_) => {
1038 return Err(ReadBlockError::StorageItemReadError {
1039 error: io::Error::other(
1040 "Unexpected storage item: `SuperSegmentHeaders`",
1041 ),
1042 });
1043 }
1044 };
1045
1046 let StorageItemTemporaryBlock {
1047 header: _,
1048 body,
1049 mmr_with_block: _,
1050 system_contract_states: _,
1051 } = storage_item_block;
1052
1053 Block::from_buffers(header.buffer().clone(), body)
1054 .ok_or(ReadBlockError::FailedToDecode)
1055 }
1056 };
1057 }
1058 }
1059
1060 unreachable!("Known block root always has block candidate associated with it; qed")
1061 }
1062
1063 #[inline]
1064 fn last_segment_header(&self) -> Option<SegmentHeader> {
1065 let state = self.inner.state.read_blocking();
1068 state.segment_headers_cache.last_segment_header()
1069 }
1070
1071 #[inline]
1072 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1073 let state = self.inner.state.read_blocking();
1076
1077 state
1078 .segment_headers_cache
1079 .get_segment_header(segment_index)
1080 }
1081
1082 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1083 let state = self.inner.state.read_blocking();
1086
1087 let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1088 else {
1089 return Vec::new();
1091 };
1092
1093 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1095 && block_number == BlockNumber::ONE
1096 {
1097 return vec![
1100 state
1101 .segment_headers_cache
1102 .get_segment_header(LocalSegmentIndex::ZERO)
1103 .expect("Segment headers are stored in monotonically increasing order; qed"),
1104 ];
1105 }
1106
1107 if last_local_segment_index == LocalSegmentIndex::ZERO {
1108 return Vec::new();
1110 }
1111
1112 let mut current_local_segment_index = last_local_segment_index;
1113 loop {
1114 let current_segment_header = state
1117 .segment_headers_cache
1118 .get_segment_header(current_local_segment_index)
1119 .expect("Segment headers are stored in monotonically increasing order; qed");
1120
1121 let target_block_number = current_segment_header.last_archived_block.number()
1123 + BlockNumber::ONE
1124 + self.inner.options.block_confirmation_depth;
1125 if target_block_number == block_number {
1126 let mut headers_for_block = vec![current_segment_header];
1127
1128 let last_archived_block_number = current_segment_header.last_archived_block.number;
1130 let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1131
1132 while let Some(segment_header) = state
1133 .segment_headers_cache
1134 .get_segment_header(local_segment_index)
1135 {
1136 if segment_header.last_archived_block.number == last_archived_block_number {
1137 headers_for_block.insert(0, segment_header);
1138 local_segment_index -= LocalSegmentIndex::ONE;
1139 } else {
1140 break;
1141 }
1142 }
1143
1144 return headers_for_block;
1145 }
1146
1147 if target_block_number > block_number {
1149 if current_local_segment_index > LocalSegmentIndex::ONE {
1151 current_local_segment_index -= LocalSegmentIndex::ONE
1152 } else {
1153 break;
1154 }
1155 } else {
1156 return Vec::new();
1158 }
1159 }
1160
1161 Vec::new()
1163 }
1164}
1165
1166impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1167where
1168 Block: GenericOwnedBlock,
1169 StorageBackend: ClientDatabaseStorageBackend,
1170{
1171 async fn persist_block(
1172 &self,
1173 block: Block,
1174 block_details: BlockDetails,
1175 ) -> Result<(), PersistBlockError> {
1176 let mut state = self.inner.state.write().await;
1177 let best_number = state.best_tip().number;
1178
1179 let header = block.header().header();
1180
1181 let block_number = header.prefix.number;
1182
1183 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1184 Self::insert_first_block(&mut state.data, block, block_details);
1186
1187 return Ok(());
1188 }
1189
1190 if block_number == best_number + BlockNumber::ONE {
1191 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1192 }
1193
1194 let block_offset = u64::from(
1195 best_number
1196 .checked_sub(block_number)
1197 .ok_or(PersistBlockError::MissingParent)?,
1198 ) as usize;
1199
1200 if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1201 return Err(PersistBlockError::OutsideAcceptableRange);
1202 }
1203
1204 let state = &mut *state;
1205
1206 let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1207 error!(
1208 %block_number,
1209 %block_offset,
1210 "Failed to store block fork, header offset is missing despite being within \
1211 acceptable range"
1212 );
1213
1214 PersistBlockError::OutsideAcceptableRange
1215 })?;
1216
1217 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1218 if fork_tip.root == header.prefix.parent_root {
1220 state.data.fork_tips.remove(index);
1221 break;
1222 }
1223 }
1224
1225 let block_root = *header.root();
1226 state.data.fork_tips.insert(
1229 1,
1230 ForkTip {
1231 number: block_number,
1232 root: block_root,
1233 },
1234 );
1235 state.data.block_roots.insert(block_root, block_number);
1236 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1237 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1238 block_forks.push(ClientDatabaseBlock::InMemory {
1239 block,
1240 block_details,
1241 beacon_chain_block_details,
1242 });
1243
1244 Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1245
1246 Ok(())
1247 }
1248
1249 async fn persist_segment_headers(
1250 &self,
1251 segment_headers: Vec<SegmentHeader>,
1252 ) -> Result<(), PersistSegmentHeadersError> {
1253 let mut state = self.inner.state.write().await;
1254
1255 let added_segment_headers = state
1256 .segment_headers_cache
1257 .add_segment_headers(segment_headers)?;
1258
1259 if added_segment_headers.is_empty() {
1260 return Ok(());
1261 }
1262
1263 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1268
1269 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1270
1271 storage_backend_adapter
1272 .write_storage_item(StorageItemTemporary::SegmentHeaders(
1273 StorageItemTemporarySegmentHeaders {
1274 segment_headers: added_segment_headers,
1275 },
1276 ))
1277 .await?;
1278
1279 Ok(())
1280 }
1281}
1282
1283impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1284where
1285 StorageBackend: ClientDatabaseStorageBackend,
1286{
1287 fn shard_segment_roots(
1288 &self,
1289 block_number: BlockNumber,
1290 ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1291 let state = self.inner.state.read_blocking();
1294 let best_number = state.best_tip().number;
1295
1296 let block_offset = u64::from(
1297 best_number
1298 .checked_sub(block_number)
1299 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1300 ) as usize;
1301
1302 let block = state
1303 .data
1304 .blocks
1305 .get(block_offset)
1306 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1307 .first()
1308 .expect("There is always at least one block candidate; qed");
1309
1310 Ok(StdArc::clone(
1311 &block
1312 .beacon_chain_block_details()
1313 .as_ref()
1314 .expect("Always present in the beacon chain block; qed")
1315 .shard_segment_roots,
1316 ))
1317 }
1318
1319 #[inline]
1320 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1321 let state = self.inner.state.read_blocking();
1324 state
1325 .super_segment_headers_cache
1326 .last_super_segment_header()
1327 }
1328
1329 #[inline]
1330 fn previous_super_segment_header(
1331 &self,
1332 target_block_number: BlockNumber,
1333 ) -> Option<SuperSegmentHeader> {
1334 let state = self.inner.state.read_blocking();
1337
1338 state
1339 .super_segment_headers_cache
1340 .previous_super_segment_header(target_block_number)
1341 }
1342
1343 #[inline]
1344 fn get_super_segment_header(
1345 &self,
1346 super_segment_index: SuperSegmentIndex,
1347 ) -> Option<SuperSegmentHeader> {
1348 let state = self.inner.state.read_blocking();
1351
1352 state
1353 .super_segment_headers_cache
1354 .get_super_segment_header(super_segment_index)
1355 }
1356
1357 fn get_super_segment_header_for_segment_index(
1358 &self,
1359 segment_index: SegmentIndex,
1360 ) -> Option<SuperSegmentHeader> {
1361 let state = self.inner.state.read_blocking();
1364
1365 state
1366 .super_segment_headers_cache
1367 .get_super_segment_header_for_segment_index(segment_index)
1368 }
1369}
1370
1371impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1372where
1373 StorageBackend: ClientDatabaseStorageBackend,
1374{
1375 async fn persist_super_segment_header(
1376 &self,
1377 super_segment_header: SuperSegmentHeader,
1378 ) -> Result<bool, PersistSuperSegmentHeadersError> {
1379 let mut state = self.inner.state.write().await;
1380
1381 let added_super_segment_headers = state
1382 .super_segment_headers_cache
1383 .add_super_segment_headers(vec![super_segment_header])?;
1384
1385 if added_super_segment_headers.is_empty() {
1386 return Ok(false);
1387 }
1388
1389 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1394
1395 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1396
1397 storage_backend_adapter
1398 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1399 StorageItemTemporarySuperSegmentHeaders {
1400 super_segment_headers: added_super_segment_headers,
1401 },
1402 ))
1403 .await?;
1404
1405 Ok(true)
1406 }
1407
1408 async fn persist_super_segment_headers(
1409 &self,
1410 super_segment_headers: Vec<SuperSegmentHeader>,
1411 ) -> Result<(), PersistSuperSegmentHeadersError> {
1412 let mut state = self.inner.state.write().await;
1413
1414 let added_super_segment_headers = state
1415 .super_segment_headers_cache
1416 .add_super_segment_headers(super_segment_headers)?;
1417
1418 if added_super_segment_headers.is_empty() {
1419 return Ok(());
1420 }
1421
1422 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1427
1428 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1429
1430 storage_backend_adapter
1431 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1432 StorageItemTemporarySuperSegmentHeaders {
1433 super_segment_headers: added_super_segment_headers,
1434 },
1435 ))
1436 .await?;
1437
1438 Ok(())
1439 }
1440}
1441
1442impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1443where
1444 Block: GenericOwnedBlock,
1445 StorageBackend: ClientDatabaseStorageBackend,
1446{
1447 pub async fn open<GBB>(
1451 options: ClientDatabaseOptions<GBB, StorageBackend>,
1452 ) -> Result<Self, ClientDatabaseError>
1453 where
1454 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1455 {
1456 let ClientDatabaseOptions {
1457 write_buffer_size,
1458 block_confirmation_depth,
1459 soft_confirmation_depth,
1460 max_fork_tips,
1461 max_fork_tip_distance,
1462 genesis_block_builder,
1463 storage_backend,
1464 } = options;
1465 if soft_confirmation_depth >= block_confirmation_depth {
1466 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1467 }
1468
1469 if max_fork_tip_distance > block_confirmation_depth {
1470 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1471 }
1472
1473 let mut state_data = StateData {
1474 fork_tips: VecDeque::new(),
1475 block_roots: HashMap::default(),
1476 blocks: VecDeque::new(),
1477 };
1478 let mut segment_headers_cache = SegmentHeadersCache {
1479 segment_headers_cache: Vec::new(),
1480 };
1481 let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1482 super_segment_headers_cache: Vec::new(),
1483 };
1484
1485 let options = ClientDatabaseInnerOptions {
1486 block_confirmation_depth,
1487 soft_confirmation_depth,
1488 max_fork_tips,
1489 max_fork_tip_distance,
1490 };
1491
1492 let storage_item_handlers = StorageItemHandlers {
1493 permanent: |_arg| {
1494 Ok(())
1496 },
1497 temporary: |arg| {
1498 let StorageItemHandlerArg {
1499 storage_item,
1500 page_offset,
1501 num_pages,
1502 } = arg;
1503 let storage_item_block = match storage_item {
1504 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1505 StorageItemTemporary::SegmentHeaders(segment_headers) => {
1506 let num_segment_headers = segment_headers.segment_headers.len();
1507 return match segment_headers_cache
1508 .add_segment_headers(segment_headers.segment_headers)
1509 {
1510 Ok(_) => Ok(()),
1511 Err(error) => {
1512 error!(
1513 %page_offset,
1514 %num_segment_headers,
1515 %error,
1516 "Failed to add segment headers from storage item"
1517 );
1518
1519 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1520 }
1521 };
1522 }
1523 StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1524 let num_super_segment_headers =
1525 super_segment_headers.super_segment_headers.len();
1526 return match super_segment_headers_cache
1527 .add_super_segment_headers(super_segment_headers.super_segment_headers)
1528 {
1529 Ok(_) => Ok(()),
1530 Err(error) => {
1531 error!(
1532 %page_offset,
1533 %num_super_segment_headers,
1534 %error,
1535 "Failed to add segment headers from storage item"
1536 );
1537
1538 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1539 }
1540 };
1541 }
1542 };
1543
1544 let StorageItemTemporaryBlock {
1547 header,
1548 body,
1549 mmr_with_block,
1550 system_contract_states,
1551 } = storage_item_block;
1552
1553 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1554 error!(%page_offset, "Failed to decode block header from bytes");
1555
1556 ClientDatabaseError::InvalidBlock { page_offset }
1557 })?;
1558 let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1559 error!(%page_offset, "Failed to decode block body from bytes");
1560
1561 ClientDatabaseError::InvalidBlock { page_offset }
1562 })?;
1563
1564 let block_root = *header.header().root();
1565 let block_number = header.header().prefix.number;
1566
1567 state_data.block_roots.insert(block_root, block_number);
1568
1569 let maybe_best_number = state_data
1570 .blocks
1571 .front()
1572 .and_then(|block_forks| block_forks.first())
1573 .map(|best_block| {
1574 let header: &Block::Header = best_block.header();
1576
1577 header.header().prefix.number
1578 });
1579
1580 let block_offset = if let Some(best_number) = maybe_best_number {
1581 if block_number <= best_number {
1582 u64::from(best_number - block_number) as usize
1583 } else {
1584 if block_number - best_number != BlockNumber::ONE {
1586 error!(
1587 %page_offset,
1588 %best_number,
1589 %block_number,
1590 "Invalid new best block number, it must be only one block \
1591 higher than the best block"
1592 );
1593
1594 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1595 }
1596
1597 state_data.blocks.push_front(SmallVec::new());
1598 0
1600 }
1601 } else {
1602 state_data.blocks.push_front(SmallVec::new());
1603 0
1605 };
1606
1607 let block_forks = match state_data.blocks.get_mut(block_offset) {
1608 Some(block_forks) => block_forks,
1609 None => {
1610 return Ok(());
1614 }
1615 };
1616
1617 let beacon_chain_block_details =
1619 <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1620 .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1621 block_forks.push(ClientDatabaseBlock::Persisted {
1622 header,
1623 block_details: BlockDetails {
1624 mmr_with_block,
1625 system_contract_states,
1626 },
1627 beacon_chain_block_details,
1628 write_location: WriteLocation {
1629 page_offset,
1630 num_pages,
1631 },
1632 });
1633
1634 if block_offset == 0 && block_forks.len() == 1 {
1637 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1638 }
1639
1640 Ok(())
1641 },
1642 };
1643
1644 let storage_backend_adapter =
1645 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1646 .await?;
1647
1648 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1649 block_forks.last()
1652 }) {
1653 let header: &Block::Header = best_block.header();
1655 let header = header.header();
1656 let block_number = header.prefix.number;
1657 let block_root = *header.root();
1658
1659 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1660 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1661 }
1662
1663 state_data.fork_tips.push_front(ForkTip {
1665 number: block_number,
1666 root: block_root,
1667 });
1668 } else {
1669 let GenesisBlockBuilderResult {
1670 block,
1671 system_contract_states,
1672 } = genesis_block_builder();
1673
1674 let header = block.header().header();
1676 let block_number = header.prefix.number;
1677 let block_root = *header.root();
1678
1679 state_data.fork_tips.push_front(ForkTip {
1680 number: block_number,
1681 root: block_root,
1682 });
1683 state_data.block_roots.insert(block_root, block_number);
1684 let beacon_chain_block_details =
1685 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1686 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1687 state_data
1688 .blocks
1689 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1690 block,
1691 block_details: BlockDetails {
1692 system_contract_states,
1693 mmr_with_block: Arc::new({
1694 let mut mmr = BlockMerkleMountainRange::new();
1695 mmr.add_leaf(&block_root);
1696 mmr
1697 })
1698 },
1699 beacon_chain_block_details,
1700 }]);
1701 }
1702
1703 let state = State {
1704 data: state_data,
1705 segment_headers_cache,
1706 super_segment_headers_cache,
1707 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1708 };
1709
1710 let inner = Inner {
1711 state: AsyncRwLock::new(state),
1712 options,
1713 };
1714
1715 Ok(Self {
1716 inner: Arc::new(inner),
1717 })
1718 }
1719
    /// Formats the storage backend with the provided options.
    ///
    /// This is a thin wrapper that delegates to `StorageBackendAdapter::format()` with the same
    /// arguments and returns its result unchanged.
    pub async fn format(
        storage_backend: &StorageBackend,
        options: ClientDatabaseFormatOptions,
    ) -> Result<(), ClientDatabaseFormatError> {
        StorageBackendAdapter::format(storage_backend, options).await
    }
1727
1728 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1729 let header = block.header().header();
1731 let block_number = header.prefix.number;
1732 let block_root = *header.root();
1733
1734 state.fork_tips.clear();
1735 state.fork_tips.push_front(ForkTip {
1736 number: block_number,
1737 root: block_root,
1738 });
1739 state.block_roots.clear();
1740 state.block_roots.insert(block_root, block_number);
1741 state.blocks.clear();
1742 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1743 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1744 state
1745 .blocks
1746 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1747 block,
1748 block_details,
1749 beacon_chain_block_details,
1750 }]);
1751 }
1752
1753 async fn insert_new_best_block(
1754 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1755 inner: &Inner<Block, StorageBackend>,
1756 block: Block,
1757 block_details: BlockDetails,
1758 ) -> Result<(), PersistBlockError> {
1759 let header = block.header().header();
1760 let block_number = header.prefix.number;
1761 let block_root = *header.root();
1762 let parent_root = header.prefix.parent_root;
1763
1764 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1767 return Err(PersistBlockError::MissingParent);
1768 }
1769
1770 {
1772 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1773 if fork_tip.root == parent_root {
1775 state.data.fork_tips.remove(index);
1776 break;
1777 }
1778 }
1779
1780 state.data.fork_tips.push_front(ForkTip {
1781 number: block_number,
1782 root: block_root,
1783 });
1784 state.data.block_roots.insert(block_root, block_number);
1785 let beacon_chain_block_details =
1786 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1787 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1788 state
1789 .data
1790 .blocks
1791 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1792 block,
1793 block_details: block_details.clone(),
1794 beacon_chain_block_details,
1795 }]);
1796 }
1797
1798 let options = &inner.options;
1799
1800 Self::confirm_canonical_block(block_number, &mut state.data, options);
1801 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1802
1803 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1808
1809 let mut blocks_to_persist = Vec::new();
1810 for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1811 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1812 break;
1813 };
1814
1815 let len_before = blocks_to_persist.len();
1816 fork_blocks
1817 .iter()
1818 .enumerate()
1819 .filter_map(|(fork_offset, client_database_block)| {
1820 match client_database_block {
1821 ClientDatabaseBlock::InMemory {
1822 block,
1823 block_details,
1824 beacon_chain_block_details: _,
1825 } => Some(BlockToPersist {
1826 block_offset,
1827 fork_offset,
1828 block,
1829 block_details,
1830 }),
1831 ClientDatabaseBlock::Persisted { .. }
1832 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1833 None
1835 }
1836 }
1837 })
1838 .collect_into(&mut blocks_to_persist);
1839
1840 if blocks_to_persist.len() == len_before {
1841 break;
1842 }
1843 }
1844
1845 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1847 {
1848 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1849
1850 for block_to_persist in blocks_to_persist.into_iter().rev() {
1851 let BlockToPersist {
1852 block_offset,
1853 fork_offset,
1854 block,
1855 block_details,
1856 } = block_to_persist;
1857
1858 let write_location = storage_backend_adapter
1859 .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1860 header: block.header().buffer().clone(),
1861 body: block.body().buffer().clone(),
1862 mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1863 system_contract_states: StdArc::clone(
1864 &block_details.system_contract_states,
1865 ),
1866 }))
1867 .await?;
1868
1869 persisted_blocks.push(PersistedBlock {
1870 block_offset,
1871 fork_offset,
1872 write_location,
1873 });
1874 }
1875 }
1876
1877 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1879 for persisted_block in persisted_blocks {
1880 let PersistedBlock {
1881 block_offset,
1882 fork_offset,
1883 write_location,
1884 } = persisted_block;
1885
1886 let block = state
1887 .data
1888 .blocks
1889 .get_mut(block_offset)
1890 .expect("Still holding the same lock since last check; qed")
1891 .get_mut(fork_offset)
1892 .expect("Still holding the same lock since last check; qed");
1893
1894 replace_with_or_abort(block, |block| {
1895 if let ClientDatabaseBlock::InMemory {
1896 block,
1897 block_details,
1898 beacon_chain_block_details,
1899 } = block
1900 {
1901 let (header, _body) = block.split();
1902
1903 ClientDatabaseBlock::Persisted {
1904 header,
1905 block_details,
1906 beacon_chain_block_details,
1907 write_location,
1908 }
1909 } else {
1910 unreachable!("Still holding the same lock since last check; qed");
1911 }
1912 });
1913 }
1914
1915 Ok(())
1919 }
1920
1921 #[must_use]
1926 fn adjust_ancestor_block_forks(
1927 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1928 mut parent_block_root: BlockRoot,
1929 ) -> bool {
1930 let mut ancestor_blocks = blocks.iter_mut();
1931
1932 loop {
1933 if ancestor_blocks.len() == 1 {
1934 break;
1936 }
1937
1938 let Some(parent_blocks) = ancestor_blocks.next() else {
1939 break;
1941 };
1942
1943 let Some(fork_offset_parent_block_root) =
1944 parent_blocks
1945 .iter()
1946 .enumerate()
1947 .find_map(|(fork_offset, fork_block)| {
1948 let fork_header = fork_block.header().header();
1949 if *fork_header.root() == parent_block_root {
1950 Some((fork_offset, fork_header.prefix.parent_root))
1951 } else {
1952 None
1953 }
1954 })
1955 else {
1956 return false;
1957 };
1958
1959 let fork_offset;
1960 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1961
1962 parent_blocks.swap(0, fork_offset);
1963 }
1964
1965 true
1966 }
1967
1968 fn prune_outdated_fork_tips(
1975 best_number: BlockNumber,
1976 state: &mut StateData<Block>,
1977 options: &ClientDatabaseInnerOptions,
1978 ) {
1979 let state = &mut *state;
1980
1981 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1984
1985 state.fork_tips.retain(|fork_tip| {
1987 if best_number - fork_tip.number > options.max_fork_tip_distance {
1988 candidate_forks_to_remove.push(*fork_tip);
1989 false
1990 } else {
1991 true
1992 }
1993 });
1994 if state.fork_tips.len() > options.max_fork_tips.get() {
1996 state
1997 .fork_tips
1998 .drain(options.max_fork_tips.get()..)
1999 .collect_into(&mut candidate_forks_to_remove);
2000 }
2001
2002 candidate_forks_to_remove
2004 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
2005 state.fork_tips.extend(candidate_forks_to_remove);
2007 }
2008
2009 #[must_use]
2012 fn prune_outdated_fork(
2013 best_number: BlockNumber,
2014 fork_tip: &ForkTip,
2015 state: &mut StateData<Block>,
2016 ) -> bool {
2017 let block_offset = u64::from(best_number - fork_tip.number) as usize;
2018
2019 let mut block_root_to_prune = fork_tip.root;
2021 let mut pruned_tip = false;
2022 for block_offset in block_offset.. {
2023 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2024 if !pruned_tip {
2025 error!(
2026 %best_number,
2027 ?fork_tip,
2028 block_offset,
2029 "Block offset was not present in the database, this is an implementation \
2030 bug #1"
2031 );
2032 }
2033 break;
2035 };
2036
2037 if fork_blocks.len() == 1 {
2038 if !pruned_tip {
2039 error!(
2040 %best_number,
2041 ?fork_tip,
2042 block_offset,
2043 "Block offset was not present in the database, this is an implementation \
2044 bug #2"
2045 );
2046 }
2047
2048 break;
2050 }
2051
2052 let Some((fork_offset, block)) = fork_blocks
2053 .iter()
2054 .enumerate()
2055 .skip(1)
2057 .find(|(_fork_offset, block)| {
2058 *block.header().header().root() == block_root_to_prune
2059 })
2060 else {
2061 if !pruned_tip {
2062 error!(
2063 %best_number,
2064 ?fork_tip,
2065 block_offset,
2066 "Block offset was not present in the database, this is an implementation \
2067 bug #3"
2068 );
2069 }
2070
2071 break;
2073 };
2074
2075 if block.header().ref_count() > 1 {
2077 break;
2078 }
2079
2080 match block {
2082 ClientDatabaseBlock::InMemory { .. } => {
2083 }
2085 ClientDatabaseBlock::Persisted { .. }
2086 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2087 pruned_tip = true;
2089 break;
2090 }
2091 }
2092
2093 state.block_roots.get_mut(&block_root_to_prune);
2094 block_root_to_prune = block.header().header().prefix.parent_root;
2095 fork_blocks.swap_remove(fork_offset);
2096
2097 pruned_tip = true;
2098 }
2099
2100 pruned_tip
2101 }
2102
    /// Confirms the first block at offset `block_confirmation_depth + 1` and prunes all other
    /// blocks at that offset together with their descendants.
    ///
    /// Does nothing if there are no blocks at that offset yet. `best_number` is only used for
    /// logging.
    fn confirm_canonical_block(
        best_number: BlockNumber,
        state_data: &mut StateData<Block>,
        options: &ClientDatabaseInnerOptions,
    ) {
        // Offset is one past the confirmation depth
        let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;

        let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
            // Nothing to confirm yet
            return;
        };

        // Turn the first block at this offset into a confirmed block
        {
            let Some(canonical_block) = fork_blocks.first_mut() else {
                error!(
                    %best_number,
                    block_offset,
                    "Have not found a canonical block to confirm, this is an implementation bug"
                );
                return;
            };

            // Only persisted blocks are converted (dropping their block details), other variants
            // are logged as an error and left unchanged
            replace_with_or_abort(canonical_block, |block| match block {
                ClientDatabaseBlock::InMemory { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be in memory, this is an implementation bug"
                    );
                    block
                }
                ClientDatabaseBlock::Persisted {
                    header,
                    block_details: _,
                    beacon_chain_block_details,
                    write_location,
                } => ClientDatabaseBlock::PersistedConfirmed {
                    header,
                    beacon_chain_block_details,
                    write_location,
                },
                ClientDatabaseBlock::PersistedConfirmed { .. } => {
                    error!(
                        %best_number,
                        block_offset,
                        header = ?block.header(),
                        "Block to be confirmed must not be confirmed yet, this is an \
                        implementation bug"
                    );
                    block
                }
            });
        }

        // Remove the rest of the blocks at this offset, remembering their roots
        let mut block_roots_to_prune = fork_blocks
            .drain(1..)
            .map(|block| *block.header().header().root())
            .collect::<Vec<_>>();
        let mut current_block_offset = block_offset;
        while !block_roots_to_prune.is_empty() {
            // Removed blocks can no longer be fork tips
            state_data
                .fork_tips
                .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));

            // Forget roots of removed blocks
            for block_root in &block_roots_to_prune {
                state_data.block_roots.remove(block_root);
            }

            // Move one offset towards the front, where children of removed blocks are
            if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
                current_block_offset = next_block_offset;
            } else {
                // Offset `0` was already processed
                break;
            }

            let fork_blocks = state_data
                .blocks
                .get_mut(current_block_offset)
                .expect("Lower block offset always exists; qed");

            // Remove children of blocks removed in the previous step, they are processed next
            block_roots_to_prune = fork_blocks
                .drain_filter(|block| {
                    let header = block.header().header();

                    block_roots_to_prune.contains(&header.prefix.parent_root)
                })
                .map(|block| *block.header().header().root())
                .collect();
        }
    }
2205}