1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{BlockDetails, BlockOrigin, ChainInfoWrite};
4use ab_client_block_verification::{BlockVerification, BlockVerificationError};
5use ab_client_consensus_common::BlockImportingNotification;
6use ab_client_consensus_common::state::GlobalState;
7use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
8use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
9use ab_core_primitives::hashes::Blake3Hash;
10use ab_proof_of_space::Table;
11use futures::channel::mpsc;
12use futures::prelude::*;
13use rclite::Arc;
14use send_future::SendFuture;
15use std::marker::PhantomData;
16use std::sync::Arc as StdArc;
17use tracing::{info, warn};
18
19#[derive(Debug, thiserror::Error)]
21pub enum BeaconChainBlockImportError {
22 #[error("Block verification error: {error}")]
24 VerificationError {
25 #[from]
27 error: BlockVerificationError,
28 },
29}
30
31impl From<BeaconChainBlockImportError> for BlockImportError {
32 #[inline(always)]
33 fn from(error: BeaconChainBlockImportError) -> Self {
34 Self::Custom {
35 error: error.into(),
36 }
37 }
38}
39
40#[derive(Debug)]
41pub struct BeaconChainBlockImport<PosTable, CI, BV> {
42 chain_info: CI,
43 block_verification: BV,
44 importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
45 block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
46 block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
47 _pos_table: PhantomData<PosTable>,
48}
49
50impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
51 for BeaconChainBlockImport<PosTable, CI, BV>
52where
53 PosTable: Table,
54 CI: ChainInfoWrite<OwnedBeaconChainBlock>,
55 BV: BlockVerification<OwnedBeaconChainBlock>,
56{
57 fn import(
58 &self,
59 block: OwnedBeaconChainBlock,
60 origin: BlockOrigin,
61 ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
62 let parent_root = &block.header.header().prefix.parent_root;
63
64 let (parent_header, parent_block_mmr, parent_block_import_status) =
65 if let Some((parent_header, parent_block_details)) =
66 self.chain_info.header_with_details(parent_root)
67 {
68 (
69 parent_header,
70 parent_block_details.mmr_with_block,
71 ParentBlockImportStatus::Imported {
72 system_contract_states: parent_block_details.system_contract_states,
73 },
74 )
75 } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
76 (
77 importing_entry.header().clone(),
78 Arc::clone(importing_entry.mmr()),
79 ParentBlockImportStatus::Importing {
80 entry: importing_entry,
81 },
82 )
83 } else {
84 return Err(BlockImportError::UnknownParentBlock {
85 block_root: *parent_root,
86 });
87 };
88
89 let parent_block_mmr_root = Blake3Hash::from(
90 parent_block_mmr
91 .root()
92 .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
93 );
94 let mut block_mmr = *parent_block_mmr;
95
96 if !block_mmr.add_leaf(&block.header.header().root()) {
97 return Err(BlockImportError::CantExtendMmr);
98 }
99
100 let importing_handle = self
101 .importing_blocks
102 .insert(block.header.clone(), Arc::new(block_mmr))
103 .ok_or(BlockImportError::AlreadyImporting)?;
104
105 if self
106 .chain_info
107 .header(&block.header.header().root())
108 .is_some()
109 {
110 return Err(BlockImportError::AlreadyImported);
111 }
112
113 Ok(self.import(
114 parent_header,
115 parent_block_mmr_root,
116 block,
117 origin,
118 importing_handle,
119 parent_block_import_status,
120 ))
121 }
122}
123
124impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
125where
126 PosTable: Table,
127 CI: ChainInfoWrite<OwnedBeaconChainBlock>,
128 BV: BlockVerification<OwnedBeaconChainBlock>,
129{
130 #[inline(always)]
132 pub fn new(
133 chain_info: CI,
134 block_verification: BV,
135 block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
136 block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
137 ) -> Self {
138 Self {
139 chain_info,
140 block_verification,
141 importing_blocks: ImportingBlocks::new(),
142 block_importing_notification_sender,
143 block_import_notification_sender,
144 _pos_table: PhantomData,
145 }
146 }
147
148 async fn import(
149 &self,
150 parent_header: OwnedBeaconChainHeader,
151 parent_block_mmr_root: Blake3Hash,
152 block: OwnedBeaconChainBlock,
153 origin: BlockOrigin,
154 importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
155 parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
156 ) -> Result<(), BlockImportError> {
157 let parent_header = parent_header.header();
158 let header = block.header.header();
159 let body = block.body.body();
160
161 let log_block_import = match origin {
162 BlockOrigin::LocalBlockBuilder { .. } => true,
163 BlockOrigin::Sync => false,
164 BlockOrigin::Broadcast => true,
165 };
166
167 self.block_verification
170 .verify(parent_header, &parent_block_mmr_root, header, body, origin)
171 .send()
172 .await
173 .map_err(BeaconChainBlockImportError::from)?;
174
175 if parent_block_import_status.has_failed() {
176 return Err(BlockImportError::ParentBlockImportFailed);
178 }
179
180 let Some(system_contract_states) = parent_block_import_status.wait().await else {
181 return Err(BlockImportError::ParentBlockImportFailed);
182 };
183
184 let global_state = GlobalState::new(&system_contract_states);
185
186 let state_root = global_state.root();
189
190 if header.result.state_root != state_root {
191 return Err(BlockImportError::InvalidStateRoot {
192 expected: state_root,
193 actual: header.result.state_root,
194 });
195 }
196
197 let system_contract_states = global_state.to_system_contract_states();
198
199 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
200 if let Err(error) = self
201 .block_importing_notification_sender
202 .clone()
203 .send(BlockImportingNotification {
204 block_number: header.prefix.number,
205 acknowledgement_sender,
206 })
207 .await
208 {
209 warn!(%error, "Failed to send block importing notification");
210 }
211
212 while acknowledgement_receiver.next().await.is_some() {
213 }
215
216 let number = header.prefix.number;
217 let root = *header.root();
218
219 self.chain_info
220 .persist_block(
221 block.clone(),
222 BlockDetails {
223 mmr_with_block: Arc::clone(importing_handle.mmr()),
224 system_contract_states: StdArc::clone(&system_contract_states),
225 },
226 )
227 .await?;
228
229 importing_handle.set_success(system_contract_states);
230
231 if let Err(error) = self
232 .block_import_notification_sender
233 .clone()
234 .send(block)
235 .await
236 {
237 warn!(
238 %error,
239 block_number = %number,
240 block_root = %root,
241 "Failed to send block import notification"
242 );
243 }
244
245 if log_block_import {
246 info!(
247 %number,
248 %root,
249 "🏆 Imported block",
250 );
251 }
252
253 Ok(())
254 }
255}