1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{
4 BeaconChainInfoWrite, BlockDetails, BlockOrigin, ChainInfo, PersistSuperSegmentHeadersError,
5};
6use ab_client_block_verification::{BlockVerification, BlockVerificationError};
7use ab_client_consensus_common::BlockImportingNotification;
8use ab_client_consensus_common::consensus_parameters::{
9 DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
10 ShardMembershipEntropySourceChainInfo,
11};
12use ab_client_consensus_common::state::GlobalState;
13use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::block::{BlockNumber, BlockRoot};
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::PotOutput;
18use ab_core_primitives::segments::SuperSegment;
19use ab_proof_of_space::Table;
20use futures::channel::mpsc;
21use futures::prelude::*;
22use rclite::Arc;
23use send_future::SendFuture;
24use std::marker::PhantomData;
25use std::sync::Arc as StdArc;
26use tracing::{info, warn};
27
/// Errors specific to importing a beacon chain block.
///
/// Both variants wrap an underlying error and can be created from it with `?`/`From` thanks to
/// the `#[from]` attributes.
#[derive(Debug, thiserror::Error)]
pub enum BeaconChainBlockImportError {
    /// Block verification failed
    #[error("Block verification error: {error}")]
    VerificationError {
        /// The underlying block verification error
        #[from]
        error: BlockVerificationError,
    },
    /// Persisting a super segment header failed
    #[error("Failed to persist a new super segment header: {error}")]
    PersistSuperSegmentHeaders {
        /// The underlying persistence error
        #[from]
        error: PersistSuperSegmentHeadersError,
    },
}
46
47impl From<BeaconChainBlockImportError> for BlockImportError {
48 #[inline(always)]
49 fn from(error: BeaconChainBlockImportError) -> Self {
50 Self::Custom {
51 error: error.into(),
52 }
53 }
54}
55
/// Chain view handed to block verification.
///
/// Combines the regular chain info with the set of blocks that are still being imported, so that
/// ancestor lookups can also be answered from blocks that are not yet visible through
/// `chain_info`.
#[derive(Debug)]
struct VerificationChainInfo<'a, CI> {
    /// Chain info that is queried first (and once more as a last resort) for ancestors
    chain_info: &'a CI,
    /// Blocks currently being imported, looked up by block root
    importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
}
63
64impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
65where
66 CI: ChainInfo<OwnedBeaconChainBlock>,
67{
68 fn ancestor_header_consensus_info(
69 &self,
70 ancestor_block_number: BlockNumber,
71 descendant_block_root: &BlockRoot,
72 ) -> Option<DeriveConsensusParametersConsensusInfo> {
73 if let Some(consensus_info) = self
74 .chain_info
75 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
76 {
77 return Some(consensus_info);
78 }
79
80 let mut current_block_root = *descendant_block_root;
81 loop {
82 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
83 break;
84 };
85 let header = importing_entry.header().header();
86
87 if header.prefix.number == ancestor_block_number {
88 return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
89 header.consensus_info,
90 ));
91 }
92
93 current_block_root = *header.root();
94 }
95
96 self.chain_info
99 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
100 }
101}
102
103impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
104where
105 CI: ChainInfo<OwnedBeaconChainBlock>,
106{
107 fn ancestor_header_proof_of_time(
108 &self,
109 ancestor_block_number: BlockNumber,
110 descendant_block_root: &BlockRoot,
111 ) -> Option<PotOutput> {
112 if let Some(header) = self
113 .chain_info
114 .ancestor_header(ancestor_block_number, descendant_block_root)
115 {
116 return Some(header.header().consensus_info.proof_of_time);
117 }
118
119 let mut current_block_root = *descendant_block_root;
120 loop {
121 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
122 break;
123 };
124 let header = importing_entry.header();
125
126 if header.header().prefix.number == ancestor_block_number {
127 return Some(header.header().consensus_info.proof_of_time);
128 }
129
130 current_block_root = *header.header().root();
131 }
132
133 self.chain_info
136 .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
137 }
138}
139
/// Block import implementation for the beacon chain.
///
/// `PosTable` is only carried as a type parameter (see `_pos_table`), no value of that type is
/// stored.
#[derive(Debug)]
pub struct BeaconChainBlockImport<PosTable, CI, BV> {
    /// Chain info used to look up headers and to persist blocks and super segment headers
    chain_info: CI,
    /// Verifier that runs the concurrent and sequential verification stages
    block_verification: BV,
    /// Blocks whose import was started but has not finished yet
    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
    /// Channel that gets a [`BlockImportingNotification`] before a verified block is persisted
    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
    /// Channel that gets super segments returned by sequential verification
    super_segments_sender: mpsc::Sender<SuperSegment>,
    /// Channel that gets blocks after they were persisted
    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    /// Marker for the otherwise unused `PosTable` type parameter
    _pos_table: PhantomData<PosTable>,
}
150
151impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
152 for BeaconChainBlockImport<PosTable, CI, BV>
153where
154 PosTable: Table,
155 CI: BeaconChainInfoWrite,
156 BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
157{
158 fn import(
159 &self,
160 block: OwnedBeaconChainBlock,
161 origin: BlockOrigin,
162 ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
163 let parent_root = &block.header.header().prefix.parent_root;
164
165 let (parent_header, parent_block_mmr, parent_block_import_status) =
166 if let Some((parent_header, parent_block_details)) =
167 self.chain_info.header_with_details(parent_root)
168 {
169 (
170 parent_header,
171 parent_block_details.mmr_with_block,
172 ParentBlockImportStatus::Imported {
173 system_contract_states: parent_block_details.system_contract_states,
174 },
175 )
176 } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
177 (
178 importing_entry.header().clone(),
179 Arc::clone(importing_entry.mmr()),
180 ParentBlockImportStatus::Importing {
181 entry: importing_entry,
182 },
183 )
184 } else {
185 return Err(BlockImportError::UnknownParentBlock {
186 block_root: *parent_root,
187 });
188 };
189
190 let parent_block_mmr_root = Blake3Hash::from(
191 parent_block_mmr
192 .root()
193 .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
194 );
195 let mut block_mmr = *parent_block_mmr;
196
197 if !block_mmr.add_leaf(&block.header.header().root()) {
198 return Err(BlockImportError::CantExtendMmr);
199 }
200
201 let importing_handle = self
202 .importing_blocks
203 .insert(block.header.clone(), Arc::new(block_mmr))
204 .ok_or(BlockImportError::AlreadyImporting)?;
205
206 if self
207 .chain_info
208 .header(&block.header.header().root())
209 .is_some()
210 {
211 return Err(BlockImportError::AlreadyImported);
212 }
213
214 Ok(self.import(
215 parent_header,
216 parent_block_mmr_root,
217 block,
218 origin,
219 importing_handle,
220 parent_block_import_status,
221 ))
222 }
223}
224
225impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
226where
227 PosTable: Table,
228 CI: BeaconChainInfoWrite,
229 BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
230{
231 #[inline(always)]
233 pub fn new(
234 chain_info: CI,
235 block_verification: BV,
236 block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
237 super_segments_sender: mpsc::Sender<SuperSegment>,
238 block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
239 ) -> Self {
240 Self {
241 chain_info,
242 block_verification,
243 importing_blocks: ImportingBlocks::new(),
244 block_importing_notification_sender,
245 super_segments_sender,
246 block_import_notification_sender,
247 _pos_table: PhantomData,
248 }
249 }
250
251 async fn import(
252 &self,
253 parent_header: OwnedBeaconChainHeader,
254 parent_block_mmr_root: Blake3Hash,
255 block: OwnedBeaconChainBlock,
256 origin: BlockOrigin,
257 importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
258 parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
259 ) -> Result<(), BlockImportError> {
260 let parent_header = parent_header.header();
261 let header = block.header.header();
262 let body = block.body.body();
263
264 let log_block_import = match origin {
265 BlockOrigin::LocalBlockBuilder { .. } => true,
266 BlockOrigin::Sync => false,
267 BlockOrigin::Broadcast => true,
268 };
269
270 self.block_verification
273 .verify_concurrent(
274 parent_header,
275 &parent_block_mmr_root,
276 header,
277 body,
278 &origin,
279 &VerificationChainInfo {
280 chain_info: &self.chain_info,
281 importing_blocks: &self.importing_blocks,
282 },
283 )
284 .send()
285 .await
286 .map_err(BeaconChainBlockImportError::from)?;
287
288 let Some(system_contract_states) = parent_block_import_status.wait().await else {
289 return Err(BlockImportError::ParentBlockImportFailed);
290 };
291
292 let maybe_super_segment = self
295 .block_verification
296 .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
297 .send()
298 .await
299 .map_err(BeaconChainBlockImportError::from)?;
300
301 let global_state = GlobalState::new(&system_contract_states);
302
303 let state_root = global_state.root();
306
307 if header.result.state_root != state_root {
308 return Err(BlockImportError::InvalidStateRoot {
309 expected: state_root,
310 actual: header.result.state_root,
311 });
312 }
313
314 let system_contract_states = global_state.to_system_contract_states();
315
316 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
317 if let Err(error) = self
318 .block_importing_notification_sender
319 .clone()
320 .send(BlockImportingNotification {
321 block_number: header.prefix.number,
322 acknowledgement_sender,
323 })
324 .await
325 {
326 warn!(%error, "Failed to send block importing notification");
327 }
328
329 while acknowledgement_receiver.next().await.is_some() {
330 }
332
333 let number = header.prefix.number;
334 let root = *header.root();
335
336 if let Some(super_segment) = maybe_super_segment
337 && self
338 .chain_info
339 .persist_super_segment_header(super_segment.header)
340 .await
341 .map_err(
342 |error| BeaconChainBlockImportError::PersistSuperSegmentHeaders { error },
343 )?
344 && let Err(error) = self.super_segments_sender.clone().send(super_segment).await
345 {
346 warn!(%error, "Failed to send a super segment notification");
347 }
348
349 self.chain_info
350 .persist_block(
351 block.clone(),
352 BlockDetails {
353 mmr_with_block: Arc::clone(importing_handle.mmr()),
354 system_contract_states: StdArc::clone(&system_contract_states),
355 },
356 )
357 .await?;
358
359 importing_handle.set_success(system_contract_states);
360
361 if let Err(error) = self
362 .block_import_notification_sender
363 .clone()
364 .send(block)
365 .await
366 {
367 warn!(
368 %error,
369 block_number = %number,
370 block_root = %root,
371 "Failed to send block import notification"
372 );
373 }
374
375 if log_block_import {
376 info!(
377 %number,
378 %root,
379 "🏆 Imported block",
380 );
381 }
382
383 Ok(())
384 }
385}