1#![expect(incomplete_features, reason = "generic_const_exprs")]
44#![feature(generic_const_exprs)]
47#![feature(
48 const_block_items,
49 const_convert,
50 const_trait_impl,
51 default_field_values,
52 get_mut_unchecked,
53 iter_collect_into,
54 maybe_uninit_fill
55)]
56
57mod page_group;
58pub mod storage_backend;
59mod storage_backend_adapter;
60
61use crate::page_group::block::StorageItemBlock;
62use crate::page_group::block::block::StorageItemBlockBlock;
63use crate::page_group::block::segment_headers::StorageItemBlockSegmentHeaders;
64use crate::page_group::block::super_segment_headers::StorageItemBlockSuperSegmentHeaders;
65use crate::storage_backend::ClientDatabaseStorageBackend;
66use crate::storage_backend_adapter::{
67 StorageBackendAdapter, StorageItemHandlerArg, StorageItemHandlers, WriteLocation,
68};
69use ab_client_api::{
70 BeaconChainInfo, BeaconChainInfoWrite, BlockDetails, BlockMerkleMountainRange, ChainInfo,
71 ChainInfoWrite, ContractSlotState, PersistBlockError, PersistSegmentHeadersError,
72 PersistSuperSegmentHeadersError, ReadBlockError, ShardSegmentRoot, ShardSegmentRootsError,
73};
74use ab_core_primitives::block::body::BeaconChainBody;
75use ab_core_primitives::block::body::owned::{GenericOwnedBlockBody, OwnedBeaconChainBody};
76use ab_core_primitives::block::header::GenericBlockHeader;
77use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
78use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
79use ab_core_primitives::block::{BlockNumber, BlockRoot, GenericBlock};
80use ab_core_primitives::segments::{
81 LocalSegmentIndex, SegmentHeader, SuperSegmentHeader, SuperSegmentIndex,
82};
83use ab_core_primitives::shard::RealShardKind;
84use ab_io_type::trivial_type::TrivialType;
85use async_lock::{
86 RwLock as AsyncRwLock, RwLockUpgradableReadGuard, RwLockWriteGuard as AsyncRwLockWriteGuard,
87};
88use rand::rngs::SysError;
89use rclite::Arc;
90use replace_with::replace_with_or_abort;
91use smallvec::{SmallVec, smallvec};
92use std::any::Any;
93use std::collections::{HashMap, VecDeque};
94use std::hash::{BuildHasherDefault, Hasher};
95use std::num::{NonZeroU32, NonZeroUsize};
96use std::ops::Deref;
97use std::sync::Arc as StdArc;
98use std::{fmt, io};
99use tracing::error;
100
/// Identifier of a database, wrapping 32 raw bytes
#[derive(Debug, Copy, Clone, Eq, PartialEq, TrivialType)]
#[repr(C)]
pub struct DatabaseId([u8; 32]);
105
106impl Deref for DatabaseId {
107 type Target = [u8; 32];
108
109 #[inline(always)]
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl AsRef<[u8]> for DatabaseId {
116 #[inline(always)]
117 fn as_ref(&self) -> &[u8] {
118 &self.0
119 }
120}
121
122impl DatabaseId {
123 #[inline(always)]
124 pub const fn new(bytes: [u8; 32]) -> Self {
125 Self(bytes)
126 }
127}
128
/// Pass-through hasher used for maps keyed by block roots.
///
/// Instead of mixing its input, it adopts the first 8 bytes (little-endian) of the most recent
/// write that is at least 8 bytes long as the hash value.
#[derive(Default)]
struct BlockRootHasher(u64);

impl Hasher for BlockRootHasher {
    /// Returns the state captured by the last sufficiently long write (zero initially)
    #[inline(always)]
    fn finish(&self) -> u64 {
        let Self(state) = self;
        *state
    }

    /// Replaces the state with the leading 8 bytes of `bytes`, interpreted as little-endian
    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        // Inputs shorter than 8 bytes leave the state untouched
        if let Some(prefix) = bytes.first_chunk::<8>() {
            self.0 = u64::from_le_bytes(*prefix);
        }
    }
}
147
/// Value produced by the genesis block builder callback supplied via [`ClientDatabaseOptions`]
#[derive(Debug)]
pub struct GenesisBlockBuilderResult<Block> {
    /// Genesis block
    pub block: Block,
    /// Contract slot states accompanying the genesis block
    pub system_contract_states: StdArc<[ContractSlotState]>,
}
155
/// Options for opening a [`ClientDatabase`].
///
/// Several fields carry default values (see the `default_field_values` feature).
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseOptions<GBB, StorageBackend> {
    /// Write buffer size, defaults to `5`.
    ///
    /// NOTE(review): the unit is not visible in this part of the file — confirm.
    pub write_buffer_size: usize = 5,
    /// Block confirmation depth.
    ///
    /// Fork blocks that are this many blocks (or more) behind the best block are rejected when
    /// persisting.
    pub block_confirmation_depth: BlockNumber,
    /// Soft confirmation depth, defaults to `3`.
    ///
    /// Must be smaller than [`Self::block_confirmation_depth`], otherwise opening the database
    /// fails with [`ClientDatabaseError::InvalidSoftConfirmationDepth`].
    pub soft_confirmation_depth: BlockNumber = BlockNumber::from(3),
    /// Maximum number of fork tips, defaults to `3`
    pub max_fork_tips: NonZeroUsize = NonZeroUsize::new(3).expect("Not zero; qed"),
    /// Maximum fork tip distance, defaults to `5`.
    ///
    /// Must be smaller than or equal to [`Self::block_confirmation_depth`], otherwise opening the
    /// database fails with [`ClientDatabaseError::InvalidMaxForkTipDistance`].
    pub max_fork_tip_distance: BlockNumber = BlockNumber::from(5),
    /// Callback producing the genesis block (see [`GenesisBlockBuilderResult`])
    pub genesis_block_builder: GBB,
    /// Storage backend the database operates on
    pub storage_backend: StorageBackend,
}
215
/// Options for formatting a database
#[derive(Debug, Copy, Clone)]
pub struct ClientDatabaseFormatOptions {
    /// Page group size.
    ///
    /// NOTE(review): presumably measured in pages, given that
    /// [`ClientDatabaseError::PageGroupSizeTooSmall`] requires "at least two pages" — confirm.
    pub page_group_size: NonZeroU32,
    /// Force flag.
    ///
    /// NOTE(review): presumably allows formatting a database that is already formatted — confirm
    /// against the formatting code.
    pub force: bool,
}
241
/// Errors returned when opening a [`ClientDatabase`]
#[derive(Debug, thiserror::Error)]
pub enum ClientDatabaseError {
    /// Soft confirmation depth is not smaller than the block confirmation depth
    #[error("Invalid soft confirmation depth, it must be smaller than confirmation depth k")]
    InvalidSoftConfirmationDepth,
    /// Max fork tip distance is larger than the block confirmation depth
    #[error("Invalid max fork tip distance, it must be smaller or equal to confirmation depth k")]
    InvalidMaxForkTipDistance,
    /// Storage backend has canceled the read request
    #[error("Storage backend has canceled read request")]
    ReadRequestCancelled,
    /// Storage backend read error
    #[error("Storage backend read error: {error}")]
    ReadError {
        /// Low-level error
        error: io::Error,
    },
    /// Unsupported database version
    #[error("Unsupported database version: {database_version}")]
    UnsupportedDatabaseVersion {
        /// Database version that was encountered
        database_version: u8,
    },
    /// Page group size is too small
    #[error("Page group size is too small ({page_group_size}), must be at least two pages")]
    PageGroupSizeTooSmall {
        /// Page group size that was encountered
        page_group_size: u32,
    },
    /// Unexpected sequence number
    #[error(
        "Unexpected sequence number {actual} at page offset {page_offset} (expected \
        {expected})"
    )]
    UnexpectedSequenceNumber {
        /// Sequence number that was found
        actual: u64,
        /// Sequence number that was expected
        expected: u64,
        /// Page offset at which the sequence number was found
        page_offset: u32,
    },
    /// Unexpected storage item
    #[error("Unexpected storage item at offset {page_offset}: {storage_item:?}")]
    UnexpectedStorageItem {
        /// Storage item that was found (only its `Debug` representation is retained)
        storage_item: Box<dyn fmt::Debug + Send + Sync>,
        /// Page offset at which the storage item was found
        page_offset: u32,
    },
    /// Invalid block
    #[error("Invalid block at offset {page_offset}")]
    InvalidBlock {
        /// Page offset at which the invalid block was found
        page_offset: u32,
    },
    /// Invalid segment headers
    #[error("Invalid segment headers at offset {page_offset}")]
    InvalidSegmentHeaders {
        /// Page offset at which the invalid segment headers were found
        page_offset: u32,
    },
    /// Failed to adjust ancestor block forks
    #[error("Failed to adjust ancestor block forks")]
    FailedToAdjustAncestorBlockForks,
    /// Database is not formatted yet
    #[error("Database is not formatted yet")]
    Unformatted,
    /// The first page group is not a permanent one
    #[error("Non-permanent first page group")]
    NonPermanentFirstPageGroup,
}
314
315#[derive(Debug, thiserror::Error)]
317pub enum ClientDatabaseFormatError {
318 #[error("Storage backend has canceled read request")]
320 ReadRequestCancelled,
321 #[error("Storage backend read error: {error}")]
323 ReadError {
324 error: io::Error,
326 },
327 #[error("Failed to generate database id")]
329 FailedToGenerateDatabaseId {
330 #[from]
332 error: SysError,
333 },
334 #[error("Database is already formatted yet")]
336 AlreadyFormatted,
337 #[error("Storage backend has canceled a writing request")]
339 WriteRequestCancelled,
340 #[error("Storage item write error")]
342 StorageItemWriteError {
343 #[from]
345 error: io::Error,
346 },
347}
348
/// Tip of a fork, identified by the number and root of its latest block
#[derive(Debug, Copy, Clone)]
struct ForkTip {
    /// Block number of the tip
    number: BlockNumber,
    /// Block root of the tip
    root: BlockRoot,
}
354
/// Borrowed view of a block that is either fully available in memory or has to be read from the
/// storage backend
enum FullBlock<'a, Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is available in memory
    InMemory(&'a Block),
    /// Only the header is in memory, the rest is read from `write_location`
    Persisted {
        header: &'a Block::Header,
        write_location: WriteLocation,
    },
}
365
/// Extra details retained for beacon chain blocks
#[derive(Debug)]
struct BeaconChainBlockDetails {
    /// Shard segment roots collected from the block body
    shard_segment_roots: StdArc<[ShardSegmentRoot]>,
}

impl BeaconChainBlockDetails {
    /// Collects shard segment roots from all intermediate shard blocks referenced by `body`.
    ///
    /// For every intermediate shard block its own segments come first, followed by the segments
    /// of its leaf shards.
    fn from_body(body: &BeaconChainBody<'_>) -> Self {
        let shard_segment_roots = body
            .intermediate_shard_blocks()
            .iter()
            .flat_map(|intermediate_shard_block_info| {
                // Segments of the intermediate shard itself
                let own_segments = intermediate_shard_block_info
                    .own_segments
                    .into_iter()
                    .flat_map({
                        let shard_index = intermediate_shard_block_info.header.prefix.shard_index;

                        move |own_segments| {
                            // Segment roots get consecutive indices starting from
                            // `first_local_segment_index`
                            (own_segments.first_local_segment_index..)
                                .zip(own_segments.segment_roots)
                                .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                    shard_index,
                                    segment_index,
                                    segment_root,
                                })
                        }
                    });
                // Segments of leaf shards, each paired with its own shard index
                let child_shard_segment_roots = intermediate_shard_block_info
                    .leaf_shards_segments()
                    .flat_map(move |(shard_index, own_segments)| {
                        (own_segments.first_local_segment_index..)
                            .zip(own_segments.segment_roots)
                            .map(move |(segment_index, &segment_root)| ShardSegmentRoot {
                                shard_index,
                                segment_index,
                                segment_root,
                            })
                    });

                own_segments.chain(child_shard_segment_roots)
            })
            .collect();

        Self {
            shard_segment_roots,
        }
    }
}
415
/// Block tracked by the database, in one of three storage states
#[derive(Debug)]
enum ClientDatabaseBlock<Block>
where
    Block: GenericOwnedBlock,
{
    /// The whole block is held in memory
    InMemory {
        block: Block,
        block_details: BlockDetails,
        /// Only set for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
    },
    /// Only the header and details are held in memory, the block itself is read from
    /// `write_location`
    Persisted {
        header: Block::Header,
        block_details: BlockDetails,
        /// Only set for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
    /// Like [`Self::Persisted`], but block details are no longer retained
    PersistedConfirmed {
        header: Block::Header,
        /// Only set for beacon chain blocks
        beacon_chain_block_details: Option<BeaconChainBlockDetails>,
        write_location: WriteLocation,
    },
}
451
452impl<Block> ClientDatabaseBlock<Block>
453where
454 Block: GenericOwnedBlock,
455{
456 #[inline(always)]
457 fn header(&self) -> &Block::Header {
458 match self {
459 Self::InMemory { block, .. } => block.header(),
460 Self::Persisted { header, .. } | Self::PersistedConfirmed { header, .. } => header,
461 }
462 }
463
464 #[inline(always)]
465 fn full_block(&self) -> FullBlock<'_, Block> {
466 match self {
467 Self::InMemory { block, .. } => FullBlock::InMemory(block),
468 Self::Persisted {
469 header,
470 write_location,
471 ..
472 }
473 | Self::PersistedConfirmed {
474 header,
475 write_location,
476 ..
477 } => FullBlock::Persisted {
478 header,
479 write_location: *write_location,
480 },
481 }
482 }
483
484 #[inline(always)]
485 fn block_details(&self) -> Option<&BlockDetails> {
486 match self {
487 Self::InMemory { block_details, .. } | Self::Persisted { block_details, .. } => {
488 Some(block_details)
489 }
490 Self::PersistedConfirmed { .. } => None,
491 }
492 }
493
494 #[inline(always)]
495 fn beacon_chain_block_details(&self) -> &Option<BeaconChainBlockDetails> {
496 match self {
497 Self::InMemory {
498 beacon_chain_block_details,
499 ..
500 }
501 | Self::Persisted {
502 beacon_chain_block_details,
503 ..
504 }
505 | Self::PersistedConfirmed {
506 beacon_chain_block_details,
507 ..
508 } => beacon_chain_block_details,
509 }
510 }
511}
512
/// In-memory bookkeeping of blocks and fork tips
#[derive(Debug)]
struct StateData<Block>
where
    Block: GenericOwnedBlock,
{
    /// Fork tips, the front entry is the best tip
    fork_tips: VecDeque<ForkTip>,
    /// Maps known block roots to their block numbers
    block_roots: HashMap<BlockRoot, BlockNumber, BuildHasherDefault<BlockRootHasher>>,
    /// Blocks grouped by block number.
    ///
    /// The entry at index `best_number - block_number` holds all known fork candidates for
    /// `block_number`, so the front entry corresponds to the best block number, and the first
    /// candidate of the front entry is the best block.
    blocks: VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
}
544
545#[derive(Debug)]
546struct SegmentHeadersCache {
547 segment_headers_cache: Vec<SegmentHeader>,
548}
549
550impl SegmentHeadersCache {
551 #[inline(always)]
552 fn last_segment_header(&self) -> Option<SegmentHeader> {
553 self.segment_headers_cache.last().cloned()
554 }
555
556 #[inline(always)]
557 fn max_local_segment_index(&self) -> Option<LocalSegmentIndex> {
558 self.segment_headers_cache
559 .last()
560 .map(|segment_header| segment_header.segment_index.as_inner())
561 }
562
563 #[inline(always)]
564 fn get_segment_header(&self, local_segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
565 self.segment_headers_cache
566 .get(u64::from(local_segment_index) as usize)
567 .copied()
568 }
569
570 fn add_segment_headers(
572 &mut self,
573 mut segment_headers: Vec<SegmentHeader>,
574 ) -> Result<Vec<SegmentHeader>, PersistSegmentHeadersError> {
575 self.segment_headers_cache.reserve(segment_headers.len());
576
577 let mut maybe_last_local_segment_index = self.max_local_segment_index();
578
579 if let Some(last_segment_index) = maybe_last_local_segment_index {
580 segment_headers.retain(|segment_header| {
582 segment_header.segment_index.as_inner() > last_segment_index
583 });
584 }
585
586 for segment_header in segment_headers.iter().copied() {
589 let local_segment_index = segment_header.local_segment_index();
590 match maybe_last_local_segment_index {
591 Some(last_local_segment_index) => {
592 if local_segment_index != last_local_segment_index + LocalSegmentIndex::ONE {
593 return Err(PersistSegmentHeadersError::MustFollowLastSegmentIndex {
594 local_segment_index,
595 last_local_segment_index,
596 });
597 }
598
599 self.segment_headers_cache.push(segment_header);
600 maybe_last_local_segment_index.replace(local_segment_index);
601 }
602 None => {
603 if local_segment_index != LocalSegmentIndex::ZERO {
604 return Err(PersistSegmentHeadersError::FirstSegmentIndexZero {
605 local_segment_index,
606 });
607 }
608
609 self.segment_headers_cache.push(segment_header);
610 maybe_last_local_segment_index.replace(local_segment_index);
611 }
612 }
613 }
614
615 Ok(segment_headers)
616 }
617}
618
619#[derive(Debug)]
620struct SuperSegmentHeadersCache {
621 super_segment_headers_cache: Vec<SuperSegmentHeader>,
622}
623
624impl SuperSegmentHeadersCache {
625 #[inline(always)]
626 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
627 self.super_segment_headers_cache.last().cloned()
628 }
629
630 #[inline]
631 fn previous_super_segment_header(
632 &self,
633 target_block_number: BlockNumber,
634 ) -> Option<SuperSegmentHeader> {
635 let block_number = target_block_number.checked_sub(BlockNumber::ONE)?;
636 let index = match self.super_segment_headers_cache.binary_search_by_key(
637 &block_number,
638 |super_segment_header| {
639 super_segment_header
640 .target_beacon_chain_block_number
641 .as_inner()
642 },
643 ) {
644 Ok(found_index) => found_index,
645 Err(insert_index) => insert_index.checked_sub(1)?,
646 };
647
648 self.super_segment_headers_cache.get(index).copied()
649 }
650
651 #[inline(always)]
652 fn get_super_segment_header(
653 &self,
654 local_segment_index: SuperSegmentIndex,
655 ) -> Option<SuperSegmentHeader> {
656 self.super_segment_headers_cache
657 .get(u64::from(local_segment_index) as usize)
658 .copied()
659 }
660
661 fn add_super_segment_headers(
663 &mut self,
664 mut super_segment_headers: Vec<SuperSegmentHeader>,
665 ) -> Result<Vec<SuperSegmentHeader>, PersistSuperSegmentHeadersError> {
666 self.super_segment_headers_cache
667 .reserve(super_segment_headers.len());
668
669 let mut maybe_last_super_segment_index = self
670 .super_segment_headers_cache
671 .last()
672 .map(|header| header.index.as_inner());
673
674 if let Some(last_super_segment_index) = maybe_last_super_segment_index {
675 super_segment_headers.retain(|super_segment_header| {
677 super_segment_header.index.as_inner() > last_super_segment_index
678 });
679 }
680
681 for super_segment_header in super_segment_headers.iter().copied() {
684 let super_segment_index = super_segment_header.index.as_inner();
685 match maybe_last_super_segment_index {
686 Some(last_super_segment_index) => {
687 if super_segment_index != last_super_segment_index + SuperSegmentIndex::ONE {
688 return Err(
689 PersistSuperSegmentHeadersError::MustFollowLastSegmentIndex {
690 super_segment_index,
691 last_super_segment_index,
692 },
693 );
694 }
695
696 self.super_segment_headers_cache.push(super_segment_header);
697 maybe_last_super_segment_index.replace(super_segment_index);
698 }
699 None => {
700 if super_segment_index != SuperSegmentIndex::ZERO {
701 return Err(PersistSuperSegmentHeadersError::FirstSegmentIndexZero {
702 super_segment_index,
703 });
704 }
705
706 self.super_segment_headers_cache.push(super_segment_header);
707 maybe_last_super_segment_index.replace(super_segment_index);
708 }
709 }
710 }
711
712 Ok(super_segment_headers)
713 }
714}
715
/// Mutable state of the database, guarded by a lock in [`Inner`]
#[derive(Debug)]
struct State<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Blocks, block roots and fork tips
    data: StateData<Block>,
    /// Cached segment headers
    segment_headers_cache: SegmentHeadersCache,
    /// Cached super segment headers
    super_segment_headers_cache: SuperSegmentHeadersCache,
    /// Adapter used for reading and writing storage items, guarded by its own lock
    storage_backend_adapter: AsyncRwLock<StorageBackendAdapter<StorageBackend>>,
}
727
728impl<Block, StorageBackend> State<Block, StorageBackend>
729where
730 Block: GenericOwnedBlock,
731{
732 #[inline(always)]
733 fn best_tip(&self) -> &ForkTip {
734 self.data
735 .fork_tips
736 .front()
737 .expect("The best block is always present; qed")
738 }
739
740 #[inline(always)]
741 fn best_block(&self) -> &ClientDatabaseBlock<Block> {
742 self.data
743 .blocks
744 .front()
745 .expect("The best block is always present; qed")
746 .first()
747 .expect("The best block is always present; qed")
748 }
749}
750
/// Borrowed block that is to be persisted, together with its position among tracked blocks.
///
/// NOTE(review): `block_offset`/`fork_offset` presumably index into `StateData::blocks` and the
/// fork candidates at that offset — confirm against the code using this struct.
#[derive(Debug)]
struct BlockToPersist<'a, Block>
where
    Block: GenericOwnedBlock,
{
    block_offset: usize,
    fork_offset: usize,
    block: &'a Block,
    block_details: &'a BlockDetails,
}
761
/// Position of a block among tracked blocks together with the location it was written to.
///
/// NOTE(review): offsets presumably mirror those of [`BlockToPersist`] — confirm against the code
/// using this struct.
#[derive(Debug)]
struct PersistedBlock {
    block_offset: usize,
    fork_offset: usize,
    write_location: WriteLocation,
}
768
/// Subset of [`ClientDatabaseOptions`] retained in [`Inner`]
#[derive(Debug)]
struct ClientDatabaseInnerOptions {
    block_confirmation_depth: BlockNumber,
    soft_confirmation_depth: BlockNumber,
    max_fork_tips: NonZeroUsize,
    max_fork_tip_distance: BlockNumber,
}
776
/// Shared internals of [`ClientDatabase`]
#[derive(Debug)]
struct Inner<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Database state behind an async read-write lock
    state: AsyncRwLock<State<Block, StorageBackend>>,
    /// Options the database was created with
    options: ClientDatabaseInnerOptions,
}
785
/// Client database.
///
/// A handle around reference-counted internals, so clones refer to the same database.
#[derive(Debug)]
pub struct ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    inner: Arc<Inner<Block, StorageBackend>>,
}
794
795impl<Block, StorageBackend> Clone for ClientDatabase<Block, StorageBackend>
796where
797 Block: GenericOwnedBlock,
798{
799 fn clone(&self) -> Self {
800 Self {
801 inner: self.inner.clone(),
802 }
803 }
804}
805
impl<Block, StorageBackend> Drop for ClientDatabase<Block, StorageBackend>
where
    Block: GenericOwnedBlock,
{
    /// Currently performs no work when a handle is dropped
    fn drop(&mut self) {
        // NOTE(review): the body is empty, nothing is flushed or persisted here
    }
}
814
815impl<Block, StorageBackend> ChainInfo<Block> for ClientDatabase<Block, StorageBackend>
816where
817 Block: GenericOwnedBlock,
818 StorageBackend: ClientDatabaseStorageBackend,
819{
820 #[inline]
821 fn best_root(&self) -> BlockRoot {
822 self.inner.state.read_blocking().best_tip().root
825 }
826
827 #[inline]
828 fn best_header(&self) -> Block::Header {
829 self.inner
832 .state
833 .read_blocking()
834 .best_block()
835 .header()
836 .clone()
837 }
838
839 #[inline]
840 fn best_header_with_details(&self) -> (Block::Header, BlockDetails) {
841 let state = self.inner.state.read_blocking();
844 let best_block = state.best_block();
845 (
846 best_block.header().clone(),
847 best_block
848 .block_details()
849 .expect("Always present for the best block; qed")
850 .clone(),
851 )
852 }
853
    /// Header of the ancestor with number `ancestor_block_number` of the block identified by
    /// `descendant_block_root`.
    ///
    /// Returns `None` when the descendant is unknown, when the ancestor number is above the
    /// descendant's number or outside the tracked range, or when the parent chain can't be
    /// followed through the tracked blocks.
    #[inline]
    fn ancestor_header(
        &self,
        ancestor_block_number: BlockNumber,
        descendant_block_root: &BlockRoot,
    ) -> Option<Block::Header> {
        let state = self.inner.state.read_blocking();
        let best_number = state.best_tip().number;

        // Blocks are indexed by their distance from the best block
        let ancestor_block_offset =
            u64::from(best_number.checked_sub(ancestor_block_number)?) as usize;
        let ancestor_block_candidates = state.data.blocks.get(ancestor_block_offset)?;

        let descendant_block_number = *state.data.block_roots.get(descendant_block_root)?;
        if ancestor_block_number > descendant_block_number {
            return None;
        }
        let descendant_block_offset =
            u64::from(best_number.checked_sub(descendant_block_number)?) as usize;

        // Iterator over block levels starting at the descendant's level and moving towards lower
        // block numbers (higher offsets)
        let mut blocks_range_iter = state
            .data
            .blocks
            .iter()
            .enumerate()
            .skip(descendant_block_offset);

        let (_offset, descendant_block_candidates) = blocks_range_iter.next()?;
        let descendant_header = descendant_block_candidates
            .iter()
            .find(|block| &*block.header().header().root() == descendant_block_root)?
            .header()
            .header();

        // Shortcut: when either level has just a single candidate, the first candidate at the
        // ancestor level is returned without walking the parent chain
        if descendant_block_candidates.len() == 1 || ancestor_block_candidates.len() == 1 {
            return ancestor_block_candidates
                .iter()
                .next()
                .map(|block| block.header().clone());
        }

        let mut parent_block_root = &descendant_header.prefix.parent_root;

        // Follow parent roots level by level until the ancestor level is reached
        for (block_offset, parent_candidates) in blocks_range_iter {
            let parent_header = parent_candidates
                .iter()
                .find(|header| &*header.header().header().root() == parent_block_root)?
                .header();

            if block_offset == ancestor_block_offset {
                return Some(parent_header.clone());
            }

            parent_block_root = &parent_header.header().prefix.parent_root;
        }

        // NOTE(review): this is also reached when the ancestor and the descendant are at the same
        // level with multiple candidates on it, since the loop starts one level below the
        // descendant — confirm this is intended.
        None
    }
922
923 #[inline]
924 fn header(&self, block_root: &BlockRoot) -> Option<Block::Header> {
925 let state = self.inner.state.read_blocking();
928 let best_number = state.best_tip().number;
929
930 let block_number = *state.data.block_roots.get(block_root)?;
931 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
932 let block_candidates = state.data.blocks.get(block_offset)?;
933
934 block_candidates.iter().find_map(|block| {
935 let header = block.header();
936
937 if &*header.header().root() == block_root {
938 Some(header.clone())
939 } else {
940 None
941 }
942 })
943 }
944
945 #[inline]
946 fn header_with_details(&self, block_root: &BlockRoot) -> Option<(Block::Header, BlockDetails)> {
947 let state = self.inner.state.read_blocking();
950 let best_number = state.best_tip().number;
951
952 let block_number = *state.data.block_roots.get(block_root)?;
953 let block_offset = u64::from(best_number.checked_sub(block_number)?) as usize;
954 let block_candidates = state.data.blocks.get(block_offset)?;
955
956 block_candidates.iter().find_map(|block| {
957 let header = block.header();
958 let block_details = block.block_details().cloned()?;
959
960 if &*header.header().root() == block_root {
961 Some((header.clone(), block_details))
962 } else {
963 None
964 }
965 })
966 }
967
    /// Reads the whole block with the given root.
    ///
    /// In-memory blocks are cloned, while persisted blocks are assembled from the header kept in
    /// memory and the body read from the storage backend.
    ///
    /// # Errors
    ///
    /// * [`ReadBlockError::UnknownBlockRoot`] if the root is not tracked
    /// * [`ReadBlockError::StorageItemReadError`] if the item at the write location is not a block
    /// * [`ReadBlockError::FailedToDecode`] if the block can't be assembled from the buffers
    /// * Whatever error reading the storage item converts into
    ///
    /// # Panics
    ///
    /// Panics if the internal bookkeeping is inconsistent (known root without a matching block
    /// entry).
    #[inline]
    async fn block(&self, block_root: &BlockRoot) -> Result<Block, ReadBlockError> {
        let state = self.inner.state.read().await;
        let best_number = state.best_tip().number;

        let block_number = *state
            .data
            .block_roots
            .get(block_root)
            .ok_or(ReadBlockError::UnknownBlockRoot)?;
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .expect("Known block roots always have valid block offset; qed"),
        ) as usize;
        let block_candidates = state
            .data
            .blocks
            .get(block_offset)
            .expect("Valid block offsets always have block entries; qed");

        for block_candidate in block_candidates {
            let header = block_candidate.header();

            if &*header.header().root() == block_root {
                return match block_candidate.full_block() {
                    FullBlock::InMemory(block) => Ok(block.clone()),
                    FullBlock::Persisted {
                        header,
                        write_location,
                    } => {
                        // The state read lock is still held while the adapter is read-locked
                        let storage_backend_adapter = state.storage_backend_adapter.read().await;

                        let storage_item = storage_backend_adapter
                            .read_storage_item::<StorageItemBlock>(write_location)
                            .await?;

                        let storage_item_block = match storage_item {
                            StorageItemBlock::Block(storage_item_block) => storage_item_block,
                            StorageItemBlock::SegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: `SegmentHeaders`",
                                    ),
                                });
                            }
                            StorageItemBlock::SuperSegmentHeaders(_) => {
                                return Err(ReadBlockError::StorageItemReadError {
                                    error: io::Error::other(
                                        "Unexpected storage item: `SuperSegmentHeaders`",
                                    ),
                                });
                            }
                        };

                        // Only the body is taken from storage, the header comes from memory
                        let StorageItemBlockBlock {
                            header: _,
                            body,
                            mmr_with_block: _,
                            system_contract_states: _,
                        } = storage_item_block;

                        Block::from_buffers(header.buffer().clone(), body)
                            .ok_or(ReadBlockError::FailedToDecode)
                    }
                };
            }
        }

        unreachable!("Known block root always has block candidate associated with it; qed")
    }
1039
1040 #[inline]
1041 fn last_segment_header(&self) -> Option<SegmentHeader> {
1042 let state = self.inner.state.read_blocking();
1045 state.segment_headers_cache.last_segment_header()
1046 }
1047
1048 #[inline]
1049 fn get_segment_header(&self, segment_index: LocalSegmentIndex) -> Option<SegmentHeader> {
1050 let state = self.inner.state.read_blocking();
1053
1054 state
1055 .segment_headers_cache
1056 .get_segment_header(segment_index)
1057 }
1058
1059 fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
1060 let state = self.inner.state.read_blocking();
1063
1064 let Some(last_local_segment_index) = state.segment_headers_cache.max_local_segment_index()
1065 else {
1066 return Vec::new();
1068 };
1069
1070 if Block::Block::SHARD_KIND == RealShardKind::BeaconChain
1072 && block_number == BlockNumber::ONE
1073 {
1074 return vec![
1077 state
1078 .segment_headers_cache
1079 .get_segment_header(LocalSegmentIndex::ZERO)
1080 .expect("Segment headers are stored in monotonically increasing order; qed"),
1081 ];
1082 }
1083
1084 if last_local_segment_index == LocalSegmentIndex::ZERO {
1085 return Vec::new();
1087 }
1088
1089 let mut current_local_segment_index = last_local_segment_index;
1090 loop {
1091 let current_segment_header = state
1094 .segment_headers_cache
1095 .get_segment_header(current_local_segment_index)
1096 .expect("Segment headers are stored in monotonically increasing order; qed");
1097
1098 let target_block_number = current_segment_header.last_archived_block.number()
1100 + BlockNumber::ONE
1101 + self.inner.options.block_confirmation_depth;
1102 if target_block_number == block_number {
1103 let mut headers_for_block = vec![current_segment_header];
1104
1105 let last_archived_block_number = current_segment_header.last_archived_block.number;
1107 let mut local_segment_index = current_local_segment_index - LocalSegmentIndex::ONE;
1108
1109 while let Some(segment_header) = state
1110 .segment_headers_cache
1111 .get_segment_header(local_segment_index)
1112 {
1113 if segment_header.last_archived_block.number == last_archived_block_number {
1114 headers_for_block.insert(0, segment_header);
1115 local_segment_index -= LocalSegmentIndex::ONE;
1116 } else {
1117 break;
1118 }
1119 }
1120
1121 return headers_for_block;
1122 }
1123
1124 if target_block_number > block_number {
1126 if current_local_segment_index > LocalSegmentIndex::ONE {
1128 current_local_segment_index -= LocalSegmentIndex::ONE
1129 } else {
1130 break;
1131 }
1132 } else {
1133 return Vec::new();
1135 }
1136 }
1137
1138 Vec::new()
1140 }
1141}
1142
1143impl<Block, StorageBackend> ChainInfoWrite<Block> for ClientDatabase<Block, StorageBackend>
1144where
1145 Block: GenericOwnedBlock,
1146 StorageBackend: ClientDatabaseStorageBackend,
1147{
    /// Accepts a new block together with its details.
    ///
    /// Depending on the block number relative to the current best block:
    /// * when the best number is zero and the block number is not one, the block is handed to
    ///   `insert_first_block()`
    /// * when the block directly follows the best block, it is handed to
    ///   `insert_new_best_block()`
    /// * otherwise the block is recorded in memory as a fork candidate at its block number
    ///
    /// # Errors
    ///
    /// * [`PersistBlockError::MissingParent`] if the block number is more than one above the best
    ///   number
    /// * [`PersistBlockError::OutsideAcceptableRange`] if the block is at least the block
    ///   confirmation depth behind the best block or there is no entry for its block number
    /// * Whatever `insert_new_best_block()` returns
    async fn persist_block(
        &self,
        block: Block,
        block_details: BlockDetails,
    ) -> Result<(), PersistBlockError> {
        let mut state = self.inner.state.write().await;
        let best_number = state.best_tip().number;

        let header = block.header().header();

        let block_number = header.prefix.number;

        if best_number == BlockNumber::ZERO && block_number != BlockNumber::ONE {
            // NOTE(review): presumably the case of starting from a block other than the direct
            // successor of genesis — confirm against `insert_first_block()`
            Self::insert_first_block(&mut state.data, block, block_details);

            return Ok(());
        }

        if block_number == best_number + BlockNumber::ONE {
            return Self::insert_new_best_block(state, &self.inner, block, block_details).await;
        }

        // From here on the block is at or below the best block number, meaning it is a fork
        let block_offset = u64::from(
            best_number
                .checked_sub(block_number)
                .ok_or(PersistBlockError::MissingParent)?,
        ) as usize;

        if block_offset >= u64::from(self.inner.options.block_confirmation_depth) as usize {
            return Err(PersistBlockError::OutsideAcceptableRange);
        }

        // Reborrow through the guard once to allow disjoint mutable borrows of the fields below
        let state = &mut *state;

        let block_forks = state.data.blocks.get_mut(block_offset).ok_or_else(|| {
            error!(
                %block_number,
                %block_offset,
                "Failed to store block fork, header offset is missing despite being within \
                acceptable range"
            );

            PersistBlockError::OutsideAcceptableRange
        })?;

        for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
            // The parent stops being a fork tip once it gets a child
            if fork_tip.root == header.prefix.parent_root {
                state.data.fork_tips.remove(index);
                break;
            }
        }

        let block_root = *header.root();
        // Inserted right after the best tip, which stays at the front
        state.data.fork_tips.insert(
            1,
            ForkTip {
                number: block_number,
                root: block_root,
            },
        );
        state.data.block_roots.insert(block_root, block_number);
        // Beacon chain details are only computed when the block is a beacon chain block
        let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
            .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
        block_forks.push(ClientDatabaseBlock::InMemory {
            block,
            block_details,
            beacon_chain_block_details,
        });

        Self::prune_outdated_fork_tips(block_number, &mut state.data, &self.inner.options);

        Ok(())
    }
1225
    /// Adds segment headers to the cache and writes the newly added ones to the storage backend.
    ///
    /// Nothing is written when all provided headers were already cached.
    ///
    /// # Errors
    ///
    /// Returns an error if the headers don't continue the cached sequence or if writing the
    /// storage item fails.
    async fn persist_segment_headers(
        &self,
        segment_headers: Vec<SegmentHeader>,
    ) -> Result<(), PersistSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_segment_headers = state
            .segment_headers_cache
            .add_segment_headers(segment_headers)?;

        if added_segment_headers.is_empty() {
            return Ok(());
        }

        // The cache is already updated, the write lock is traded for an upgradable read lock for
        // the duration of the storage write
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        // NOTE(review): if this write fails, the headers stay in the in-memory cache
        storage_backend_adapter
            .write_storage_item(StorageItemBlock::SegmentHeaders(
                StorageItemBlockSegmentHeaders {
                    segment_headers: added_segment_headers,
                },
            ))
            .await?;

        Ok(())
    }
1258}
1259
1260impl<StorageBackend> BeaconChainInfo for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1261where
1262 StorageBackend: ClientDatabaseStorageBackend,
1263{
1264 fn shard_segment_roots(
1265 &self,
1266 block_number: BlockNumber,
1267 ) -> Result<StdArc<[ShardSegmentRoot]>, ShardSegmentRootsError> {
1268 let state = self.inner.state.read_blocking();
1271 let best_number = state.best_tip().number;
1272
1273 let block_offset = u64::from(
1274 best_number
1275 .checked_sub(block_number)
1276 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?,
1277 ) as usize;
1278
1279 let block = state
1280 .data
1281 .blocks
1282 .get(block_offset)
1283 .ok_or(ShardSegmentRootsError::BlockMissing { block_number })?
1284 .first()
1285 .expect("There is always at least one block candidate; qed");
1286
1287 Ok(StdArc::clone(
1288 &block
1289 .beacon_chain_block_details()
1290 .as_ref()
1291 .expect("Always present in the beacon chain block; qed")
1292 .shard_segment_roots,
1293 ))
1294 }
1295
1296 #[inline]
1297 fn last_super_segment_header(&self) -> Option<SuperSegmentHeader> {
1298 let state = self.inner.state.read_blocking();
1301 state
1302 .super_segment_headers_cache
1303 .last_super_segment_header()
1304 }
1305
1306 #[inline]
1307 fn previous_super_segment_header(
1308 &self,
1309 target_block_number: BlockNumber,
1310 ) -> Option<SuperSegmentHeader> {
1311 let state = self.inner.state.read_blocking();
1314
1315 state
1316 .super_segment_headers_cache
1317 .previous_super_segment_header(target_block_number)
1318 }
1319
1320 #[inline]
1321 fn get_super_segment_header(
1322 &self,
1323 super_segment_index: SuperSegmentIndex,
1324 ) -> Option<SuperSegmentHeader> {
1325 let state = self.inner.state.read_blocking();
1328
1329 state
1330 .super_segment_headers_cache
1331 .get_super_segment_header(super_segment_index)
1332 }
1333}
1334
1335impl<StorageBackend> BeaconChainInfoWrite for ClientDatabase<OwnedBeaconChainBlock, StorageBackend>
1336where
1337 StorageBackend: ClientDatabaseStorageBackend,
1338{
    /// Adds a single super segment header to the cache and writes it to the storage backend.
    ///
    /// Returns `Ok(true)` if the header was added and written, `Ok(false)` if it was skipped
    /// because it was already cached.
    ///
    /// # Errors
    ///
    /// Returns an error if the header doesn't continue the cached sequence or if writing the
    /// storage item fails.
    async fn persist_super_segment_header(
        &self,
        super_segment_header: SuperSegmentHeader,
    ) -> Result<bool, PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(vec![super_segment_header])?;

        if added_super_segment_headers.is_empty() {
            return Ok(false);
        }

        // The cache is already updated, the write lock is traded for an upgradable read lock for
        // the duration of the storage write
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        // NOTE(review): if this write fails, the header stays in the in-memory cache
        storage_backend_adapter
            .write_storage_item(StorageItemBlock::SuperSegmentHeaders(
                StorageItemBlockSuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(true)
    }
1371
    /// Adds super segment headers to the cache and writes the newly added ones to the storage
    /// backend.
    ///
    /// Nothing is written when all provided headers were already cached.
    ///
    /// # Errors
    ///
    /// Returns an error if the headers don't continue the cached sequence or if writing the
    /// storage item fails.
    async fn persist_super_segment_headers(
        &self,
        super_segment_headers: Vec<SuperSegmentHeader>,
    ) -> Result<(), PersistSuperSegmentHeadersError> {
        let mut state = self.inner.state.write().await;

        let added_super_segment_headers = state
            .super_segment_headers_cache
            .add_super_segment_headers(super_segment_headers)?;

        if added_super_segment_headers.is_empty() {
            return Ok(());
        }

        // The cache is already updated, the write lock is traded for an upgradable read lock for
        // the duration of the storage write
        let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);

        let mut storage_backend_adapter = state.storage_backend_adapter.write().await;

        // NOTE(review): if this write fails, the headers stay in the in-memory cache
        storage_backend_adapter
            .write_storage_item(StorageItemBlock::SuperSegmentHeaders(
                StorageItemBlockSuperSegmentHeaders {
                    super_segment_headers: added_super_segment_headers,
                },
            ))
            .await?;

        Ok(())
    }
1404}
1405
1406impl<Block, StorageBackend> ClientDatabase<Block, StorageBackend>
1407where
1408 Block: GenericOwnedBlock,
1409 StorageBackend: ClientDatabaseStorageBackend,
1410{
1411 pub async fn open<GBB>(
1415 options: ClientDatabaseOptions<GBB, StorageBackend>,
1416 ) -> Result<Self, ClientDatabaseError>
1417 where
1418 GBB: FnOnce() -> GenesisBlockBuilderResult<Block>,
1419 {
1420 let ClientDatabaseOptions {
1421 write_buffer_size,
1422 block_confirmation_depth,
1423 soft_confirmation_depth,
1424 max_fork_tips,
1425 max_fork_tip_distance,
1426 genesis_block_builder,
1427 storage_backend,
1428 } = options;
1429 if soft_confirmation_depth >= block_confirmation_depth {
1430 return Err(ClientDatabaseError::InvalidSoftConfirmationDepth);
1431 }
1432
1433 if max_fork_tip_distance > block_confirmation_depth {
1434 return Err(ClientDatabaseError::InvalidMaxForkTipDistance);
1435 }
1436
1437 let mut state_data = StateData {
1438 fork_tips: VecDeque::new(),
1439 block_roots: HashMap::default(),
1440 blocks: VecDeque::new(),
1441 };
1442 let mut segment_headers_cache = SegmentHeadersCache {
1443 segment_headers_cache: Vec::new(),
1444 };
1445 let mut super_segment_headers_cache = SuperSegmentHeadersCache {
1446 super_segment_headers_cache: Vec::new(),
1447 };
1448
1449 let options = ClientDatabaseInnerOptions {
1450 block_confirmation_depth,
1451 soft_confirmation_depth,
1452 max_fork_tips,
1453 max_fork_tip_distance,
1454 };
1455
1456 let storage_item_handlers = StorageItemHandlers {
1457 permanent: |_arg| {
1458 Ok(())
1460 },
1461 block: |arg| {
1462 let StorageItemHandlerArg {
1463 storage_item,
1464 page_offset,
1465 num_pages,
1466 } = arg;
1467 let storage_item_block = match storage_item {
1468 StorageItemBlock::Block(storage_item_block) => storage_item_block,
1469 StorageItemBlock::SegmentHeaders(segment_headers) => {
1470 let num_segment_headers = segment_headers.segment_headers.len();
1471 return match segment_headers_cache
1472 .add_segment_headers(segment_headers.segment_headers)
1473 {
1474 Ok(_) => Ok(()),
1475 Err(error) => {
1476 error!(
1477 %page_offset,
1478 %num_segment_headers,
1479 %error,
1480 "Failed to add segment headers from storage item"
1481 );
1482
1483 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1484 }
1485 };
1486 }
1487 StorageItemBlock::SuperSegmentHeaders(super_segment_headers) => {
1488 let num_super_segment_headers =
1489 super_segment_headers.super_segment_headers.len();
1490 return match super_segment_headers_cache
1491 .add_super_segment_headers(super_segment_headers.super_segment_headers)
1492 {
1493 Ok(_) => Ok(()),
1494 Err(error) => {
1495 error!(
1496 %page_offset,
1497 %num_super_segment_headers,
1498 %error,
1499 "Failed to add segment headers from storage item"
1500 );
1501
1502 Err(ClientDatabaseError::InvalidSegmentHeaders { page_offset })
1503 }
1504 };
1505 }
1506 };
1507
1508 let StorageItemBlockBlock {
1511 header,
1512 body,
1513 mmr_with_block,
1514 system_contract_states,
1515 } = storage_item_block;
1516
1517 let header = Block::Header::from_buffer(header).map_err(|_buffer| {
1518 error!(%page_offset, "Failed to decode block header from bytes");
1519
1520 ClientDatabaseError::InvalidBlock { page_offset }
1521 })?;
1522 let body = Block::Body::from_buffer(body).map_err(|_buffer| {
1523 error!(%page_offset, "Failed to decode block body from bytes");
1524
1525 ClientDatabaseError::InvalidBlock { page_offset }
1526 })?;
1527
1528 let block_root = *header.header().root();
1529 let block_number = header.header().prefix.number;
1530
1531 state_data.block_roots.insert(block_root, block_number);
1532
1533 let maybe_best_number = state_data
1534 .blocks
1535 .front()
1536 .and_then(|block_forks| block_forks.first())
1537 .map(|best_block| {
1538 let header: &Block::Header = best_block.header();
1540
1541 header.header().prefix.number
1542 });
1543
1544 let block_offset = if let Some(best_number) = maybe_best_number {
1545 if block_number <= best_number {
1546 u64::from(best_number - block_number) as usize
1547 } else {
1548 if block_number - best_number != BlockNumber::ONE {
1550 error!(
1551 %page_offset,
1552 %best_number,
1553 %block_number,
1554 "Invalid new best block number, it must be only one block \
1555 higher than the best block"
1556 );
1557
1558 return Err(ClientDatabaseError::InvalidBlock { page_offset });
1559 }
1560
1561 state_data.blocks.push_front(SmallVec::new());
1562 0
1564 }
1565 } else {
1566 state_data.blocks.push_front(SmallVec::new());
1567 0
1569 };
1570
1571 let block_forks = match state_data.blocks.get_mut(block_offset) {
1572 Some(block_forks) => block_forks,
1573 None => {
1574 return Ok(());
1578 }
1579 };
1580
1581 let beacon_chain_block_details =
1583 <dyn Any>::downcast_ref::<OwnedBeaconChainBody>(&body)
1584 .map(|body| BeaconChainBlockDetails::from_body(body.body()));
1585 block_forks.push(ClientDatabaseBlock::Persisted {
1586 header,
1587 block_details: BlockDetails {
1588 mmr_with_block,
1589 system_contract_states,
1590 },
1591 beacon_chain_block_details,
1592 write_location: WriteLocation {
1593 page_offset,
1594 num_pages,
1595 },
1596 });
1597
1598 if block_offset == 0 && block_forks.len() == 1 {
1601 Self::confirm_canonical_block(block_number, &mut state_data, &options);
1602 }
1603
1604 Ok(())
1605 },
1606 };
1607
1608 let storage_backend_adapter =
1609 StorageBackendAdapter::open(write_buffer_size, storage_item_handlers, storage_backend)
1610 .await?;
1611
1612 if let Some(best_block) = state_data.blocks.front().and_then(|block_forks| {
1613 block_forks.last()
1616 }) {
1617 let header: &Block::Header = best_block.header();
1619 let header = header.header();
1620 let block_number = header.prefix.number;
1621 let block_root = *header.root();
1622
1623 if !Self::adjust_ancestor_block_forks(&mut state_data.blocks, block_root) {
1624 return Err(ClientDatabaseError::FailedToAdjustAncestorBlockForks);
1625 }
1626
1627 state_data.fork_tips.push_front(ForkTip {
1629 number: block_number,
1630 root: block_root,
1631 });
1632 } else {
1633 let GenesisBlockBuilderResult {
1634 block,
1635 system_contract_states,
1636 } = genesis_block_builder();
1637
1638 let header = block.header().header();
1640 let block_number = header.prefix.number;
1641 let block_root = *header.root();
1642
1643 state_data.fork_tips.push_front(ForkTip {
1644 number: block_number,
1645 root: block_root,
1646 });
1647 state_data.block_roots.insert(block_root, block_number);
1648 let beacon_chain_block_details =
1649 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1650 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1651 state_data
1652 .blocks
1653 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1654 block,
1655 block_details: BlockDetails {
1656 system_contract_states,
1657 mmr_with_block: Arc::new({
1658 let mut mmr = BlockMerkleMountainRange::new();
1659 mmr.add_leaf(&block_root);
1660 mmr
1661 })
1662 },
1663 beacon_chain_block_details,
1664 }]);
1665 }
1666
1667 let state = State {
1668 data: state_data,
1669 segment_headers_cache,
1670 super_segment_headers_cache,
1671 storage_backend_adapter: AsyncRwLock::new(storage_backend_adapter),
1672 };
1673
1674 let inner = Inner {
1675 state: AsyncRwLock::new(state),
1676 options,
1677 };
1678
1679 Ok(Self {
1680 inner: Arc::new(inner),
1681 })
1682 }
1683
1684 pub async fn format(
1686 storage_backend: &StorageBackend,
1687 options: ClientDatabaseFormatOptions,
1688 ) -> Result<(), ClientDatabaseFormatError> {
1689 StorageBackendAdapter::format(storage_backend, options).await
1690 }
1691
1692 fn insert_first_block(state: &mut StateData<Block>, block: Block, block_details: BlockDetails) {
1693 let header = block.header().header();
1695 let block_number = header.prefix.number;
1696 let block_root = *header.root();
1697
1698 state.fork_tips.clear();
1699 state.fork_tips.push_front(ForkTip {
1700 number: block_number,
1701 root: block_root,
1702 });
1703 state.block_roots.clear();
1704 state.block_roots.insert(block_root, block_number);
1705 state.blocks.clear();
1706 let beacon_chain_block_details = <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1707 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1708 state
1709 .blocks
1710 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1711 block,
1712 block_details,
1713 beacon_chain_block_details,
1714 }]);
1715 }
1716
1717 async fn insert_new_best_block(
1718 mut state: AsyncRwLockWriteGuard<'_, State<Block, StorageBackend>>,
1719 inner: &Inner<Block, StorageBackend>,
1720 block: Block,
1721 block_details: BlockDetails,
1722 ) -> Result<(), PersistBlockError> {
1723 let header = block.header().header();
1724 let block_number = header.prefix.number;
1725 let block_root = *header.root();
1726 let parent_root = header.prefix.parent_root;
1727
1728 if !Self::adjust_ancestor_block_forks(&mut state.data.blocks, parent_root) {
1731 return Err(PersistBlockError::MissingParent);
1732 }
1733
1734 {
1736 for (index, fork_tip) in state.data.fork_tips.iter_mut().enumerate() {
1737 if fork_tip.root == parent_root {
1739 state.data.fork_tips.remove(index);
1740 break;
1741 }
1742 }
1743
1744 state.data.fork_tips.push_front(ForkTip {
1745 number: block_number,
1746 root: block_root,
1747 });
1748 state.data.block_roots.insert(block_root, block_number);
1749 let beacon_chain_block_details =
1750 <dyn Any>::downcast_ref::<OwnedBeaconChainBlock>(&block)
1751 .map(|block| BeaconChainBlockDetails::from_body(block.body.body()));
1752 state
1753 .data
1754 .blocks
1755 .push_front(smallvec![ClientDatabaseBlock::InMemory {
1756 block,
1757 block_details: block_details.clone(),
1758 beacon_chain_block_details,
1759 }]);
1760 }
1761
1762 let options = &inner.options;
1763
1764 Self::confirm_canonical_block(block_number, &mut state.data, options);
1765 Self::prune_outdated_fork_tips(block_number, &mut state.data, options);
1766
1767 let state = AsyncRwLockWriteGuard::downgrade_to_upgradable(state);
1772
1773 let mut blocks_to_persist = Vec::new();
1774 for block_offset in u64::from(options.soft_confirmation_depth) as usize.. {
1775 let Some(fork_blocks) = state.data.blocks.get(block_offset) else {
1776 break;
1777 };
1778
1779 let len_before = blocks_to_persist.len();
1780 fork_blocks
1781 .iter()
1782 .enumerate()
1783 .filter_map(|(fork_offset, client_database_block)| {
1784 match client_database_block {
1785 ClientDatabaseBlock::InMemory {
1786 block,
1787 block_details,
1788 beacon_chain_block_details: _,
1789 } => Some(BlockToPersist {
1790 block_offset,
1791 fork_offset,
1792 block,
1793 block_details,
1794 }),
1795 ClientDatabaseBlock::Persisted { .. }
1796 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
1797 None
1799 }
1800 }
1801 })
1802 .collect_into(&mut blocks_to_persist);
1803
1804 if blocks_to_persist.len() == len_before {
1805 break;
1806 }
1807 }
1808
1809 let mut persisted_blocks = Vec::with_capacity(blocks_to_persist.len());
1811 {
1812 let mut storage_backend_adapter = state.storage_backend_adapter.write().await;
1813
1814 for block_to_persist in blocks_to_persist.into_iter().rev() {
1815 let BlockToPersist {
1816 block_offset,
1817 fork_offset,
1818 block,
1819 block_details,
1820 } = block_to_persist;
1821
1822 let write_location = storage_backend_adapter
1823 .write_storage_item(StorageItemBlock::Block(StorageItemBlockBlock {
1824 header: block.header().buffer().clone(),
1825 body: block.body().buffer().clone(),
1826 mmr_with_block: Arc::clone(&block_details.mmr_with_block),
1827 system_contract_states: StdArc::clone(
1828 &block_details.system_contract_states,
1829 ),
1830 }))
1831 .await?;
1832
1833 persisted_blocks.push(PersistedBlock {
1834 block_offset,
1835 fork_offset,
1836 write_location,
1837 });
1838 }
1839 }
1840
1841 let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
1843 for persisted_block in persisted_blocks {
1844 let PersistedBlock {
1845 block_offset,
1846 fork_offset,
1847 write_location,
1848 } = persisted_block;
1849
1850 let block = state
1851 .data
1852 .blocks
1853 .get_mut(block_offset)
1854 .expect("Still holding the same lock since last check; qed")
1855 .get_mut(fork_offset)
1856 .expect("Still holding the same lock since last check; qed");
1857
1858 replace_with_or_abort(block, |block| {
1859 if let ClientDatabaseBlock::InMemory {
1860 block,
1861 block_details,
1862 beacon_chain_block_details,
1863 } = block
1864 {
1865 let (header, _body) = block.split();
1866
1867 ClientDatabaseBlock::Persisted {
1868 header,
1869 block_details,
1870 beacon_chain_block_details,
1871 write_location,
1872 }
1873 } else {
1874 unreachable!("Still holding the same lock since last check; qed");
1875 }
1876 });
1877 }
1878
1879 Ok(())
1883 }
1884
1885 #[must_use]
1890 fn adjust_ancestor_block_forks(
1891 blocks: &mut VecDeque<SmallVec<[ClientDatabaseBlock<Block>; 2]>>,
1892 mut parent_block_root: BlockRoot,
1893 ) -> bool {
1894 let mut ancestor_blocks = blocks.iter_mut();
1895
1896 loop {
1897 if ancestor_blocks.len() == 1 {
1898 break;
1900 }
1901
1902 let Some(parent_blocks) = ancestor_blocks.next() else {
1903 break;
1905 };
1906
1907 let Some(fork_offset_parent_block_root) =
1908 parent_blocks
1909 .iter()
1910 .enumerate()
1911 .find_map(|(fork_offset, fork_block)| {
1912 let fork_header = fork_block.header().header();
1913 if *fork_header.root() == parent_block_root {
1914 Some((fork_offset, fork_header.prefix.parent_root))
1915 } else {
1916 None
1917 }
1918 })
1919 else {
1920 return false;
1921 };
1922
1923 let fork_offset;
1924 (fork_offset, parent_block_root) = fork_offset_parent_block_root;
1925
1926 parent_blocks.swap(0, fork_offset);
1927 }
1928
1929 true
1930 }
1931
1932 fn prune_outdated_fork_tips(
1939 best_number: BlockNumber,
1940 state: &mut StateData<Block>,
1941 options: &ClientDatabaseInnerOptions,
1942 ) {
1943 let state = &mut *state;
1944
1945 let mut candidate_forks_to_remove = Vec::with_capacity(options.max_fork_tips.get());
1948
1949 state.fork_tips.retain(|fork_tip| {
1951 if best_number - fork_tip.number > options.max_fork_tip_distance {
1952 candidate_forks_to_remove.push(*fork_tip);
1953 false
1954 } else {
1955 true
1956 }
1957 });
1958 if state.fork_tips.len() > options.max_fork_tips.get() {
1960 state
1961 .fork_tips
1962 .drain(options.max_fork_tips.get()..)
1963 .collect_into(&mut candidate_forks_to_remove);
1964 }
1965
1966 candidate_forks_to_remove
1968 .retain(|fork_tip| !Self::prune_outdated_fork(best_number, fork_tip, state));
1969 state.fork_tips.extend(candidate_forks_to_remove);
1971 }
1972
1973 #[must_use]
1976 fn prune_outdated_fork(
1977 best_number: BlockNumber,
1978 fork_tip: &ForkTip,
1979 state: &mut StateData<Block>,
1980 ) -> bool {
1981 let block_offset = u64::from(best_number - fork_tip.number) as usize;
1982
1983 let mut block_root_to_prune = fork_tip.root;
1985 let mut pruned_tip = false;
1986 for block_offset in block_offset.. {
1987 let Some(fork_blocks) = state.blocks.get_mut(block_offset) else {
1988 if !pruned_tip {
1989 error!(
1990 %best_number,
1991 ?fork_tip,
1992 block_offset,
1993 "Block offset was not present in the database, this is an implementation \
1994 bug #1"
1995 );
1996 }
1997 break;
1999 };
2000
2001 if fork_blocks.len() == 1 {
2002 if !pruned_tip {
2003 error!(
2004 %best_number,
2005 ?fork_tip,
2006 block_offset,
2007 "Block offset was not present in the database, this is an implementation \
2008 bug #2"
2009 );
2010 }
2011
2012 break;
2014 }
2015
2016 let Some((fork_offset, block)) = fork_blocks
2017 .iter()
2018 .enumerate()
2019 .skip(1)
2021 .find(|(_fork_offset, block)| {
2022 *block.header().header().root() == block_root_to_prune
2023 })
2024 else {
2025 if !pruned_tip {
2026 error!(
2027 %best_number,
2028 ?fork_tip,
2029 block_offset,
2030 "Block offset was not present in the database, this is an implementation \
2031 bug #3"
2032 );
2033 }
2034
2035 break;
2037 };
2038
2039 if block.header().ref_count() > 1 {
2041 break;
2042 }
2043
2044 match block {
2046 ClientDatabaseBlock::InMemory { .. } => {
2047 }
2049 ClientDatabaseBlock::Persisted { .. }
2050 | ClientDatabaseBlock::PersistedConfirmed { .. } => {
2051 pruned_tip = true;
2053 break;
2054 }
2055 }
2056
2057 state.block_roots.get_mut(&block_root_to_prune);
2058 block_root_to_prune = block.header().header().prefix.parent_root;
2059 fork_blocks.swap_remove(fork_offset);
2060
2061 pruned_tip = true;
2062 }
2063
2064 pruned_tip
2065 }
2066
2067 fn confirm_canonical_block(
2070 best_number: BlockNumber,
2071 state_data: &mut StateData<Block>,
2072 options: &ClientDatabaseInnerOptions,
2073 ) {
2074 let block_offset = u64::from(options.block_confirmation_depth + BlockNumber::ONE) as usize;
2078
2079 let Some(fork_blocks) = state_data.blocks.get_mut(block_offset) else {
2080 return;
2082 };
2083
2084 {
2086 let Some(canonical_block) = fork_blocks.first_mut() else {
2087 error!(
2088 %best_number,
2089 block_offset,
2090 "Have not found a canonical block to confirm, this is an implementation bug"
2091 );
2092 return;
2093 };
2094
2095 replace_with_or_abort(canonical_block, |block| match block {
2096 ClientDatabaseBlock::InMemory { .. } => {
2097 error!(
2098 %best_number,
2099 block_offset,
2100 header = ?block.header(),
2101 "Block to be confirmed must not be in memory, this is an implementation bug"
2102 );
2103 block
2104 }
2105 ClientDatabaseBlock::Persisted {
2106 header,
2107 block_details: _,
2108 beacon_chain_block_details,
2109 write_location,
2110 } => ClientDatabaseBlock::PersistedConfirmed {
2111 header,
2112 beacon_chain_block_details,
2113 write_location,
2114 },
2115 ClientDatabaseBlock::PersistedConfirmed { .. } => {
2116 error!(
2117 %best_number,
2118 block_offset,
2119 header = ?block.header(),
2120 "Block to be confirmed must not be confirmed yet, this is an \
2121 implementation bug"
2122 );
2123 block
2124 }
2125 });
2126 }
2127
2128 let mut block_roots_to_prune = fork_blocks
2130 .drain(1..)
2131 .map(|block| *block.header().header().root())
2132 .collect::<Vec<_>>();
2133 let mut current_block_offset = block_offset;
2134 while !block_roots_to_prune.is_empty() {
2135 state_data
2137 .fork_tips
2138 .retain(|fork_tip| !block_roots_to_prune.contains(&fork_tip.root));
2139
2140 for block_root in &block_roots_to_prune {
2142 state_data.block_roots.remove(block_root);
2143 }
2144
2145 if let Some(next_block_offset) = current_block_offset.checked_sub(1) {
2147 current_block_offset = next_block_offset;
2148 } else {
2149 break;
2151 }
2152
2153 let fork_blocks = state_data
2154 .blocks
2155 .get_mut(current_block_offset)
2156 .expect("Lower block offset always exists; qed");
2157
2158 block_roots_to_prune = fork_blocks
2160 .drain_filter(|block| {
2161 let header = block.header().header();
2162
2163 block_roots_to_prune.contains(&header.prefix.parent_root)
2164 })
2165 .map(|block| *block.header().header().root())
2166 .collect();
2167 }
2168 }
2169}