1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 const_convert,
50 const_trait_impl,
51 default_field_values,
52 get_mut_unchecked,
53 iter_collect_into,
54 maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::temporary::StorageItemTemporary;
62use crate::page_group::temporary::block::StorageItemTemporaryBlock;
63use crate::page_group::temporary::segment_headers::StorageItemTemporarySegmentHeaders;
64use crate::page_group::temporary::super_segment_headers::StorageItemTemporarySuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70 BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71 ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72 PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81 LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
101#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
103#[repr(C)]
104pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107 type Target = [u8; 32];
108
109 #[inline(always)]
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116 #[inline(always)]
117 fn as_ref(&self) -> &[u8] {
118 &self.0
119 }
120}
121
122impl DatabaseId {
123 #[inline(always)]
124 pub const fn new(bytes: [u8; 32]) -> Self {
125 Self(bytes)
126 }
127}
128
129#[derive(Default)]
130struct BlockRootHasher(u64);
131
132impl Hasher for BlockRootHasher {
133 #[inline(always)]
134 fn finish(&self) -> u64 {
135 self.0
136 }
137
138 #[inline(always)]
139 fn write(&mut self, bytes: &[u8]) {
140 let Some(state) = bytes.as_chunks().0.first().copied().map(u64::from_le_bytes) else {
141 return;
142 };
143
144 self.0 = state;
145 }
146}
147
148#[derive(Debug)]
149pub struct GenesisBlockBuilderResult<Block> {
150 pub block: Block,
152 pub system_contract_states: StdArc<[ContractSlotState]>,
154}
155
156#[derive(Debug, Copy, Clone)]
158pub struct ClientDatabaseOptions<GBB, StorageBackend> {
159 pub write_buffer_size: usize = 5,
167 pub block_confirmation_depth: BlockNumber,
173 pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
187 pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
198 pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
209 pub genesis_block_builder: GBB,
212 pub storage_backend: StorageBackend,
214}
215
216#[derive(Debug, Copy, Clone)]
218pub struct ClientDatabaseFormatOptions {
219 pub page_group_size: NonZeroU32,
236 pub force: bool,
240}
241
242#[derive(Debug, thiserror::Error)]
243pub enum ClientDatabaseError {
244 #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
246 InvalidSoftConfirmationDepth,
247 #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
249 InvalidMaxForkTipDistance,
250 #[error("Storage backend has canceled read request")]
252 ReadRequestCancelled,
253 #[error("Storage backend read error: {error}")]
255 ReadError {
256 error: io::Error,
258 },
259 #[error("Unsupported database version: {database_version}")]
261 UnsupportedDatabaseVersion {
262 database_version: u8,
264 },
265 #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
267 PageGroupSizeTooSmall {
268 page_group_size: u32,
270 },
271 #[error(
273 "Unexpected sequence number {actual} at page offset {page_offset} (expected \
274 {expected})"
275 )]
276 UnexpectedSequenceNumber {
277 actual: u64,
279 expected: u64,
281 page_offset: u32,
283 },
284 #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
286 UnexpectedStorageItem {
287 storage_item: Box<dyn fmt::Debug + Send + Sync>,
289 page_offset: u32,
291 },
292 #[error("Invalid block at offset {page_offset}")]
294 InvalidBlock {
295 page_offset: u32,
297 },
298 #[error("Invalid segment headers at offset {page_offset}")]
300 InvalidSegmentHeaders {
301 page_offset: u32,
303 },
304 #[error("Failed to adjust ancestor block forks")]
306 FailedToAdjustAncestorBlockForks,
307 #[error("Database is not formatted yet")]
309 Unformatted,
310 #[error("Non-permanent first page group")]
312 NonPermanentFirstPageGroup,
313}
314
315#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318 #[error("Storage backend has canceled read request")]
320 ReadRequestCancelled,
321 #[error("Storage backend read error: {error}")]
323 ReadError {
324 error: io::Error,
326 },
327 #[error("Failed to generate database id")]
329 FailedToGenerateDatabaseId {
330 #[from]
332 error: SysError,
333 },
334 #[error("Database is already formatted yet")]
336 AlreadyFormatted,
337 #[error("Storage backend has canceled a writing request")]
339 WriteRequestCancelled,
340 #[error("Storage item write error")]
342 StorageItemWriteError {
343 #[from]
345 error: io::Error,
346 },
347}
348
349#[derive(Debug, Copy, Clone)]
350struct ForkTip {
351 number: BlockNumber,
352 root: BlockRoot,
353}
354
355enum FullBlock<'a, Block>
356where
357 Block: GenericOwnedBlock,
358{
359 InMemory(&'a Block),
360 Persisted {
361 header: &'a Block::Header,
362 write_location: WriteLocation,
363 },
364}
365
366#[derive(Debug)]
367struct BeaconChainBlockDetails {
368 shard_segment_roots: StdArc<[ShardSegmentRoot]>,
370}
371
372impl BeaconChainBlockDetails {
373 fn from_body(body: &BeaconChainBody<'_>) -> Self {
374 let shard_segment_roots = body
375 .intermediate_shard_blocks()
376 .iter()
377 .flat_map(|intermediate_shard_block_info| {
378 let own_segments = intermediate_shard_block_info
379 .own_segments
380 .into_iter()
381 .flat_map({
382 let shard_index = intermediate_shard_block_info.header.prefix.shard_index;
383
384 move |own_segments| {
385 (own_segments.first_local_segment_index..)
386 .zip(own_segments.segment_roots)
387 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
388 shard_index,
389 segment_index,
390 segment_root,
391 })
392 }
393 });
394 let child_shard_segment_roots = intermediate_shard_block_info
395 .leaf_shards_segments()
396 .flat_map(move |(shard_index, own_segments)| {
397 (own_segments.first_local_segment_index..)
398 .zip(own_segments.segment_roots)
399 .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
400 shard_index,
401 segment_index,
402 segment_root,
403 })
404 });
405
406 own_segments.chain(child_shard_segment_roots)
407 })
408 .collect();
409
410 Self {
411 shard_segment_roots,
412 }
413 }
414}
415
416#[derive(Debug)]
423enum ClientDatabaseBlock<Block>
424where
425 Block: GenericOwnedBlock,
426{
427 InMemory {
429 block: Block,
430 block_details: BlockDetails,
431 beacon_chain_block_details: Option<BeaconChainBlockDetails>,
433 },
434 Persisted {
436 header: Block::Header,
437 block_details: BlockDetails,
438 beacon_chain_block_details: Option<BeaconChainBlockDetails>,
440 write_location: WriteLocation,
441 },
442 PersistedConfirmed {
445 header: Block::Header,
446 beacon_chain_block_details: Option<BeaconChainBlockDetails>,
448 write_location: WriteLocation,
449 },
450}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454 Block: GenericOwnedBlock,
455{
456 #[inline(always)]
457 fn header(&self) -> &Block::Header {
458 match self {
459 Self::InMemory { block, .. } => block.header(),
460 Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461 }
462 }
463
464 #[inline(always)]
465 fn full_block(&self) -> FullBlock<'_, Block> {
466 match self {
467 Self::InMemory { block, .. } => FullBlock::InMemory(block),
468 Self::Persisted {
469 header,
470 write_location,
471 ..
472 }
473 | Self::PersistedConfirmed {
474 header,
475 write_location,
476 ..
477 } => FullBlock::Persisted {
478 header,
479 write_location: *write_location,
480 },
481 }
482 }
483
484 #[inline(always)]
485 fn block_details(&self) -> Option<&BlockDetails> {
486 match self {
487 Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488 Some(block_details)
489 }
490 Self::PersistedConfirmed { .. } => None,
491 }
492 }
493
494 #[inline(always)]
495 fn beacon_chain_block_details(&self) -> Option<&BeaconChainBlockDetails> {
496 match self {
497 Self::InMemory {
498 beacon_chain_block_details,
499 ..
500 }
501 | Self::Persisted {
502 beacon_chain_block_details,
503 ..
504 }
505 | Self::PersistedConfirmed {
506 beacon_chain_block_details,
507 ..
508 } => beacon_chain_block_details.as_ref(),
509 }
510 }
511}
512
513#[derive(Debug)]
514struct StateData<Block>
515where
516 Block: GenericOwnedBlock,
517{
518 fork_tips: VecDeque<ForkTip>,
523 block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
528 blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
543}
544
545#[derive(Debug)]
546struct SegmentHeadersCache {
547 segment_headers_cache: Vec<SegmentHeader>,
548}
549
550impl SegmentHeadersCache {
551 #[inline(always)]
552 fn last_segment_header(&self) -> Option<SegmentHeader> {
553 self.segment_headers_cache.last().copied()
554 }
555
556 #[inline(always)]
557 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558 self.segment_headers_cache
559 .last()
560 .map(|segment_header| segment_header.index.as_inner())
561 }
562
563 #[inline(always)]
564 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565 self.segment_headers_cache
566 .get(u64::from(local_segment_index) as usize)
567 .copied()
568 }
569
570 fn add_segment_headers(
572 &mut self,
573 mut segment_headers: Vec<SegmentHeader>,
574 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575 self.segment_headers_cache.reserve(segment_headers.len());
576
577 let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579 if let Some(last_segment_index) = maybe_last_local_segment_index {
580 segment_headers
582 .retain(|segment_header| segment_header.index.as_inner() > last_segment_index);
583 }
584
585 for segment_header in segment_headers.iter().copied() {
588 let local_segment_index = segment_header.index.as_inner();
589 if let Some(last_local_segment_index) = maybe_last_local_segment_index {
590 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
591 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
592 local_segment_index,
593 last_local_segment_index,
594 });
595 }
596
597 self.segment_headers_cache.push(segment_header);
598 maybe_last_local_segment_index.replace(local_segment_index);
599 } else {
600 if local_segment_index != LocalSegmentIndex::ZERO {
601 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
602 local_segment_index,
603 });
604 }
605
606 self.segment_headers_cache.push(segment_header);
607 maybe_last_local_segment_index.replace(local_segment_index);
608 }
609 }
610
611 Ok(segment_headers)
612 }
613}
614
615#[derive(Debug)]
616struct SuperSegmentHeadersCache {
617 super_segment_headers_cache: Vec<SuperSegmentHeader>,
618}
619
620impl SuperSegmentHeadersCache {
621 #[inline(always)]
622 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
623 self.super_segment_headers_cache.last().copied()
624 }
625
626 #[inline]
627 fn previous_super_segment_header(
628 &self,
629 target_block_number: BlockNumber,
630 ) -> Option<SuperSegmentHeader> {
631 let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
632 let index = match self.super_segment_headers_cache.binary_search_by_key(
633 &block_number,
634 |super_segment_header| {
635 super_segment_header
636 .target_beacon_chain_block_number
637 .as_inner()
638 },
639 ) {
640 Ok(found_index) => found_index,
641 Err(insert_index) => insert_index.checked_sub(1)?,
642 };
643
644 self.super_segment_headers_cache.get(index).copied()
645 }
646
647 #[inline(always)]
648 fn get_super_segment_header(
649 &self,
650 local_segment_index: SuperSegmentIndex,
651 ) -> Option<SuperSegmentHeader> {
652 self.super_segment_headers_cache
653 .get(u64::from(local_segment_index) as usize)
654 .copied()
655 }
656
657 #[inline(always)]
658 fn get_super_segment_header_for_segment_index(
659 &self,
660 segment_index: SegmentIndex,
661 ) -> Option<SuperSegmentHeader> {
662 let index = self
663 .super_segment_headers_cache
664 .binary_search_by_key(&segment_index, |super_segment_header| {
665 super_segment_header.max_segment_index.as_inner()
666 })
667 .unwrap_or_else(|insert_index| insert_index);
668
669 let super_segment_header = self.super_segment_headers_cache.get(index).copied()?;
670
671 let max_segment_index = super_segment_header.max_segment_index.as_inner();
672 let first_segment_index = max_segment_index
673 - SegmentIndex::from(u64::from(super_segment_header.num_segments))
674 + SegmentIndex::ONE;
675
676 (first_segment_index..=max_segment_index)
677 .contains(&segment_index)
678 .then_some(super_segment_header)
679 }
680
681 fn add_super_segment_headers(
683 &mut self,
684 mut super_segment_headers: Vec<SuperSegmentHeader>,
685 ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
686 self.super_segment_headers_cache
687 .reserve(super_segment_headers.len());
688
689 let mut maybe_last_super_segment_index = self
690 .super_segment_headers_cache
691 .last()
692 .map(|header| header.index.as_inner());
693
694 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
695 super_segment_headers.retain(|super_segment_header| {
697 super_segment_header.index.as_inner() > last_super_segment_index
698 });
699 }
700
701 for super_segment_header in super_segment_headers.iter().copied() {
704 let super_segment_index = super_segment_header.index.as_inner();
705 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
706 if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
707 return Err(
708 PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
709 super_segment_index,
710 last_super_segment_index,
711 },
712 );
713 }
714
715 self.super_segment_headers_cache.push(super_segment_header);
716 maybe_last_super_segment_index.replace(super_segment_index);
717 } else {
718 if super_segment_index != SuperSegmentIndex::ZERO {
719 return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
720 super_segment_index,
721 });
722 }
723
724 self.super_segment_headers_cache.push(super_segment_header);
725 maybe_last_super_segment_index.replace(super_segment_index);
726 }
727 }
728
729 Ok(super_segment_headers)
730 }
731}
732
733#[derive(Debug)]
735struct State<Block, StorageBackend>
736where
737 Block: GenericOwnedBlock,
738{
739 data: StateData<Block>,
740 segment_headers_cache: SegmentHeadersCache,
741 super_segment_headers_cache: SuperSegmentHeadersCache,
742 storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
743}
744
745impl<Block, StorageBackend> State<Block, StorageBackend>
746where
747 Block: GenericOwnedBlock,
748{
749 #[inline(always)]
750 fn best_tip(&self) -> &ForkTip {
751 self.data
752 .fork_tips
753 .front()
754 .expect("The best block is always present; qed")
755 }
756
757 #[inline(always)]
758 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
759 self.data
760 .blocks
761 .front()
762 .expect("The best block is always present; qed")
763 .first()
764 .expect("The best block is always present; qed")
765 }
766}
767
768#[derive(Debug)]
769struct BlockToPersist<'a, Block>
770where
771 Block: GenericOwnedBlock,
772{
773 block_offset: usize,
774 fork_offset: usize,
775 block: &'a Block,
776 block_details: &'a BlockDetails,
777}
778
779#[derive(Debug)]
780struct PersistedBlock {
781 block_offset: usize,
782 fork_offset: usize,
783 write_location: WriteLocation,
784}
785
786#[derive(Debug)]
787struct ClientDatabaseInnerOptions {
788 block_confirmation_depth: BlockNumber,
789 soft_confirmation_depth: BlockNumber,
790 max_fork_tips: NonZeroUsize,
791 max_fork_tip_distance: BlockNumber,
792}
793
794#[derive(Debug)]
795struct Inner<Block, StorageBackend>
796where
797 Block: GenericOwnedBlock,
798{
799 state: AsyncRwLock<State<Block, StorageBackend>>,
800 options: ClientDatabaseInnerOptions,
801}
802
803#[derive(Debug)]
805pub struct ClientDatabase<Block, StorageBackend>
806where
807 Block: GenericOwnedBlock,
808{
809 inner: Arc<Inner<Block, StorageBackend>>,
810}
811
812impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
813where
814 Block: GenericOwnedBlock,
815{
816 fn clone(&self) -> Self {
817 Self {
818 inner: self.inner.clone(),
819 }
820 }
821}
822
823#[expect(clippy::empty_drop, reason = "Not implemented yet")]
824impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
825where
826 Block: GenericOwnedBlock,
827{
828 fn drop(&mut self) {
829 }
831}
832
833impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
834where
835 Block: GenericOwnedBlock,
836 StorageBackend: ClientDatabaseStorageBackend,
837{
838 #[inline]
839 fn best_root(&self) -> BlockRoot {
840 self.inner.state.read_blocking().best_tip().root
843 }
844
845 #[inline]
846 fn best_header(&self) -> Block::Header {
847 self.inner
850 .state
851 .read_blocking()
852 .best_block()
853 .header()
854 .clone()
855 }
856
857 #[inline]
858 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
859 let state = self.inner.state.read_blocking();
862 let best_block = state.best_block();
863 (
864 best_block.header().clone(),
865 best_block
866 .block_details()
867 .expect("Always present for the best block; qed")
868 .clone(),
869 )
870 }
871
872 #[inline]
874 fn ancestor_header(
875 &self,
876 ancestor_block_number: BlockNumber,
877 descendant_block_root: &BlockRoot,
878 ) -> Option<Block::Header> {
879 let state = self.inner.state.read_blocking();
882 let best_number = state.best_tip().number;
883
884 let ancestor_block_offset =
885 u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
886 let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;
887
888 let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
889 if ancestor_block_number > descendant_block_number {
890 return None;
891 }
892 let descendant_block_offset =
893 u64::from(best_number.checked_sub(descendant_block_number)?) as usize;
894
895 let mut blocks_range_iter = state
897 .data
898 .blocks
899 .iter()
900 .enumerate()
901 .skip(descendant_block_offset);
902
903 let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
904 let descendant_header = descendant_block_candidates
905 .iter()
906 .find(|block| &*block.header().header().root() == descendant_block_root)?
907 .header()
908 .header();
909
910 if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
915 return ancestor_block_candidates
916 .iter()
917 .next()
918 .map(|block| block.header().clone());
919 }
920
921 let mut parent_block_root = &descendant_header.prefix.parent_root;
922
923 for (block_offset, parent_candidates) in blocks_range_iter {
925 let parent_header = parent_candidates
926 .iter()
927 .find(|header| &*header.header().header().root() == parent_block_root)?
928 .header();
929
930 if block_offset == ancestor_block_offset {
932 return Some(parent_header.clone());
933 }
934
935 parent_block_root = &parent_header.header().prefix.parent_root;
936 }
937
938 None
939 }
940
941 #[inline]
942 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
943 let state = self.inner.state.read_blocking();
946 let best_number = state.best_tip().number;
947
948 let block_number = *state.data.block_roots.get(block_root)?;
949 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
950 let block_candidates = state.data.blocks.get(block_offset)?;
951
952 block_candidates.iter().find_map(|block| {
953 let header = block.header();
954
955 if &*header.header().root() == block_root {
956 Some(header.clone())
957 } else {
958 None
959 }
960 })
961 }
962
963 #[inline]
964 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
965 let state = self.inner.state.read_blocking();
968 let best_number = state.best_tip().number;
969
970 let block_number = *state.data.block_roots.get(block_root)?;
971 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
972 let block_candidates = state.data.blocks.get(block_offset)?;
973
974 block_candidates.iter().find_map(|block| {
975 let header = block.header();
976 let block_details = block.block_details().cloned()?;
977
978 if &*header.header().root() == block_root {
979 Some((header.clone(), block_details))
980 } else {
981 None
982 }
983 })
984 }
985
986 #[inline]
987 async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
988 let state = self.inner.state.read().await;
989 let best_number = state.best_tip().number;
990
991 let block_number = *state
992 .data
993 .block_roots
994 .get(block_root)
995 .ok_or(ReadBlockError::UnknownBlockRoot)?;
996 let block_offset = u64::from(
997 best_number
998 .checked_sub(block_number)
999 .expect("Known block roots always have valid block offset; qed"),
1000 ) as usize;
1001 let block_candidates = state
1002 .data
1003 .blocks
1004 .get(block_offset)
1005 .expect("Valid block offsets always have block entries; qed");
1006
1007 for block_candidate in block_candidates {
1008 let header = block_candidate.header();
1009
1010 if &*header.header().root() == block_root {
1011 return match block_candidate.full_block() {
1012 FullBlock::InMemory(block) => Ok(block.clone()),
1013 FullBlock::Persisted {
1014 header,
1015 write_location,
1016 } => {
1017 let storage_backend_adapter = state.storage_backend_adapter.read().await;
1018
1019 let storage_item = storage_backend_adapter
1020 .read_storage_item::<StorageItemTemporary>(write_location)
1021 .await?;
1022
1023 let storage_item_block = match storage_item {
1024 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1025 StorageItemTemporary::SegmentHeaders(_) => {
1026 return Err(ReadBlockError::StorageItemReadError {
1027 error: io::Error::other(
1028 "Unexpected storage item: `SegmentHeaders`",
1029 ),
1030 });
1031 }
1032 StorageItemTemporary::SuperSegmentHeaders(_) => {
1033 return Err(ReadBlockError::StorageItemReadError {
1034 error: io::Error::other(
1035 "Unexpected storage item: `SuperSegmentHeaders`",
1036 ),
1037 });
1038 }
1039 };
1040
1041 let StorageItemTemporaryBlock {
1042 header: _,
1043 body,
1044 mmr_with_block: _,
1045 system_contract_states: _,
1046 } = storage_item_block;
1047
1048 Block::from_buffers(header.buffer().clone(), body)
1049 .ok_or(ReadBlockError::FailedToDecode)
1050 }
1051 };
1052 }
1053 }
1054
1055 unreachable!("Known block root always has block candidate associated with it; qed")
1056 }
1057
1058 #[inline]
1059 fn last_segment_header(&self) -> Option<SegmentHeader> {
1060 let state = self.inner.state.read_blocking();
1063 state.segment_headers_cache.last_segment_header()
1064 }
1065
1066 #[inline]
1067 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1068 let state = self.inner.state.read_blocking();
1071
1072 state
1073 .segment_headers_cache
1074 .get_segment_header(segment_index)
1075 }
1076
1077 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1078 let state = self.inner.state.read_blocking();
1081
1082 let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1083 else {
1084 return Vec::new();
1086 };
1087
1088 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1090 && block_number == BlockNumber::ONE
1091 {
1092 return vec![
1095 state
1096 .segment_headers_cache
1097 .get_segment_header(LocalSegmentIndex::ZERO)
1098 .expect("Segment headers are stored in monotonically increasing order; qed"),
1099 ];
1100 }
1101
1102 if last_local_segment_index == LocalSegmentIndex::ZERO {
1103 return Vec::new();
1105 }
1106
1107 let mut current_local_segment_index = last_local_segment_index;
1108 loop {
1109 let current_segment_header = state
1112 .segment_headers_cache
1113 .get_segment_header(current_local_segment_index)
1114 .expect("Segment headers are stored in monotonically increasing order; qed");
1115
1116 let target_block_number = current_segment_header.last_archived_block.number()
1118 + BlockNumber::ONE
1119 + self.inner.options.block_confirmation_depth;
1120 if target_block_number == block_number {
1121 let mut headers_for_block = vec![current_segment_header];
1122
1123 let last_archived_block_number = current_segment_header.last_archived_block.number;
1125 let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1126
1127 while let Some(segment_header) = state
1128 .segment_headers_cache
1129 .get_segment_header(local_segment_index)
1130 {
1131 if segment_header.last_archived_block.number == last_archived_block_number {
1132 headers_for_block.insert(0, segment_header);
1133 local_segment_index -= LocalSegmentIndex::ONE;
1134 } else {
1135 break;
1136 }
1137 }
1138
1139 return headers_for_block;
1140 }
1141
1142 if target_block_number > block_number {
1144 if current_local_segment_index > LocalSegmentIndex::ONE {
1146 current_local_segment_index -= LocalSegmentIndex::ONE;
1147 } else {
1148 break;
1149 }
1150 } else {
1151 return Vec::new();
1153 }
1154 }
1155
1156 Vec::new()
1158 }
1159}
1160
1161impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1162where
1163 Block: GenericOwnedBlock,
1164 StorageBackend: ClientDatabaseStorageBackend,
1165{
1166 async fn persist_block(
1167 &self,
1168 block: Block,
1169 block_details: BlockDetails,
1170 ) -> Result<(), PersistBlockError> {
1171 let mut state = self.inner.state.write().await;
1172 let best_number = state.best_tip().number;
1173
1174 let header = block.header().header();
1175
1176 let block_number = header.prefix.number;
1177
1178 if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
1179 Self::insert_first_block(&mut state.data, block, block_details);
1181
1182 return Ok(());
1183 }
1184
1185 if block_number == best_number + BlockNumber::ONE {
1186 return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
1187 }
1188
1189 let block_offset = u64::from(
1190 best_number
1191 .checked_sub(block_number)
1192 .ok_or(PersistBlockError::MissingParent)?,
1193 ) as usize;
1194
1195 if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
1196 return Err(PersistBlockError::OutsideAcceptableRange);
1197 }
1198
1199 let state = &mut *state;
1200
1201 let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
1202 error!(
1203 %block_number,
1204 %block_offset,
1205 "Failed to store block fork, header offset is missing despite being within \
1206 acceptable range"
1207 );
1208
1209 PersistBlockError::OutsideAcceptableRange
1210 })?;
1211
1212 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1213 if fork_tip.root == header.prefix.parent_root {
1215 state.data.fork_tips.remove(index);
1216 break;
1217 }
1218 }
1219
1220 let block_root = *header.root();
1221 state.data.fork_tips.insert(
1224 1,
1225 ForkTip {
1226 number: block_number,
1227 root: block_root,
1228 },
1229 );
1230 state.data.block_roots.insert(block_root, block_number);
1231 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1232 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1233 block_forks.push(ClientDatabaseBlock::InMemory {
1234 block,
1235 block_details,
1236 beacon_chain_block_details,
1237 });
1238
1239 Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);
1240
1241 Ok(())
1242 }
1243
1244 async fn persist_segment_headers(
1245 &self,
1246 segment_headers: Vec<SegmentHeader>,
1247 ) -> Result<(), PersistSegmentHeadersError> {
1248 let mut state = self.inner.state.write().await;
1249
1250 let added_segment_headers = state
1251 .segment_headers_cache
1252 .add_segment_headers(segment_headers)?;
1253
1254 if added_segment_headers.is_empty() {
1255 return Ok(());
1256 }
1257
1258 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1263
1264 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1265
1266 storage_backend_adapter
1267 .write_storage_item(StorageItemTemporary::SegmentHeaders(
1268 StorageItemTemporarySegmentHeaders {
1269 segment_headers: added_segment_headers,
1270 },
1271 ))
1272 .await?;
1273
1274 Ok(())
1275 }
1276}
1277
1278impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1279where
1280 StorageBackend: ClientDatabaseStorageBackend,
1281{
1282 fn shard_segment_roots(
1283 &self,
1284 block_number: BlockNumber,
1285 ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1286 let state = self.inner.state.read_blocking();
1289 let best_number = state.best_tip().number;
1290
1291 let block_offset = u64::from(
1292 best_number
1293 .checked_sub(block_number)
1294 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1295 ) as usize;
1296
1297 let block = state
1298 .data
1299 .blocks
1300 .get(block_offset)
1301 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1302 .first()
1303 .expect("There is always at least one block candidate; qed");
1304
1305 Ok(StdArc::clone(
1306 &block
1307 .beacon_chain_block_details()
1308 .as_ref()
1309 .expect("Always present in the beacon chain block; qed")
1310 .shard_segment_roots,
1311 ))
1312 }
1313
1314 #[inline]
1315 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1316 let state = self.inner.state.read_blocking();
1319 state
1320 .super_segment_headers_cache
1321 .last_super_segment_header()
1322 }
1323
1324 #[inline]
1325 fn previous_super_segment_header(
1326 &self,
1327 block_number: BlockNumber,
1328 ) -> Option<SuperSegmentHeader> {
1329 let state = self.inner.state.read_blocking();
1332
1333 state
1334 .super_segment_headers_cache
1335 .previous_super_segment_header(block_number)
1336 }
1337
1338 #[inline]
1339 fn get_super_segment_header(
1340 &self,
1341 super_segment_index: SuperSegmentIndex,
1342 ) -> Option<SuperSegmentHeader> {
1343 let state = self.inner.state.read_blocking();
1346
1347 state
1348 .super_segment_headers_cache
1349 .get_super_segment_header(super_segment_index)
1350 }
1351
1352 fn get_super_segment_header_for_segment_index(
1353 &self,
1354 segment_index: SegmentIndex,
1355 ) -> Option<SuperSegmentHeader> {
1356 let state = self.inner.state.read_blocking();
1359
1360 state
1361 .super_segment_headers_cache
1362 .get_super_segment_header_for_segment_index(segment_index)
1363 }
1364}
1365
1366impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1367where
1368 StorageBackend: ClientDatabaseStorageBackend,
1369{
1370 async fn persist_super_segment_header(
1371 &self,
1372 super_segment_header: SuperSegmentHeader,
1373 ) -> Result<bool, PersistSuperSegmentHeadersError> {
1374 let mut state = self.inner.state.write().await;
1375
1376 let added_super_segment_headers = state
1377 .super_segment_headers_cache
1378 .add_super_segment_headers(vec![super_segment_header])?;
1379
1380 if added_super_segment_headers.is_empty() {
1381 return Ok(false);
1382 }
1383
1384 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1389
1390 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1391
1392 storage_backend_adapter
1393 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1394 StorageItemTemporarySuperSegmentHeaders {
1395 super_segment_headers: added_super_segment_headers,
1396 },
1397 ))
1398 .await?;
1399
1400 Ok(true)
1401 }
1402
1403 async fn persist_super_segment_headers(
1404 &self,
1405 super_segment_headers: Vec<SuperSegmentHeader>,
1406 ) -> Result<(), PersistSuperSegmentHeadersError> {
1407 let mut state = self.inner.state.write().await;
1408
1409 let added_super_segment_headers = state
1410 .super_segment_headers_cache
1411 .add_super_segment_headers(super_segment_headers)?;
1412
1413 if added_super_segment_headers.is_empty() {
1414 return Ok(());
1415 }
1416
1417 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1422
1423 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1424
1425 storage_backend_adapter
1426 .write_storage_item(StorageItemTemporary::SuperSegmentHeaders(
1427 StorageItemTemporarySuperSegmentHeaders {
1428 super_segment_headers: added_super_segment_headers,
1429 },
1430 ))
1431 .await?;
1432
1433 Ok(())
1434 }
1435}
1436
1437impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1438where
1439 Block: GenericOwnedBlock,
1440 StorageBackend: ClientDatabaseStorageBackend,
1441{
1442 pub async fn open<GBB>(
1446 options: ClientDatabaseOptions<GBB, StorageBackend>,
1447 ) -> Result<Self, ClientDatabaseError>
1448 where
1449 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1450 {
1451 let ClientDatabaseOptions {
1452 write_buffer_size,
1453 block_confirmation_depth,
1454 soft_confirmation_depth,
1455 max_fork_tips,
1456 max_fork_tip_distance,
1457 genesis_block_builder,
1458 storage_backend,
1459 } = options;
1460 if soft_confirmation_depth >= block_confirmation_depth {
1461 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1462 }
1463
1464 if max_fork_tip_distance > block_confirmation_depth {
1465 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1466 }
1467
1468 let mut state_data = StateData {
1469 fork_tips: VecDeque::new(),
1470 block_roots: HashMap::default(),
1471 blocks: VecDeque::new(),
1472 };
1473 let mut segment_headers_cache = SegmentHeadersCache {
1474 segment_headers_cache: Vec::new(),
1475 };
1476 let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1477 super_segment_headers_cache: Vec::new(),
1478 };
1479
1480 let options = ClientDatabaseInnerOptions {
1481 block_confirmation_depth,
1482 soft_confirmation_depth,
1483 max_fork_tips,
1484 max_fork_tip_distance,
1485 };
1486
1487 let storage_item_handlers = StorageItemHandlers {
1488 permanent: |_arg| {
1489 Ok(())
1491 },
1492 temporary: |arg| {
1493 let StorageItemHandlerArg {
1494 storage_item,
1495 page_offset,
1496 num_pages,
1497 } = arg;
1498 let storage_item_block = match storage_item {
1499 StorageItemTemporary::Block(storage_item_block) => storage_item_block,
1500 StorageItemTemporary::SegmentHeaders(segment_headers) => {
1501 let num_segment_headers = segment_headers.segment_headers.len();
1502 return match segment_headers_cache
1503 .add_segment_headers(segment_headers.segment_headers)
1504 {
1505 Ok(_) => Ok(()),
1506 Err(error) => {
1507 error!(
1508 %page_offset,
1509 %num_segment_headers,
1510 %error,
1511 "Failed to add segment headers from storage item"
1512 );
1513
1514 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1515 }
1516 };
1517 }
1518 StorageItemTemporary::SuperSegmentHeaders(super_segment_headers) => {
1519 let num_super_segment_headers =
1520 super_segment_headers.super_segment_headers.len();
1521 return match super_segment_headers_cache
1522 .add_super_segment_headers(super_segment_headers.super_segment_headers)
1523 {
1524 Ok(_) => Ok(()),
1525 Err(error) => {
1526 error!(
1527 %page_offset,
1528 %num_super_segment_headers,
1529 %error,
1530 "Failed to add segment headers from storage item"
1531 );
1532
1533 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1534 }
1535 };
1536 }
1537 };
1538
1539 let StorageItemTemporaryBlock {
1542 header,
1543 body,
1544 mmr_with_block,
1545 system_contract_states,
1546 } = storage_item_block;
1547
1548 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1549 error!(%page_offset, "Failed to decode block header from bytes");
1550
1551 ClientDatabaseError::InvalidBlock { page_offset }
1552 })?;
1553 let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1554 error!(%page_offset, "Failed to decode block body from bytes");
1555
1556 ClientDatabaseError::InvalidBlock { page_offset }
1557 })?;
1558
1559 let block_root = *header.header().root();
1560 let block_number = header.header().prefix.number;
1561
1562 state_data.block_roots.insert(block_root, block_number);
1563
1564 let maybe_best_number = state_data
1565 .blocks
1566 .front()
1567 .and_then(|block_forks| block_forks.first())
1568 .map(|best_block| {
1569 let header: &Block::Header = best_block.header();
1571
1572 header.header().prefix.number
1573 });
1574
1575 let block_offset = if let Some(best_number) = maybe_best_number {
1576 if block_number <= best_number {
1577 u64::from(best_number - block_number) as usize
1578 } else {
1579 if block_number - best_number != BlockNumber::ONE {
1581 error!(
1582 %page_offset,
1583 %best_number,
1584 %block_number,
1585 "Invalid new best block number, it must be only one block \
1586 higher than the best block"
1587 );
1588
1589 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1590 }
1591
1592 state_data.blocks.push_front(SmallVec::new());
1593 0
1595 }
1596 } else {
1597 state_data.blocks.push_front(SmallVec::new());
1598 0
1600 };
1601
1602 let Some(block_forks) = state_data.blocks.get_mut(block_offset) else {
1603 return Ok(());
1606 };
1607
1608 let beacon_chain_block_details =
1610 <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1611 .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1612 block_forks.push(ClientDatabaseBlock::Persisted {
1613 header,
1614 block_details: BlockDetails {
1615 mmr_with_block,
1616 system_contract_states,
1617 },
1618 beacon_chain_block_details,
1619 write_location: WriteLocation {
1620 page_offset,
1621 num_pages,
1622 },
1623 });
1624
1625 if block_offset == 0 && block_forks.len() == 1 {
1628 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1629 }
1630
1631 Ok(())
1632 },
1633 };
1634
1635 let storage_backend_adapter =
1636 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1637 .await?;
1638
1639 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1640 block_forks.last()
1643 }) {
1644 let header: &Block::Header = best_block.header();
1646 let header = header.header();
1647 let block_number = header.prefix.number;
1648 let block_root = *header.root();
1649
1650 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1651 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1652 }
1653
1654 state_data.fork_tips.push_front(ForkTip {
1656 number: block_number,
1657 root: block_root,
1658 });
1659 } else {
1660 let GenesisBlockBuilderResult {
1661 block,
1662 system_contract_states,
1663 } = genesis_block_builder();
1664
1665 let header = block.header().header();
1667 let block_number = header.prefix.number;
1668 let block_root = *header.root();
1669
1670 state_data.fork_tips.push_front(ForkTip {
1671 number: block_number,
1672 root: block_root,
1673 });
1674 state_data.block_roots.insert(block_root, block_number);
1675 let beacon_chain_block_details =
1676 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1677 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1678 state_data
1679 .blocks
1680 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1681 block,
1682 block_details: BlockDetails {
1683 system_contract_states,
1684 mmr_with_block: Arc::new({
1685 let mut mmr = BlockMerkleMountainRange::new();
1686 mmr.add_leaf(&block_root);
1687 mmr
1688 })
1689 },
1690 beacon_chain_block_details,
1691 }]);
1692 }
1693
1694 let state = State {
1695 data: state_data,
1696 segment_headers_cache,
1697 super_segment_headers_cache,
1698 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1699 };
1700
1701 let inner = Inner {
1702 state: AsyncRwLock::new(state),
1703 options,
1704 };
1705
1706 Ok(Self {
1707 inner: Arc::new(inner),
1708 })
1709 }
1710
1711 pub async fn format(
1713 storage_backend: &StorageBackend,
1714 options: ClientDatabaseFormatOptions,
1715 ) -> Result<(), ClientDatabaseFormatError> {
1716 StorageBackendAdapter::format(storage_backend, options).await
1717 }
1718
1719 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1720 let header = block.header().header();
1722 let block_number = header.prefix.number;
1723 let block_root = *header.root();
1724
1725 state.fork_tips.clear();
1726 state.fork_tips.push_front(ForkTip {
1727 number: block_number,
1728 root: block_root,
1729 });
1730 state.block_roots.clear();
1731 state.block_roots.insert(block_root, block_number);
1732 state.blocks.clear();
1733 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1734 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1735 state
1736 .blocks
1737 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1738 block,
1739 block_details,
1740 beacon_chain_block_details,
1741 }]);
1742 }
1743
1744 async fn insert_new_best_block(
1745 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1746 inner: &Inner<Block, StorageBackend>,
1747 block: Block,
1748 block_details: BlockDetails,
1749 ) -> Result<(), PersistBlockError> {
1750 let header = block.header().header();
1751 let block_number = header.prefix.number;
1752 let block_root = *header.root();
1753 let parent_root = header.prefix.parent_root;
1754
1755 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1758 return Err(PersistBlockError::MissingParent);
1759 }
1760
1761 {
1763 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1764 if fork_tip.root == parent_root {
1766 state.data.fork_tips.remove(index);
1767 break;
1768 }
1769 }
1770
1771 state.data.fork_tips.push_front(ForkTip {
1772 number: block_number,
1773 root: block_root,
1774 });
1775 state.data.block_roots.insert(block_root, block_number);
1776 let beacon_chain_block_details =
1777 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1778 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1779 state
1780 .data
1781 .blocks
1782 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1783 block,
1784 block_details: block_details.clone(),
1785 beacon_chain_block_details,
1786 }]);
1787 }
1788
1789 let options = &inner.options;
1790
1791 Self::confirm_canonical_block(block_number, &mut state.data, options);
1792 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1793
1794 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1799
1800 let mut blocks_to_persist = Vec::new();
1801 for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1802 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1803 break;
1804 };
1805
1806 let len_before = blocks_to_persist.len();
1807 fork_blocks
1808 .iter()
1809 .enumerate()
1810 .filter_map(|(fork_offset, client_database_block)| {
1811 match client_database_block {
1812 ClientDatabaseBlock::InMemory {
1813 block,
1814 block_details,
1815 beacon_chain_block_details: _,
1816 } => Some(BlockToPersist {
1817 block_offset,
1818 fork_offset,
1819 block,
1820 block_details,
1821 }),
1822 ClientDatabaseBlock::Persisted { .. }
1823 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1824 None
1826 }
1827 }
1828 })
1829 .collect_into(&mut blocks_to_persist);
1830
1831 if blocks_to_persist.len() == len_before {
1832 break;
1833 }
1834 }
1835
1836 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1838 {
1839 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1840
1841 for block_to_persist in blocks_to_persist.into_iter().rev() {
1842 let BlockToPersist {
1843 block_offset,
1844 fork_offset,
1845 block,
1846 block_details,
1847 } = block_to_persist;
1848
1849 let write_location = storage_backend_adapter
1850 .write_storage_item(StorageItemTemporary::Block(StorageItemTemporaryBlock {
1851 header: block.header().buffer().clone(),
1852 body: block.body().buffer().clone(),
1853 mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1854 system_contract_states: StdArc::clone(
1855 &block_details.system_contract_states,
1856 ),
1857 }))
1858 .await?;
1859
1860 persisted_blocks.push(PersistedBlock {
1861 block_offset,
1862 fork_offset,
1863 write_location,
1864 });
1865 }
1866 }
1867
1868 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1870 for persisted_block in persisted_blocks {
1871 let PersistedBlock {
1872 block_offset,
1873 fork_offset,
1874 write_location,
1875 } = persisted_block;
1876
1877 let block = state
1878 .data
1879 .blocks
1880 .get_mut(block_offset)
1881 .expect("Still holding the same lock since last check; qed")
1882 .get_mut(fork_offset)
1883 .expect("Still holding the same lock since last check; qed");
1884
1885 replace_with_or_abort(block, |block| {
1886 if let ClientDatabaseBlock::InMemory {
1887 block,
1888 block_details,
1889 beacon_chain_block_details,
1890 } = block
1891 {
1892 let (header, _body) = block.split();
1893
1894 ClientDatabaseBlock::Persisted {
1895 header,
1896 block_details,
1897 beacon_chain_block_details,
1898 write_location,
1899 }
1900 } else {
1901 unreachable!("Still holding the same lock since last check; qed");
1902 }
1903 });
1904 }
1905
1906 Ok(())
1910 }
1911
1912 #[must_use]
1917 fn adjust_ancestor_block_forks(
1918 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1919 mut parent_block_root: BlockRoot,
1920 ) -> bool {
1921 let mut ancestor_blocks = blocks.iter_mut();
1922
1923 loop {
1924 if ancestor_blocks.len() == 1 {
1925 break;
1927 }
1928
1929 let Some(parent_blocks) = ancestor_blocks.next() else {
1930 break;
1932 };
1933
1934 let Some(fork_offset_parent_block_root) =
1935 parent_blocks
1936 .iter()
1937 .enumerate()
1938 .find_map(|(fork_offset, fork_block)| {
1939 let fork_header = fork_block.header().header();
1940 if *fork_header.root() == parent_block_root {
1941 Some((fork_offset, fork_header.prefix.parent_root))
1942 } else {
1943 None
1944 }
1945 })
1946 else {
1947 return false;
1948 };
1949
1950 let fork_offset;
1951 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1952
1953 parent_blocks.swap(0, fork_offset);
1954 }
1955
1956 true
1957 }
1958
1959 fn prune_outdated_fork_tips(
1966 best_number: BlockNumber,
1967 state: &mut StateData<Block>,
1968 options: &ClientDatabaseInnerOptions,
1969 ) {
1970 let state = &mut *state;
1971
1972 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1975
1976 state.fork_tips.retain(|fork_tip| {
1978 if best_number - fork_tip.number > options.max_fork_tip_distance {
1979 candidate_forks_to_remove.push(*fork_tip);
1980 false
1981 } else {
1982 true
1983 }
1984 });
1985 if state.fork_tips.len() > options.max_fork_tips.get() {
1987 state
1988 .fork_tips
1989 .drain(options.max_fork_tips.get()..)
1990 .collect_into(&mut candidate_forks_to_remove);
1991 }
1992
1993 candidate_forks_to_remove
1995 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1996 state.fork_tips.extend(candidate_forks_to_remove);
1998 }
1999
2000 #[must_use]
2003 fn prune_outdated_fork(
2004 best_number: BlockNumber,
2005 fork_tip: &ForkTip,
2006 state: &mut StateData<Block>,
2007 ) -> bool {
2008 let block_offset = u64::from(best_number - fork_tip.number) as usize;
2009
2010 let mut block_root_to_prune = fork_tip.root;
2012 let mut pruned_tip = false;
2013 for block_offset in block_offset.. {
2014 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
2015 if !pruned_tip {
2016 error!(
2017 %best_number,
2018 ?fork_tip,
2019 block_offset,
2020 "Block offset was not present in the database, this is an implementation \
2021 bug #1"
2022 );
2023 }
2024 break;
2026 };
2027
2028 if fork_blocks.len() == 1 {
2029 if !pruned_tip {
2030 error!(
2031 %best_number,
2032 ?fork_tip,
2033 block_offset,
2034 "Block offset was not present in the database, this is an implementation \
2035 bug #2"
2036 );
2037 }
2038
2039 break;
2041 }
2042
2043 let Some((fork_offset, block)) = fork_blocks
2044 .iter()
2045 .enumerate()
2046 .skip(1)
2048 .find(|(_fork_offset, block)| {
2049 *block.header().header().root() == block_root_to_prune
2050 })
2051 else {
2052 if !pruned_tip {
2053 error!(
2054 %best_number,
2055 ?fork_tip,
2056 block_offset,
2057 "Block offset was not present in the database, this is an implementation \
2058 bug #3"
2059 );
2060 }
2061
2062 break;
2064 };
2065
2066 if block.header().ref_count() > 1 {
2068 break;
2069 }
2070
2071 match block {
2073 ClientDatabaseBlock::InMemory { .. } => {
2074 }
2076 ClientDatabaseBlock::Persisted { .. }
2077 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2078 pruned_tip = true;
2080 break;
2081 }
2082 }
2083
2084 state.block_roots.get_mut(&block_root_to_prune);
2085 block_root_to_prune = block.header().header().prefix.parent_root;
2086 fork_blocks.swap_remove(fork_offset);
2087
2088 pruned_tip = true;
2089 }
2090
2091 pruned_tip
2092 }
2093
2094 fn confirm_canonical_block(
2097 best_number: BlockNumber,
2098 state_data: &mut StateData<Block>,
2099 options: &ClientDatabaseInnerOptions,
2100 ) {
2101 let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;
2105
2106 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
2107 return;
2109 };
2110
2111 {
2113 let Some(canonical_block) = fork_blocks.first_mut() else {
2114 error!(
2115 %best_number,
2116 block_offset,
2117 "Have not found a canonical block to confirm, this is an implementation bug"
2118 );
2119 return;
2120 };
2121
2122 replace_with_or_abort(canonical_block, |block| match block {
2123 ClientDatabaseBlock::InMemory { .. } => {
2124 error!(
2125 %best_number,
2126 block_offset,
2127 header = ?block.header(),
2128 "Block to be confirmed must not be in memory, this is an implementation bug"
2129 );
2130 block
2131 }
2132 ClientDatabaseBlock::Persisted {
2133 header,
2134 block_details: _,
2135 beacon_chain_block_details,
2136 write_location,
2137 } => ClientDatabaseBlock::PersistedConfirmed {
2138 header,
2139 beacon_chain_block_details,
2140 write_location,
2141 },
2142 ClientDatabaseBlock::PersistedConfirmed { .. } => {
2143 error!(
2144 %best_number,
2145 block_offset,
2146 header = ?block.header(),
2147 "Block to be confirmed must not be confirmed yet, this is an \
2148 implementation bug"
2149 );
2150 block
2151 }
2152 });
2153 }
2154
2155 let mut block_roots_to_prune = fork_blocks
2157 .drain(1..)
2158 .map(|block| *block.header().header().root())
2159 .collect::<Vec<_>>();
2160 let mut current_block_offset = block_offset;
2161 while !block_roots_to_prune.is_empty() {
2162 state_data
2164 .fork_tips
2165 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
2166
2167 for block_root in &block_roots_to_prune {
2169 state_data.block_roots.remove(block_root);
2170 }
2171
2172 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
2174 current_block_offset = next_block_offset;
2175 } else {
2176 break;
2178 }
2179
2180 let fork_blocks = state_data
2181 .blocks
2182 .get_mut(current_block_offset)
2183 .expect("Lower block offset always exists; qed");
2184
2185 block_roots_to_prune = fork_blocks
2187 .drain_filter(|block| {
2188 let header = block.header().header();
2189
2190 block_roots_to_prune.contains(&header.prefix.parent_root)
2191 })
2192 .map(|block| *block.header().header().root())
2193 .collect();
2194 }
2195 }
2196}