1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{BlockDetails, BlockOrigin, ChainInfo, ChainInfoWrite};
4use ab_client_block_verification::{BlockVerification, BlockVerificationError};
5use ab_client_consensus_common::BlockImportingNotification;
6use ab_client_consensus_common::consensus_parameters::{
7 DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
8 ShardMembershipEntropySourceChainInfo,
9};
10use ab_client_consensus_common::state::GlobalState;
11use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
12use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
13use ab_core_primitives::block::{BlockNumber, BlockRoot};
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pot::PotOutput;
16use ab_proof_of_space::Table;
17use futures::channel::mpsc;
18use futures::prelude::*;
19use rclite::Arc;
20use send_future::SendFuture;
21use std::marker::PhantomData;
22use std::sync::Arc as StdArc;
23use tracing::{info, warn};
24
/// Errors specific to beacon chain block import.
///
/// Converted into the generic [`BlockImportError`] through its `From` implementation.
#[derive(Debug, thiserror::Error)]
pub enum BeaconChainBlockImportError {
    /// Block verification failed
    #[error("Block verification error: {error}")]
    VerificationError {
        /// Underlying block verification error; `#[from]` lets it be converted into this variant
        #[from]
        error: BlockVerificationError,
    },
}
36
37impl From<BeaconChainBlockImportError> for BlockImportError {
38 #[inline(always)]
39 fn from(error: BeaconChainBlockImportError) -> Self {
40 Self::Custom {
41 error: error.into(),
42 }
43 }
44}
45
/// Chain info view handed to block verification.
///
/// Combines the underlying chain info with the set of blocks that are currently being imported,
/// so ancestor lookups can also be answered from blocks tracked in `importing_blocks`.
#[derive(Debug)]
struct VerificationChainInfo<'a, CI> {
    /// Underlying chain info
    chain_info: &'a CI,
    /// Blocks whose import is currently in progress
    importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
}
53
54impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
55where
56 CI: ChainInfo<OwnedBeaconChainBlock>,
57{
58 fn ancestor_header_consensus_info(
59 &self,
60 ancestor_block_number: BlockNumber,
61 descendant_block_root: &BlockRoot,
62 ) -> Option<DeriveConsensusParametersConsensusInfo> {
63 if let Some(consensus_info) = self
64 .chain_info
65 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
66 {
67 return Some(consensus_info);
68 }
69
70 let mut current_block_root = *descendant_block_root;
71 loop {
72 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
73 break;
74 };
75 let header = importing_entry.header().header();
76
77 if header.prefix.number == ancestor_block_number {
78 return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
79 header.consensus_info,
80 ));
81 }
82
83 current_block_root = *header.root();
84 }
85
86 self.chain_info
89 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
90 }
91}
92
93impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
94where
95 CI: ChainInfo<OwnedBeaconChainBlock>,
96{
97 fn ancestor_header_proof_of_time(
98 &self,
99 ancestor_block_number: BlockNumber,
100 descendant_block_root: &BlockRoot,
101 ) -> Option<PotOutput> {
102 if let Some(header) = self
103 .chain_info
104 .ancestor_header(ancestor_block_number, descendant_block_root)
105 {
106 return Some(header.header().consensus_info.proof_of_time);
107 }
108
109 let mut current_block_root = *descendant_block_root;
110 loop {
111 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
112 break;
113 };
114 let header = importing_entry.header();
115
116 if header.header().prefix.number == ancestor_block_number {
117 return Some(header.header().consensus_info.proof_of_time);
118 }
119
120 current_block_root = *header.header().root();
121 }
122
123 self.chain_info
126 .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
127 }
128}
129
/// Block import implementation for the beacon chain.
///
/// Verifies blocks with `block_verification`, persists them through `chain_info` and emits
/// notifications before and after a block is persisted.
#[derive(Debug)]
pub struct BeaconChainBlockImport<PosTable, CI, BV> {
    /// Chain info used to look up parent blocks and to persist imported blocks
    chain_info: CI,
    /// Verifier that runs the concurrent and sequential verification stages
    block_verification: BV,
    /// Blocks whose import is currently in progress
    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
    /// Sender for the notification emitted right before a verified block is persisted
    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
    /// Sender for blocks that were successfully persisted
    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    /// Marker for the proof-of-space table type, no value of it is stored
    _pos_table: PhantomData<PosTable>,
}
139
140impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
141 for BeaconChainBlockImport<PosTable, CI, BV>
142where
143 PosTable: Table,
144 CI: ChainInfoWrite<OwnedBeaconChainBlock>,
145 BV: BlockVerification<OwnedBeaconChainBlock>,
146{
147 fn import(
148 &self,
149 block: OwnedBeaconChainBlock,
150 origin: BlockOrigin,
151 ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
152 let parent_root = &block.header.header().prefix.parent_root;
153
154 let (parent_header, parent_block_mmr, parent_block_import_status) =
155 if let Some((parent_header, parent_block_details)) =
156 self.chain_info.header_with_details(parent_root)
157 {
158 (
159 parent_header,
160 parent_block_details.mmr_with_block,
161 ParentBlockImportStatus::Imported {
162 system_contract_states: parent_block_details.system_contract_states,
163 },
164 )
165 } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
166 (
167 importing_entry.header().clone(),
168 Arc::clone(importing_entry.mmr()),
169 ParentBlockImportStatus::Importing {
170 entry: importing_entry,
171 },
172 )
173 } else {
174 return Err(BlockImportError::UnknownParentBlock {
175 block_root: *parent_root,
176 });
177 };
178
179 let parent_block_mmr_root = Blake3Hash::from(
180 parent_block_mmr
181 .root()
182 .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
183 );
184 let mut block_mmr = *parent_block_mmr;
185
186 if !block_mmr.add_leaf(&block.header.header().root()) {
187 return Err(BlockImportError::CantExtendMmr);
188 }
189
190 let importing_handle = self
191 .importing_blocks
192 .insert(block.header.clone(), Arc::new(block_mmr))
193 .ok_or(BlockImportError::AlreadyImporting)?;
194
195 if self
196 .chain_info
197 .header(&block.header.header().root())
198 .is_some()
199 {
200 return Err(BlockImportError::AlreadyImported);
201 }
202
203 Ok(self.import(
204 parent_header,
205 parent_block_mmr_root,
206 block,
207 origin,
208 importing_handle,
209 parent_block_import_status,
210 ))
211 }
212}
213
impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
where
    PosTable: Table,
    CI: ChainInfoWrite<OwnedBeaconChainBlock>,
    BV: BlockVerification<OwnedBeaconChainBlock>,
{
    /// Creates a new instance with an empty set of importing blocks.
    ///
    /// `block_importing_notification_sender` receives a notification right before a verified
    /// block is persisted, `block_import_notification_sender` receives every persisted block.
    #[inline(always)]
    pub fn new(
        chain_info: CI,
        block_verification: BV,
        block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
        block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    ) -> Self {
        Self {
            chain_info,
            block_verification,
            importing_blocks: ImportingBlocks::new(),
            block_importing_notification_sender,
            block_import_notification_sender,
            _pos_table: PhantomData,
        }
    }

    /// Asynchronous part of the block import: verification, state root check, notifications and
    /// persistence.
    ///
    /// Steps, in order:
    /// 1. concurrent verification (does not need the parent to be fully imported)
    /// 2. wait for the parent block import to produce its system contract states
    /// 3. sequential verification
    /// 4. compare the state root in the header with the one derived from the parent's state
    /// 5. send the "block importing" notification and wait for acknowledgements
    /// 6. persist the block and mark the importing handle as successful
    /// 7. send the "block imported" notification and optionally log
    ///
    /// Returns an error if verification fails, the parent import failed, the state root does not
    /// match or persisting the block fails. Failures to send notifications are only logged.
    async fn import(
        &self,
        parent_header: OwnedBeaconChainHeader,
        parent_block_mmr_root: Blake3Hash,
        block: OwnedBeaconChainBlock,
        origin: BlockOrigin,
        importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
        parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
    ) -> Result<(), BlockImportError> {
        let parent_header = parent_header.header();
        let header = block.header.header();
        let body = block.body.body();

        // Blocks that arrive through sync are not logged individually
        let log_block_import = match origin {
            BlockOrigin::LocalBlockBuilder { .. } => true,
            BlockOrigin::Sync => false,
            BlockOrigin::Broadcast => true,
        };

        // NOTE(review): `.send()` presumably comes from the imported `send_future::SendFuture`;
        // confirm why the adaptor is required before removing it
        self.block_verification
            .verify_concurrent(
                parent_header,
                &parent_block_mmr_root,
                header,
                body,
                &origin,
                &VerificationChainInfo {
                    chain_info: &self.chain_info,
                    importing_blocks: &self.importing_blocks,
                },
            )
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // `None` means that the parent block import did not succeed
        let Some(system_contract_states) = parent_block_import_status.wait().await else {
            return Err(BlockImportError::ParentBlockImportFailed);
        };

        // Sequential verification only runs after the parent import has completed
        self.block_verification
            .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // The global state is built from the parent's system contract states as is
        let global_state = GlobalState::new(&system_contract_states);

        let state_root = global_state.root();

        // The state root claimed by the header must match the locally derived one
        if header.result.state_root != state_root {
            return Err(BlockImportError::InvalidStateRoot {
                expected: state_root,
                actual: header.result.state_root,
            });
        }

        let system_contract_states = global_state.to_system_contract_states();

        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
        // A failure to deliver the notification is logged and does not abort the import
        if let Err(error) = self
            .block_importing_notification_sender
            .clone()
            .send(BlockImportingNotification {
                block_number: header.prefix.number,
                acknowledgement_sender,
            })
            .await
        {
            warn!(%error, "Failed to send block importing notification");
        }

        while acknowledgement_receiver.next().await.is_some() {
            // Drain acknowledgements until the receiver reports that the channel is finished
        }

        // Copied out now because `block` is moved into the notification further below
        let number = header.prefix.number;
        let root = *header.root();

        self.chain_info
            .persist_block(
                block.clone(),
                BlockDetails {
                    mmr_with_block: Arc::clone(importing_handle.mmr()),
                    system_contract_states: StdArc::clone(&system_contract_states),
                },
            )
            .await?;

        // Only reported after the block was persisted successfully
        importing_handle.set_success(system_contract_states);

        // A failure to deliver the notification is logged and does not fail the import
        if let Err(error) = self
            .block_import_notification_sender
            .clone()
            .send(block)
            .await
        {
            warn!(
                %error,
                block_number = %number,
                block_root = %root,
                "Failed to send block import notification"
            );
        }

        if log_block_import {
            info!(
                %number,
                %root,
                "🏆 Imported block",
            );
        }

        Ok(())
    }
}