1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 const_convert,
50 const_trait_impl,
51 default_field_values,
52 get_mut_unchecked,
53 iter_collect_into,
54 maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70 BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71 ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72 PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81 LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
101#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
103#[repr(C)]
104pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107 type Target = [u8; 32];
108
109 #[inline(always)]
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116 #[inline(always)]
117 fn as_ref(&self) -> &[u8] {
118 &self.0
119 }
120}
121
122impl DatabaseId {
123 #[inline(always)]
124 pub const fn new(bytes: [u8; 32]) -> Self {
125 Self(bytes)
126 }
127}
128
/// Trivial hasher that reuses the beginning of the hashed bytes as the hash value.
///
/// The first 8 bytes of a written slice are interpreted as a little-endian `u64` and become the
/// hasher state. Writes shorter than 8 bytes leave the state untouched, and every sufficiently
/// long write replaces whatever was stored before.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    /// Returns the state captured by the last sufficiently long [`Hasher::write()`] call (`0` if
    /// there was none)
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Replaces the state with the first 8 bytes of `bytes` (little-endian), ignoring shorter
    /// inputs
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
147
/// Result of building the genesis block.
///
/// NOTE(review): presumably this is what `ClientDatabaseOptions::genesis_block_builder` returns;
/// the call site is not visible in this part of the file.
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// The genesis block itself
    pub block: Block,
    /// States of system contracts that accompany the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
155
/// Options for opening a client database.
///
/// Fields declared with `= value` have default field values and can be omitted when constructing
/// the struct with `..`.
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size, `5` by default.
    ///
    /// NOTE(review): the unit is not visible in this part of the file, presumably a number of
    /// buffered storage items. Confirm against the storage backend adapter.
    pub write_buffer_size: usize = 5,
    /// Block confirmation depth (referred to as `k` in [`ClientDatabaseError`] messages).
    ///
    /// No default is provided, the value must always be specified explicitly.
    pub block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth, `3` by default.
    ///
    /// According to [`ClientDatabaseError::InvalidSoftConfirmationDepth`], it must be smaller than
    /// the block confirmation depth.
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Maximum number of fork tips, `3` by default.
    ///
    /// NOTE(review): presumably the number of fork tips that are tracked at the same time,
    /// confirm against the pruning logic.
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Maximum distance of a fork tip, `5` by default.
    ///
    /// According to [`ClientDatabaseError::InvalidMaxForkTipDistance`], it must be smaller or
    /// equal to the block confirmation depth.
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Genesis block builder.
    ///
    /// NOTE(review): presumably a callable that produces [`GenesisBlockBuilderResult`], confirm
    /// against the place where the database is opened.
    pub genesis_block_builder: GBB,
    /// Storage backend the database operates on
    pub storage_backend: StorageBackend,
}
215
/// Options for formatting a client database
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// Size of a page group.
    ///
    /// NOTE(review): [`ClientDatabaseError::PageGroupSizeTooSmall`] says it "must be at least two
    /// pages", so the unit is presumably pages. Confirm against the formatting code.
    pub page_group_size: NonZeroU32,
    /// Force flag.
    ///
    /// NOTE(review): presumably allows formatting a database that is already formatted (see
    /// [`ClientDatabaseFormatError::AlreadyFormatted`]), confirm against the formatting code.
    pub force: bool,
}
241
/// Errors reported by the client database (other than formatting errors, see
/// [`ClientDatabaseFormatError`])
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Soft confirmation depth is not smaller than the confirmation depth
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Max fork tip distance is larger than the confirmation depth
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled a read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend failed to read
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level I/O error
        error: io::Error,
    },
    /// Database version found is not supported
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Version that was encountered
        database_version: u8,
    },
    /// Page group size is too small, it must be at least two pages
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size that was encountered
        page_group_size: u32,
    },
    /// Sequence number differs from the expected one
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number that was found
        actual: u64,
        /// Sequence number that was expected
        expected: u64,
        /// Page offset at which the sequence number was found
        page_offset: u32,
    },
    /// Storage item of an unexpected kind was found
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// The storage item, type-erased and only usable for debug printing
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset at which the storage item was found
        page_offset: u32,
    },
    /// Block is invalid
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset at which the block was found
        page_offset: u32,
    },
    /// Segment headers are invalid
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset at which the segment headers were found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// First page group is not a permanent one
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
314
315#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318 #[error("Storage backend has canceled read request")]
320 ReadRequestCancelled,
321 #[error("Storage backend read error: {error}")]
323 ReadError {
324 error: io::Error,
326 },
327 #[error("Failed to generate database id")]
329 FailedToGenerateDatabaseId {
330 #[from]
332 error: SysError,
333 },
334 #[error("Database is already formatted yet")]
336 AlreadyFormatted,
337 #[error("Storage backend has canceled a writing request")]
339 WriteRequestCancelled,
340 #[error("Storage item write error")]
342 StorageItemWriteError {
343 #[from]
345 error: io::Error,
346 },
347}
348
/// Tip of a fork: the number and root of its last block
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
354
/// Borrowed view of a block that is either fully available in memory or has to be completed by
/// reading from storage
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is in memory
    InMemory(&'a Block),
    /// Only the header is in memory, the rest is stored at `write_location`
    Persisted {
        /// Block header kept in memory
        header: &'a Block::Header,
        /// Location of the storage item that contains the block
        write_location: WriteLocation,
    },
}
365
366#[derive(Debug)]
367struct BeaconChainBlockDetails {
368 shard_segment_roots: StdArc<[ShardSegmentRoot]>,
370}
371
372impl BeaconChainBlockDetails {
373 fn from_body(body: &BeaconChainBody<'_>) -> Self {
374 let shard_segment_roots = body
375 .intermediate_shard_blocks()
376 .iter()
377 .flat_map(|intermediate_shard_block_info| {
378 let own_segments = intermediate_shard_block_info
379 .own_segments
380 .into_iter()
381 .flat_map({
382 let shard_index = intermediate_shard_block_info.header.prefix.shard_index;
383
384 move |own_segments| {
385 (own_segments.first_local_segment_index..)
386 .zip(own_segments.segment_roots)
387 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
388 shard_index,
389 segment_index,
390 segment_root,
391 })
392 }
393 });
394 let child_shard_segment_roots = intermediate_shard_block_info
395 .leaf_shards_segments()
396 .flat_map(move |(shard_index, own_segments)| {
397 (own_segments.first_local_segment_index..)
398 .zip(own_segments.segment_roots)
399 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
400 shard_index,
401 segment_index,
402 segment_root,
403 })
404 });
405
406 own_segments.chain(child_shard_segment_roots)
407 })
408 .collect();
409
410 Self {
411 shard_segment_roots,
412 }
413 }
414}
415
/// Block as tracked by the client database.
///
/// The variants differ in how much of the block is kept in memory: `InMemory` holds the whole
/// block, the two `Persisted*` variants only hold the header together with the location of the
/// block in storage, and `PersistedConfirmed` additionally carries no [`BlockDetails`].
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// Block that is fully stored in memory
    InMemory {
        /// The block itself
        block: Block,
        /// Details of the block
        block_details: BlockDetails,
        /// Beacon chain details, `None` when not available for this block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
    },
    /// Block that was written to storage, with block details still retained in memory
    Persisted {
        /// Block header kept in memory
        header: Block::Header,
        /// Details of the block
        block_details: BlockDetails,
        /// Beacon chain details, `None` when not available for this block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        /// Location of the block in storage
        write_location: WriteLocation,
    },
    /// Block that was written to storage and for which block details are no longer retained.
    ///
    /// NOTE(review): judging by the name, this is a block that is already confirmed. The
    /// transition into this state is not visible in this part of the file.
    PersistedConfirmed {
        /// Block header kept in memory
        header: Block::Header,
        /// Beacon chain details, `None` when not available for this block
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        /// Location of the block in storage
        write_location: WriteLocation,
    },
}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454 Block: GenericOwnedBlock,
455{
456 #[inline(always)]
457 fn header(&self) -> &Block::Header {
458 match self {
459 Self::InMemory { block, .. } => block.header(),
460 Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461 }
462 }
463
464 #[inline(always)]
465 fn full_block(&self) -> FullBlock<'_, Block> {
466 match self {
467 Self::InMemory { block, .. } => FullBlock::InMemory(block),
468 Self::Persisted {
469 header,
470 write_location,
471 ..
472 }
473 | Self::PersistedConfirmed {
474 header,
475 write_location,
476 ..
477 } => FullBlock::Persisted {
478 header,
479 write_location: *write_location,
480 },
481 }
482 }
483
484 #[inline(always)]
485 fn block_details(&self) -> Option<&BlockDetails> {
486 match self {
487 Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488 Some(block_details)
489 }
490 Self::PersistedConfirmed { .. } => None,
491 }
492 }
493
494 #[inline(always)]
495 fn beacon_chain_block_details(&self) -> Option<&BeaconChainBlockDetails> {
496 match self {
497 Self::InMemory {
498 beacon_chain_block_details,
499 ..
500 }
501 | Self::Persisted {
502 beacon_chain_block_details,
503 ..
504 }
505 | Self::PersistedConfirmed {
506 beacon_chain_block_details,
507 ..
508 } => beacon_chain_block_details.as_ref(),
509 }
510 }
511}
512
/// In-memory bookkeeping of blocks and forks
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Tips of known forks.
    ///
    /// The front element is treated as the best tip (see `State::best_tip()`), while new
    /// non-best fork tips are inserted right after it.
    fork_tips: VecDeque<ForkTip>,
    /// Mapping from the root of a known block to its block number.
    ///
    /// Uses [`BlockRootHasher`], which takes the first bytes of the key as the hash instead of
    /// hashing the key.
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// Blocks grouped by block number.
    ///
    /// The element at index `i` holds all known blocks (forks) with block number
    /// `best_number - i`, so the front element corresponds to the best block number. The first
    /// block of the front element is treated as the best block (see `State::best_block()`).
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
544
545#[derive(Debug)]
546struct SegmentHeadersCache {
547 segment_headers_cache: Vec<SegmentHeader>,
548}
549
550impl SegmentHeadersCache {
551 #[inline(always)]
552 fn last_segment_header(&self) -> Option<SegmentHeader> {
553 self.segment_headers_cache.last().copied()
554 }
555
556 #[inline(always)]
557 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558 self.segment_headers_cache
559 .last()
560 .map(|segment_header| segment_header.index.as_inner())
561 }
562
563 #[inline(always)]
564 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565 self.segment_headers_cache
566 .get(u64::from(local_segment_index) as usize)
567 .copied()
568 }
569
570 fn add_segment_headers(
572 &mut self,
573 mut segment_headers: Vec<SegmentHeader>,
574 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575 self.segment_headers_cache.reserve(segment_headers.len());
576
577 let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579 if let Some(last_segment_index) = maybe_last_local_segment_index {
580 segment_headers
582 .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583 }
584
585 for segment_header in segment_headers.iter().copied() {
588 let local_segment_index = segment_header.index.as_inner();
589 if let Some(last_local_segment_index) = maybe_last_local_segment_index {
590 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
591 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
592 local_segment_index,
593 last_local_segment_index,
594 });
595 }
596
597 self.segment_headers_cache.push(segment_header);
598 maybe_last_local_segment_index.replace(local_segment_index);
599 } else {
600 if local_segment_index != LocalSegmentIndex::ZERO {
601 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
602 local_segment_index,
603 });
604 }
605
606 self.segment_headers_cache.push(segment_header);
607 maybe_last_local_segment_index.replace(local_segment_index);
608 }
609 }
610
611 Ok(segment_headers)
612 }
613}
614
615#[derive(Debug)]
616struct SuperSegmentHeadersCache {
617 super_segment_headers_cache: Vec<SuperSegmentHeader>,
618}
619
620impl SuperSegmentHeadersCache {
621 #[inline(always)]
622 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
623 self.super_segment_headers_cache.last().copied()
624 }
625
626 #[inline]
627 fn previous_super_segment_header(
628 &self,
629 target_block_number: BlockNumber,
630 ) -> Option<SuperSegmentHeader> {
631 let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
632 let index = match self.super_segment_headers_cache.binary_search_by_key(
633 &block_number,
634 |super_segment_header| {
635 super_segment_header
636 .target_beacon_chain_block_number
637 .as_inner()
638 },
639 ) {
640 Ok(found_index) => found_index,
641 Err(insert_index) => insert_index.checked_sub(1)?,
642 };
643
644 self.super_segment_headers_cache.get(index).copied()
645 }
646
647 #[inline(always)]
648 fn get_super_segment_header(
649 &self,
650 local_segment_index: SuperSegmentIndex,
651 ) -> Option<SuperSegmentHeader> {
652 self.super_segment_headers_cache
653 .get(u64::from(local_segment_index) as usize)
654 .copied()
655 }
656
657 #[inline(always)]
658 fn get_super_segment_header_for_segment_index(
659 &self,
660 segment_index: SegmentIndex,
661 ) -> Option<SuperSegmentHeader> {
662 let index = self
663 .super_segment_headers_cache
664 .binary_search_by_key(&segment_index, |super_segment_header| {
665 super_segment_header.max_segment_index.as_inner()
666 })
667 .unwrap_or_else(|insert_index| insert_index);
668
669 let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
670
671 let max_segment_index = super_segment_header.max_segment_index.as_inner();
672 let first_segment_index = max_segment_index
673 - SegmentIndex::from(u64::from(super_segment_header.num_segments))
674 + SegmentIndex::ONE;
675
676 (first_segment_index..=max_segment_index)
677 .contains(&segment_index)
678 .then_some(super_segment_header)
679 }
680
681 fn add_super_segment_headers(
683 &mut self,
684 mut super_segment_headers: Vec<SuperSegmentHeader>,
685 ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
686 self.super_segment_headers_cache
687 .reserve(super_segment_headers.len());
688
689 let mut maybe_last_super_segment_index = self
690 .super_segment_headers_cache
691 .last()
692 .map(|header| header.index.as_inner());
693
694 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
695 super_segment_headers.retain(|super_segment_header| {
697 super_segment_header.index.as_inner() > last_super_segment_index
698 });
699 }
700
701 for super_segment_header in super_segment_headers.iter().copied() {
704 let super_segment_index = super_segment_header.index.as_inner();
705 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
706 if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
707 return Err(
708 PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
709 super_segment_index,
710 last_super_segment_index,
711 },
712 );
713 }
714
715 self.super_segment_headers_cache.push(super_segment_header);
716 maybe_last_super_segment_index.replace(super_segment_index);
717 } else {
718 if super_segment_index != SuperSegmentIndex::ZERO {
719 return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
720 super_segment_index,
721 });
722 }
723
724 self.super_segment_headers_cache.push(super_segment_header);
725 maybe_last_super_segment_index.replace(super_segment_index);
726 }
727 }
728
729 Ok(super_segment_headers)
730 }
731}
732
733#[derive(Debug)]
735struct State<Block, StorageBackend>
736where
737 Block: GenericOwnedBlock,
738{
739 data: StateData<Block>,
740 segment_headers_cache: SegmentHeadersCache,
741 super_segment_headers_cache: SuperSegmentHeadersCache,
742 storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
743}
744
745impl<Block, StorageBackend> State<Block, StorageBackend>
746where
747 Block: GenericOwnedBlock,
748{
749 #[inline(always)]
750 fn best_tip(&self) -> &ForkTip {
751 self.data
752 .fork_tips
753 .front()
754 .expect("The best block is always present; qed")
755 }
756
757 #[inline(always)]
758 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
759 self.data
760 .blocks
761 .front()
762 .expect("The best block is always present; qed")
763 .first()
764 .expect("The best block is always present; qed")
765 }
766}
767
/// Description of an in-memory block that is about to be persisted.
///
/// NOTE(review): the code that uses this type is not visible in this part of the file, field
/// descriptions below are inferred from how `StateData::blocks` is indexed.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// Presumably the index into `StateData::blocks` (distance from the best block number)
    block_offset: usize,
    /// Presumably the index of the block among the forks stored at `block_offset`
    fork_offset: usize,
    /// The block to persist
    block: &'a Block,
    /// Details of the block to persist
    block_details: &'a BlockDetails,
}
778
/// Description of a block that was persisted.
///
/// NOTE(review): the code that uses this type is not visible in this part of the file, field
/// descriptions below are inferred from how `StateData::blocks` is indexed.
#[derive(Debug)]
struct PersistedBlock {
    /// Presumably the index into `StateData::blocks` (distance from the best block number)
    block_offset: usize,
    /// Presumably the index of the block among the forks stored at `block_offset`
    fork_offset: usize,
    /// Location in storage the block was written to
    write_location: WriteLocation,
}
785
/// Subset of database options that is retained for the lifetime of the database.
///
/// Field names mirror those of [`ClientDatabaseOptions`].
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    /// Block confirmation depth; fork blocks at this distance from the best block or further are
    /// rejected when persisting
    block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth
    soft_confirmation_depth: BlockNumber,
    /// Maximum number of fork tips
    max_fork_tips: NonZeroUsize,
    /// Maximum distance of a fork tip
    max_fork_tip_distance: BlockNumber,
}
793
/// Shared internals of [`ClientDatabase`]
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was created with
    options: ClientDatabaseInnerOptions,
}
802
/// Client database.
///
/// This is a handle to shared internals stored behind a reference-counted pointer, so cloning it
/// results in another handle to the same database.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Shared internals
    inner: Arc<Inner<Block, StorageBackend>>,
}
811
812impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
813where
814 Block: GenericOwnedBlock,
815{
816 fn clone(&self) -> Self {
817 Self {
818 inner: self.inner.clone(),
819 }
820 }
821}
822
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Currently a no-op.
    ///
    /// NOTE(review): the presence of the impl still matters, since a type with a `Drop` impl
    /// can't have its fields moved out. Presumably it is a placeholder for shutdown logic.
    fn drop(&mut self) {
    }
}
831
832impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
833where
834 Block: GenericOwnedBlock,
835 StorageBackend: ClientDatabaseStorageBackend,
836{
837 #[inline]
838 fn best_root(&self) -> BlockRoot {
839 self.inner.state.read_blocking().best_tip().root
842 }
843
844 #[inline]
845 fn best_header(&self) -> Block::Header {
846 self.inner
849 .state
850 .read_blocking()
851 .best_block()
852 .header()
853 .clone()
854 }
855
856 #[inline]
857 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
858 let state = self.inner.state.read_blocking();
861 let best_block = state.best_block();
862 (
863 best_block.header().clone(),
864 best_block
865 .block_details()
866 .expect("Always present for the best block; qed")
867 .clone(),
868 )
869 }
870
    /// Header of the ancestor with number `ancestor_block_number` of the block with root
    /// `descendant_block_root`.
    ///
    /// Returns `None` if either block is not tracked in memory, if the ancestor number is larger
    /// than the descendant number, or if the chain of parents can't be followed.
    ///
    /// Takes the state read lock in a blocking manner.
    #[inline]
    fn ancestor_header(
        &self,
        ancestor_block_number: BlockNumber,
        descendant_block_root: &BlockRoot,
    ) -> Option<Block::Header> {
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // `blocks` is indexed by the distance from the best block number
        let ancestor_block_offset =
            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;

        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
        if ancestor_block_number > descendant_block_number {
            return None;
        }
        let descendant_block_offset =
            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;

        // Iterates from the descendant's block number towards older blocks
        let mut blocks_range_iter = state
            .data
            .blocks
            .iter()
            .enumerate()
            .skip(descendant_block_offset);

        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
        let descendant_header = descendant_block_candidates
            .iter()
            .find(|block| &*block.header().header().root() == descendant_block_root)?
            .header()
            .header();

        // Shortcut: with a single candidate at either level, the first ancestor candidate is
        // returned without walking parents.
        // NOTE(review): this relies on the first candidate at a level being on the same chain as
        // the descendant, presumably maintained when forks are reordered. Confirm.
        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
            return ancestor_block_candidates
                .iter()
                .next()
                .map(|block| block.header().clone());
        }

        let mut parent_block_root = &descendant_header.prefix.parent_root;

        // Follow parent roots level by level until the ancestor's level is reached.
        // NOTE(review): iteration starts one level below the descendant, so when both block
        // numbers are equal and there are forks at that level, the result is `None`.
        for (block_offset, parent_candidates) in blocks_range_iter {
            let parent_header = parent_candidates
                .iter()
                .find(|header| &*header.header().header().root() == parent_block_root)?
                .header();

            if block_offset == ancestor_block_offset {
                return Some(parent_header.clone());
            }

            parent_block_root = &parent_header.header().prefix.parent_root;
        }

        None
    }
939
940 #[inline]
941 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
942 let state = self.inner.state.read_blocking();
945 let best_number = state.best_tip().number;
946
947 let block_number = *state.data.block_roots.get(block_root)?;
948 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
949 let block_candidates = state.data.blocks.get(block_offset)?;
950
951 block_candidates.iter().find_map(|block| {
952 let header = block.header();
953
954 if &*header.header().root() == block_root {
955 Some(header.clone())
956 } else {
957 None
958 }
959 })
960 }
961
962 #[inline]
963 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
964 let state = self.inner.state.read_blocking();
967 let best_number = state.best_tip().number;
968
969 let block_number = *state.data.block_roots.get(block_root)?;
970 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
971 let block_candidates = state.data.blocks.get(block_offset)?;
972
973 block_candidates.iter().find_map(|block| {
974 let header = block.header();
975 let block_details = block.block_details().cloned()?;
976
977 if &*header.header().root() == block_root {
978 Some((header.clone(), block_details))
979 } else {
980 None
981 }
982 })
983 }
984
985 #[inline]
986 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
987 let state = self.inner.state.read().await;
988 let best_number = state.best_tip().number;
989
990 let block_number = *state
991 .data
992 .block_roots
993 .get(block_root)
994 .ok_or(ReadBlockError::UnknownBlockRoot)?;
995 let block_offset = u64::from(
996 best_number
997 .checked_sub(block_number)
998 .expect("Known block roots always have valid block offset; qed"),
999 ) as usize;
1000 let block_candidates = state
1001 .data
1002 .blocks
1003 .get(block_offset)
1004 .expect("Valid block offsets always have block entries; qed");
1005
1006 for block_candidate in block_candidates {
1007 let header = block_candidate.header();
1008
1009 if &*header.header().root() == block_root {
1010 return match block_candidate.full_block() {
1011 FullBlock::InMemory(block) => Ok(block.clone()),
1012 FullBlock::Persisted {
1013 header,
1014 write_location,
1015 } => {
1016 let storage_backend_adapter = state.storage_backend_adapter.read().await;
1017
1018 let storage_item = storage_backend_adapter
1019 .read_storage_item::<StorageItemTemporary>(write_location)
1020 .await?;
1021
1022 let storage_item_block = match storage_item {
1023 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1024 StorageItemTemporary::SegmentHeaders(_) => {
1025 return Err(ReadBlockError::StorageItemReadError {
1026 error: io::Error::other(
1027 "Unexpected storage item: `SegmentHeaders`",
1028 ),
1029 });
1030 }
1031 StorageItemTemporary::SuperSegmentHeaders(_) => {
1032 return Err(ReadBlockError::StorageItemReadError {
1033 error: io::Error::other(
1034 "Unexpected storage item: `SuperSegmentHeaders`",
1035 ),
1036 });
1037 }
1038 };
1039
1040 let StorageItemTemporaryBlock {
1041 header: _,
1042 body,
1043 mmr_with_block: _,
1044 system_contract_states: _,
1045 } = storage_item_block;
1046
1047 Block::from_buffers(header.buffer().clone(), body)
1048 .ok_or(ReadBlockError::FailedToDecode)
1049 }
1050 };
1051 }
1052 }
1053
1054 unreachable!("Known block root always has block candidate associated with it; qed")
1055 }
1056
1057 #[inline]
1058 fn last_segment_header(&self) -> Option<SegmentHeader> {
1059 let state = self.inner.state.read_blocking();
1062 state.segment_headers_cache.last_segment_header()
1063 }
1064
1065 #[inline]
1066 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1067 let state = self.inner.state.read_blocking();
1070
1071 state
1072 .segment_headers_cache
1073 .get_segment_header(segment_index)
1074 }
1075
    /// Segment headers that correspond to the block with number `block_number`.
    ///
    /// A segment header corresponds to the block whose number is the segment's last archived
    /// block number plus one plus the block confirmation depth. Several consecutive segment
    /// headers are returned (in the order of increasing index) when they share the same last
    /// archived block number. An empty vector is returned when no segment headers correspond to
    /// the block.
    ///
    /// Takes the state read lock in a blocking manner.
    fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
        let state = self.inner.state.read_blocking();

        let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
        else {
            // No segment headers are cached at all
            return Vec::new();
        };

        // Special case: on the beacon chain the very first segment header belongs to block 1
        if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
            && block_number == BlockNumber::ONE
        {
            return vec![
                state
                    .segment_headers_cache
                    .get_segment_header(LocalSegmentIndex::ZERO)
                    .expect("Segment headers are stored in monotonically increasing order; qed"),
            ];
        }

        if last_local_segment_index == LocalSegmentIndex::ZERO {
            // Only the very first segment header exists and it is not searched below
            return Vec::new();
        }

        // Search from the newest segment header towards older ones
        let mut current_local_segment_index = last_local_segment_index;
        loop {
            let current_segment_header = state
                .segment_headers_cache
                .get_segment_header(current_local_segment_index)
                .expect("Segment headers are stored in monotonically increasing order; qed");

            // Number of the block this segment header corresponds to
            let target_block_number = current_segment_header.last_archived_block.number()
                + BlockNumber::ONE
                + self.inner.options.block_confirmation_depth;
            if target_block_number == block_number {
                let mut headers_for_block = vec![current_segment_header];

                // Older segment headers with the same last archived block number correspond to
                // the same block, prepend them to keep the increasing order
                let last_archived_block_number = current_segment_header.last_archived_block.number;
                let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;

                // NOTE(review): if the very first segment header also matched, the decrement
                // below would go below zero. Presumably this can't happen, confirm.
                while let Some(segment_header) = state
                    .segment_headers_cache
                    .get_segment_header(local_segment_index)
                {
                    if segment_header.last_archived_block.number == last_archived_block_number {
                        headers_for_block.insert(0, segment_header);
                        local_segment_index -= LocalSegmentIndex::ONE;
                    } else {
                        break;
                    }
                }

                return headers_for_block;
            }

            if target_block_number > block_number {
                // The segment header is for a later block, continue with the previous one, but
                // never go down to the very first segment header
                if current_local_segment_index > LocalSegmentIndex::ONE {
                    current_local_segment_index -= LocalSegmentIndex::ONE;
                } else {
                    break;
                }
            } else {
                // The segment header is for an earlier block, and the search stops here without
                // looking at older segment headers
                return Vec::new();
            }
        }

        Vec::new()
    }
1158}
1159
1160impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1161where
1162 Block: GenericOwnedBlock,
1163 StorageBackend: ClientDatabaseStorageBackend,
1164{
    /// Persists a block together with its details.
    ///
    /// Three cases are handled:
    /// * the best block number is zero and the new block number is not one: the block is
    ///   inserted as the first block
    /// * the block number is one larger than the best block number: the block is inserted as the
    ///   new best block
    /// * otherwise the block is stored in memory as a fork block at its block number
    ///
    /// # Errors
    ///
    /// * [`PersistBlockError::MissingParent`] if the block number is more than one larger than
    ///   the best block number
    /// * [`PersistBlockError::OutsideAcceptableRange`] if a fork block is at the distance of block
    ///   confirmation depth from the best block or further, or if there is no blocks entry for
    ///   its block number
    /// * whatever inserting the new best block fails with
    async fn persist_block(
        &self,
        block: Block,
        block_details: BlockDetails,
    ) -> Result<(), PersistBlockError> {
        let mut state = self.inner.state.write().await;
        let best_number = state.best_tip().number;

        let header = block.header().header();

        let block_number = header.prefix.number;

        // NOTE(review): presumably this covers starting from a block other than block 1 on top
        // of a database that only contains the initial block. Confirm.
        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
            Self::insert_first_block(&mut state.data, block, block_details);

            return Ok(());
        }

        if block_number == best_number + BlockNumber::ONE {
            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
        }

        // Everything below deals with a fork block. `checked_sub` only fails when the block
        // number is larger than the best block number, which at this point means that at least
        // one block in between is missing.
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .ok_or(PersistBlockError::MissingParent)?,
        ) as usize;

        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
            return Err(PersistBlockError::OutsideAcceptableRange);
        }

        // Reborrow to be able to borrow different fields of the state independently
        let state = &mut *state;

        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
            error!(
                %block_number,
                %block_offset,
                "Failed to store block fork, header offset is missing despite being within \
                acceptable range"
            );

            PersistBlockError::OutsideAcceptableRange
        })?;

        // The parent of the new block stops being a fork tip if it was one
        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
            if fork_tip.root == header.prefix.parent_root {
                state.data.fork_tips.remove(index);
                break;
            }
        }

        let block_root = *header.root();
        // Position 0 is reserved for the best tip, so the new fork tip goes right after it
        state.data.fork_tips.insert(
            1,
            ForkTip {
                number: block_number,
                root: block_root,
            },
        );
        state.data.block_roots.insert(block_root, block_number);
        // Beacon chain details only exist when `Block` is the beacon chain block type
        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
        block_forks.push(ClientDatabaseBlock::InMemory {
            block,
            block_details,
            beacon_chain_block_details,
        });

        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);

        Ok(())
    }
1242
1243 async fn persist_segment_headers(
1244 &self,
1245 segment_headers: Vec<SegmentHeader>,
1246 ) -> Result<(), PersistSegmentHeadersError> {
1247 let mut state = self.inner.state.write().await;
1248
1249 let added_segment_headers = state
1250 .segment_headers_cache
1251 .add_segment_headers(segment_headers)?;
1252
1253 if added_segment_headers.is_empty() {
1254 return Ok(());
1255 }
1256
1257 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1262
1263 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1264
1265 storage_backend_adapter
1266 .write_storage_item(StorageItemTemporary::SegmentHeaders(
1267 StorageItemTemporarySegmentHeaders {
1268 segment_headers: added_segment_headers,
1269 },
1270 ))
1271 .await?;
1272
1273 Ok(())
1274 }
1275}
1276
1277impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1278where
1279 StorageBackend: ClientDatabaseStorageBackend,
1280{
1281 fn shard_segment_roots(
1282 &self,
1283 block_number: BlockNumber,
1284 ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1285 let state = self.inner.state.read_blocking();
1288 let best_number = state.best_tip().number;
1289
1290 let block_offset = u64::from(
1291 best_number
1292 .checked_sub(block_number)
1293 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1294 ) as usize;
1295
1296 let block = state
1297 .data
1298 .blocks
1299 .get(block_offset)
1300 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1301 .first()
1302 .expect("There is always at least one block candidate; qed");
1303
1304 Ok(StdArc::clone(
1305 &block
1306 .beacon_chain_block_details()
1307 .as_ref()
1308 .expect("Always present in the beacon chain block; qed")
1309 .shard_segment_roots,
1310 ))
1311 }
1312
1313 #[inline]
1314 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1315 let state = self.inner.state.read_blocking();
1318 state
1319 .super_segment_headers_cache
1320 .last_super_segment_header()
1321 }
1322
1323 #[inline]
1324 fn previous_super_segment_header(
1325 &self,
1326 block_number: BlockNumber,
1327 ) -> Option<SuperSegmentHeader> {
1328 let state = self.inner.state.read_blocking();
1331
1332 state
1333 .super_segment_headers_cache
1334 .previous_super_segment_header(block_number)
1335 }
1336
1337 #[inline]
1338 fn get_super_segment_header(
1339 &self,
1340 super_segment_index: SuperSegmentIndex,
1341 ) -> Option<SuperSegmentHeader> {
1342 let state = self.inner.state.read_blocking();
1345
1346 state
1347 .super_segment_headers_cache
1348 .get_super_segment_header(super_segment_index)
1349 }
1350
1351 fn get_super_segment_header_for_segment_index(
1352 &self,
1353 segment_index: SegmentIndex,
1354 ) -> Option<SuperSegmentHeader> {
1355 let state = self.inner.state.read_blocking();
1358
1359 state
1360 .super_segment_headers_cache
1361 .get_super_segment_header_for_segment_index(segment_index)
1362 }
1363}
1364
1365impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1366where
1367 StorageBackend: ClientDatabaseStorageBackend,
1368{
1369 async fn persist_super_segment_header(
1370 &self,
1371 super_segment_header: SuperSegmentHeader,
1372 ) -> Result<bool, PersistSuperSegmentHeadersError> {
1373 let mut state = self.inner.state.write().await;
1374
1375 let added_super_segment_headers = state
1376 .super_segment_headers_cache
1377 .add_super_segment_headers(vec![super_segment_header])?;
1378
1379 if added_super_segment_headers.is_empty() {
1380 return Ok(false);
1381 }
1382
1383 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1388
1389 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1390
1391 storage_backend_adapter
1392 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1393 StorageItemTemporarySuperSegmentHeaders {
1394 super_segment_headers: added_super_segment_headers,
1395 },
1396 ))
1397 .await?;
1398
1399 Ok(true)
1400 }
1401
1402 async fn persist_super_segment_headers(
1403 &self,
1404 super_segment_headers: Vec<SuperSegmentHeader>,
1405 ) -> Result<(), PersistSuperSegmentHeadersError> {
1406 let mut state = self.inner.state.write().await;
1407
1408 let added_super_segment_headers = state
1409 .super_segment_headers_cache
1410 .add_super_segment_headers(super_segment_headers)?;
1411
1412 if added_super_segment_headers.is_empty() {
1413 return Ok(());
1414 }
1415
1416 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1421
1422 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1423
1424 storage_backend_adapter
1425 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1426 StorageItemTemporarySuperSegmentHeaders {
1427 super_segment_headers: added_super_segment_headers,
1428 },
1429 ))
1430 .await?;
1431
1432 Ok(())
1433 }
1434}
1435
1436impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1437where
1438 Block: GenericOwnedBlock,
1439 StorageBackend: ClientDatabaseStorageBackend,
1440{
1441 pub async fn open<GBB>(
1445 options: ClientDatabaseOptions<GBB, StorageBackend>,
1446 ) -> Result<Self, ClientDatabaseError>
1447 where
1448 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1449 {
1450 let ClientDatabaseOptions {
1451 write_buffer_size,
1452 block_confirmation_depth,
1453 soft_confirmation_depth,
1454 max_fork_tips,
1455 max_fork_tip_distance,
1456 genesis_block_builder,
1457 storage_backend,
1458 } = options;
1459 if soft_confirmation_depth >= block_confirmation_depth {
1460 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1461 }
1462
1463 if max_fork_tip_distance > block_confirmation_depth {
1464 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1465 }
1466
1467 let mut state_data = StateData {
1468 fork_tips: VecDeque::new(),
1469 block_roots: HashMap::default(),
1470 blocks: VecDeque::new(),
1471 };
1472 let mut segment_headers_cache = SegmentHeadersCache {
1473 segment_headers_cache: Vec::new(),
1474 };
1475 let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1476 super_segment_headers_cache: Vec::new(),
1477 };
1478
1479 let options = ClientDatabaseInnerOptions {
1480 block_confirmation_depth,
1481 soft_confirmation_depth,
1482 max_fork_tips,
1483 max_fork_tip_distance,
1484 };
1485
1486 let storage_item_handlers = StorageItemHandlers {
1487 permanent: |_arg| {
1488 Ok(())
1490 },
1491 temporary: |arg| {
1492 let StorageItemHandlerArg {
1493 storage_item,
1494 page_offset,
1495 num_pages,
1496 } = arg;
1497 let storage_item_block = match storage_item {
1498 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1499 StorageItemTemporary::SegmentHeaders(segment_headers) => {
1500 let num_segment_headers = segment_headers.segment_headers.len();
1501 return match segment_headers_cache
1502 .add_segment_headers(segment_headers.segment_headers)
1503 {
1504 Ok(_) => Ok(()),
1505 Err(error) => {
1506 error!(
1507 %page_offset,
1508 %num_segment_headers,
1509 %error,
1510 "Failed to add segment headers from storage item"
1511 );
1512
1513 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1514 }
1515 };
1516 }
1517 StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1518 let num_super_segment_headers =
1519 super_segment_headers.super_segment_headers.len();
1520 return match super_segment_headers_cache
1521 .add_super_segment_headers(super_segment_headers.super_segment_headers)
1522 {
1523 Ok(_) => Ok(()),
1524 Err(error) => {
1525 error!(
1526 %page_offset,
1527 %num_super_segment_headers,
1528 %error,
1529 "Failed to add segment headers from storage item"
1530 );
1531
1532 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1533 }
1534 };
1535 }
1536 };
1537
1538 let StorageItemTemporaryBlock {
1541 header,
1542 body,
1543 mmr_with_block,
1544 system_contract_states,
1545 } = storage_item_block;
1546
1547 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1548 error!(%page_offset, "Failed to decode block header from bytes");
1549
1550 ClientDatabaseError::InvalidBlock { page_offset }
1551 })?;
1552 let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1553 error!(%page_offset, "Failed to decode block body from bytes");
1554
1555 ClientDatabaseError::InvalidBlock { page_offset }
1556 })?;
1557
1558 let block_root = *header.header().root();
1559 let block_number = header.header().prefix.number;
1560
1561 state_data.block_roots.insert(block_root, block_number);
1562
1563 let maybe_best_number = state_data
1564 .blocks
1565 .front()
1566 .and_then(|block_forks| block_forks.first())
1567 .map(|best_block| {
1568 let header: &Block::Header = best_block.header();
1570
1571 header.header().prefix.number
1572 });
1573
1574 let block_offset = if let Some(best_number) = maybe_best_number {
1575 if block_number <= best_number {
1576 u64::from(best_number - block_number) as usize
1577 } else {
1578 if block_number - best_number != BlockNumber::ONE {
1580 error!(
1581 %page_offset,
1582 %best_number,
1583 %block_number,
1584 "Invalid new best block number, it must be only one block \
1585 higher than the best block"
1586 );
1587
1588 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1589 }
1590
1591 state_data.blocks.push_front(SmallVec::new());
1592 0
1594 }
1595 } else {
1596 state_data.blocks.push_front(SmallVec::new());
1597 0
1599 };
1600
1601 let Some(block_forks) = state_data.blocks.get_mut(block_offset) else {
1602 return Ok(());
1605 };
1606
1607 let beacon_chain_block_details =
1609 <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1610 .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1611 block_forks.push(ClientDatabaseBlock::Persisted {
1612 header,
1613 block_details: BlockDetails {
1614 mmr_with_block,
1615 system_contract_states,
1616 },
1617 beacon_chain_block_details,
1618 write_location: WriteLocation {
1619 page_offset,
1620 num_pages,
1621 },
1622 });
1623
1624 if block_offset == 0 && block_forks.len() == 1 {
1627 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1628 }
1629
1630 Ok(())
1631 },
1632 };
1633
1634 let storage_backend_adapter =
1635 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1636 .await?;
1637
1638 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1639 block_forks.last()
1642 }) {
1643 let header: &Block::Header = best_block.header();
1645 let header = header.header();
1646 let block_number = header.prefix.number;
1647 let block_root = *header.root();
1648
1649 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1650 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1651 }
1652
1653 state_data.fork_tips.push_front(ForkTip {
1655 number: block_number,
1656 root: block_root,
1657 });
1658 } else {
1659 let GenesisBlockBuilderResult {
1660 block,
1661 system_contract_states,
1662 } = genesis_block_builder();
1663
1664 let header = block.header().header();
1666 let block_number = header.prefix.number;
1667 let block_root = *header.root();
1668
1669 state_data.fork_tips.push_front(ForkTip {
1670 number: block_number,
1671 root: block_root,
1672 });
1673 state_data.block_roots.insert(block_root, block_number);
1674 let beacon_chain_block_details =
1675 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1676 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1677 state_data
1678 .blocks
1679 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1680 block,
1681 block_details: BlockDetails {
1682 system_contract_states,
1683 mmr_with_block: Arc::new({
1684 let mut mmr = BlockMerkleMountainRange::new();
1685 mmr.add_leaf(&block_root);
1686 mmr
1687 })
1688 },
1689 beacon_chain_block_details,
1690 }]);
1691 }
1692
1693 let state = State {
1694 data: state_data,
1695 segment_headers_cache,
1696 super_segment_headers_cache,
1697 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1698 };
1699
1700 let inner = Inner {
1701 state: AsyncRwLock::new(state),
1702 options,
1703 };
1704
1705 Ok(Self {
1706 inner: Arc::new(inner),
1707 })
1708 }
1709
1710 pub async fn format(
1712 storage_backend: &StorageBackend,
1713 options: ClientDatabaseFormatOptions,
1714 ) -> Result<(), ClientDatabaseFormatError> {
1715 StorageBackendAdapter::format(storage_backend, options).await
1716 }
1717
1718 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1719 let header = block.header().header();
1721 let block_number = header.prefix.number;
1722 let block_root = *header.root();
1723
1724 state.fork_tips.clear();
1725 state.fork_tips.push_front(ForkTip {
1726 number: block_number,
1727 root: block_root,
1728 });
1729 state.block_roots.clear();
1730 state.block_roots.insert(block_root, block_number);
1731 state.blocks.clear();
1732 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1733 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1734 state
1735 .blocks
1736 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1737 block,
1738 block_details,
1739 beacon_chain_block_details,
1740 }]);
1741 }
1742
1743 async fn insert_new_best_block(
1744 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1745 inner: &Inner<Block, StorageBackend>,
1746 block: Block,
1747 block_details: BlockDetails,
1748 ) -> Result<(), PersistBlockError> {
1749 let header = block.header().header();
1750 let block_number = header.prefix.number;
1751 let block_root = *header.root();
1752 let parent_root = header.prefix.parent_root;
1753
1754 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1757 return Err(PersistBlockError::MissingParent);
1758 }
1759
1760 {
1762 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1763 if fork_tip.root == parent_root {
1765 state.data.fork_tips.remove(index);
1766 break;
1767 }
1768 }
1769
1770 state.data.fork_tips.push_front(ForkTip {
1771 number: block_number,
1772 root: block_root,
1773 });
1774 state.data.block_roots.insert(block_root, block_number);
1775 let beacon_chain_block_details =
1776 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1777 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1778 state
1779 .data
1780 .blocks
1781 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1782 block,
1783 block_details: block_details.clone(),
1784 beacon_chain_block_details,
1785 }]);
1786 }
1787
1788 let options = &inner.options;
1789
1790 Self::confirm_canonical_block(block_number, &mut state.data, options);
1791 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1792
1793 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1798
1799 let mut blocks_to_persist = Vec::new();
1800 for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1801 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1802 break;
1803 };
1804
1805 let len_before = blocks_to_persist.len();
1806 fork_blocks
1807 .iter()
1808 .enumerate()
1809 .filter_map(|(fork_offset, client_database_block)| {
1810 match client_database_block {
1811 ClientDatabaseBlock::InMemory {
1812 block,
1813 block_details,
1814 beacon_chain_block_details: _,
1815 } => Some(BlockToPersist {
1816 block_offset,
1817 fork_offset,
1818 block,
1819 block_details,
1820 }),
1821 ClientDatabaseBlock::Persisted { .. }
1822 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1823 None
1825 }
1826 }
1827 })
1828 .collect_into(&mut blocks_to_persist);
1829
1830 if blocks_to_persist.len() == len_before {
1831 break;
1832 }
1833 }
1834
1835 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1837 {
1838 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1839
1840 for block_to_persist in blocks_to_persist.into_iter().rev() {
1841 let BlockToPersist {
1842 block_offset,
1843 fork_offset,
1844 block,
1845 block_details,
1846 } = block_to_persist;
1847
1848 let write_location = storage_backend_adapter
1849 .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1850 header: block.header().buffer().clone(),
1851 body: block.body().buffer().clone(),
1852 mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1853 system_contract_states: StdArc::clone(
1854 &block_details.system_contract_states,
1855 ),
1856 }))
1857 .await?;
1858
1859 persisted_blocks.push(PersistedBlock {
1860 block_offset,
1861 fork_offset,
1862 write_location,
1863 });
1864 }
1865 }
1866
1867 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1869 for persisted_block in persisted_blocks {
1870 let PersistedBlock {
1871 block_offset,
1872 fork_offset,
1873 write_location,
1874 } = persisted_block;
1875
1876 let block = state
1877 .data
1878 .blocks
1879 .get_mut(block_offset)
1880 .expect("Still holding the same lock since last check; qed")
1881 .get_mut(fork_offset)
1882 .expect("Still holding the same lock since last check; qed");
1883
1884 replace_with_or_abort(block, |block| {
1885 if let ClientDatabaseBlock::InMemory {
1886 block,
1887 block_details,
1888 beacon_chain_block_details,
1889 } = block
1890 {
1891 let (header, _body) = block.split();
1892
1893 ClientDatabaseBlock::Persisted {
1894 header,
1895 block_details,
1896 beacon_chain_block_details,
1897 write_location,
1898 }
1899 } else {
1900 unreachable!("Still holding the same lock since last check; qed");
1901 }
1902 });
1903 }
1904
1905 Ok(())
1909 }
1910
1911 #[must_use]
1916 fn adjust_ancestor_block_forks(
1917 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1918 mut parent_block_root: BlockRoot,
1919 ) -> bool {
1920 let mut ancestor_blocks = blocks.iter_mut();
1921
1922 loop {
1923 if ancestor_blocks.len() == 1 {
1924 break;
1926 }
1927
1928 let Some(parent_blocks) = ancestor_blocks.next() else {
1929 break;
1931 };
1932
1933 let Some(fork_offset_parent_block_root) =
1934 parent_blocks
1935 .iter()
1936 .enumerate()
1937 .find_map(|(fork_offset, fork_block)| {
1938 let fork_header = fork_block.header().header();
1939 if *fork_header.root() == parent_block_root {
1940 Some((fork_offset, fork_header.prefix.parent_root))
1941 } else {
1942 None
1943 }
1944 })
1945 else {
1946 return false;
1947 };
1948
1949 let fork_offset;
1950 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1951
1952 parent_blocks.swap(0, fork_offset);
1953 }
1954
1955 true
1956 }
1957
1958 fn prune_outdated_fork_tips(
1965 best_number: BlockNumber,
1966 state: &mut StateData<Block>,
1967 options: &ClientDatabaseInnerOptions,
1968 ) {
1969 let state = &mut *state;
1970
1971 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1974
1975 state.fork_tips.retain(|fork_tip| {
1977 if best_number - fork_tip.number > options.max_fork_tip_distance {
1978 candidate_forks_to_remove.push(*fork_tip);
1979 false
1980 } else {
1981 true
1982 }
1983 });
1984 if state.fork_tips.len() > options.max_fork_tips.get() {
1986 state
1987 .fork_tips
1988 .drain(options.max_fork_tips.get()..)
1989 .collect_into(&mut candidate_forks_to_remove);
1990 }
1991
1992 candidate_forks_to_remove
1994 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1995 state.fork_tips.extend(candidate_forks_to_remove);
1997 }
1998
1999 #[must_use]
2002 fn prune_outdated_fork(
2003 best_number: BlockNumber,
2004 fork_tip: &ForkTip,
2005 state: &mut StateData<Block>,
2006 ) -> bool {
2007 let block_offset = u64::from(best_number - fork_tip.number) as usize;
2008
2009 let mut block_root_to_prune = fork_tip.root;
2011 let mut pruned_tip = false;
2012 for block_offset in block_offset.. {
2013 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2014 if !pruned_tip {
2015 error!(
2016 %best_number,
2017 ?fork_tip,
2018 block_offset,
2019 "Block offset was not present in the database, this is an implementation \
2020 bug #1"
2021 );
2022 }
2023 break;
2025 };
2026
2027 if fork_blocks.len() == 1 {
2028 if !pruned_tip {
2029 error!(
2030 %best_number,
2031 ?fork_tip,
2032 block_offset,
2033 "Block offset was not present in the database, this is an implementation \
2034 bug #2"
2035 );
2036 }
2037
2038 break;
2040 }
2041
2042 let Some((fork_offset, block)) = fork_blocks
2043 .iter()
2044 .enumerate()
2045 .skip(1)
2047 .find(|(_fork_offset, block)| {
2048 *block.header().header().root() == block_root_to_prune
2049 })
2050 else {
2051 if !pruned_tip {
2052 error!(
2053 %best_number,
2054 ?fork_tip,
2055 block_offset,
2056 "Block offset was not present in the database, this is an implementation \
2057 bug #3"
2058 );
2059 }
2060
2061 break;
2063 };
2064
2065 if block.header().ref_count() > 1 {
2067 break;
2068 }
2069
2070 match block {
2072 ClientDatabaseBlock::InMemory { .. } => {
2073 }
2075 ClientDatabaseBlock::Persisted { .. }
2076 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2077 pruned_tip = true;
2079 break;
2080 }
2081 }
2082
2083 state.block_roots.get_mut(&block_root_to_prune);
2084 block_root_to_prune = block.header().header().prefix.parent_root;
2085 fork_blocks.swap_remove(fork_offset);
2086
2087 pruned_tip = true;
2088 }
2089
2090 pruned_tip
2091 }
2092
2093 fn confirm_canonical_block(
2096 best_number: BlockNumber,
2097 state_data: &mut StateData<Block>,
2098 options: &ClientDatabaseInnerOptions,
2099 ) {
2100 let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;
2104
2105 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
2106 return;
2108 };
2109
2110 {
2112 let Some(canonical_block) = fork_blocks.first_mut() else {
2113 error!(
2114 %best_number,
2115 block_offset,
2116 "Have not found a canonical block to confirm, this is an implementation bug"
2117 );
2118 return;
2119 };
2120
2121 replace_with_or_abort(canonical_block, |block| match block {
2122 ClientDatabaseBlock::InMemory { .. } => {
2123 error!(
2124 %best_number,
2125 block_offset,
2126 header = ?block.header(),
2127 "Block to be confirmed must not be in memory, this is an implementation bug"
2128 );
2129 block
2130 }
2131 ClientDatabaseBlock::Persisted {
2132 header,
2133 block_details: _,
2134 beacon_chain_block_details,
2135 write_location,
2136 } => ClientDatabaseBlock::PersistedConfirmed {
2137 header,
2138 beacon_chain_block_details,
2139 write_location,
2140 },
2141 ClientDatabaseBlock::PersistedConfirmed { .. } => {
2142 error!(
2143 %best_number,
2144 block_offset,
2145 header = ?block.header(),
2146 "Block to be confirmed must not be confirmed yet, this is an \
2147 implementation bug"
2148 );
2149 block
2150 }
2151 });
2152 }
2153
2154 let mut block_roots_to_prune = fork_blocks
2156 .drain(1..)
2157 .map(|block| *block.header().header().root())
2158 .collect::<Vec<_>>();
2159 let mut current_block_offset = block_offset;
2160 while !block_roots_to_prune.is_empty() {
2161 state_data
2163 .fork_tips
2164 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
2165
2166 for block_root in &block_roots_to_prune {
2168 state_data.block_roots.remove(block_root);
2169 }
2170
2171 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
2173 current_block_offset = next_block_offset;
2174 } else {
2175 break;
2177 }
2178
2179 let fork_blocks = state_data
2180 .blocks
2181 .get_mut(current_block_offset)
2182 .expect("Lower block offset always exists; qed");
2183
2184 block_roots_to_prune = fork_blocks
2186 .drain_filter(|block| {
2187 let header = block.header().header();
2188
2189 block_roots_to_prune.contains(&header.prefix.parent_root)
2190 })
2191 .map(|block| *block.header().header().root())
2192 .collect();
2193 }
2194 }
2195}