1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{
4 BeaconChainInfoWrite, BlockDetails, BlockOrigin, ChainInfo, PersistSuperSegmentHeadersError,
5};
6use ab_client_block_verification::{BlockVerification, BlockVerificationError};
7use ab_client_consensus_common::BlockImportingNotification;
8use ab_client_consensus_common::consensus_parameters::{
9 DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
10 ShardMembershipEntropySourceChainInfo,
11};
12use ab_client_consensus_common::state::GlobalState;
13use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::block::{BlockNumber, BlockRoot};
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::PotOutput;
18use ab_core_primitives::segments::SuperSegment;
19use ab_proof_of_space::Table;
20use async_lock::Mutex as AsyncMutex;
21use futures::channel::mpsc;
22use futures::prelude::*;
23use rclite::Arc;
24use send_future::SendFuture;
25use std::marker::PhantomData;
26use std::sync::Arc as StdArc;
27use tracing::{info, warn};
28
29#[derive(Debug, thiserror::Error)]
31pub enum BeaconChainBlockImportError {
32 #[error("Block verification error: {error}")]
34 VerificationError {
35 #[from]
37 error: BlockVerificationError,
38 },
39 #[error("Failed to persist a new super segment header: {error}")]
41 PersistSuperSegmentHeaders {
42 #[from]
44 error: PersistSuperSegmentHeadersError,
45 },
46}
47
48impl From<BeaconChainBlockImportError> for BlockImportError {
49 #[inline(always)]
50 fn from(error: BeaconChainBlockImportError) -> Self {
51 Self::Custom {
52 error: error.into(),
53 }
54 }
55}
56
57#[derive(Debug)]
60struct VerificationChainInfo<'a, CI> {
61 chain_info: &'a CI,
62 importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
63}
64
65impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
66where
67 CI: ChainInfo<OwnedBeaconChainBlock>,
68{
69 fn ancestor_header_consensus_info(
70 &self,
71 ancestor_block_number: BlockNumber,
72 descendant_block_root: &BlockRoot,
73 ) -> Option<DeriveConsensusParametersConsensusInfo> {
74 if let Some(consensus_info) = self
75 .chain_info
76 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
77 {
78 return Some(consensus_info);
79 }
80
81 let mut current_block_root = *descendant_block_root;
82 loop {
83 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
84 break;
85 };
86 let header = importing_entry.header().header();
87
88 if header.prefix.number == ancestor_block_number {
89 return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
90 header.consensus_info,
91 ));
92 }
93
94 current_block_root = *header.root();
95 }
96
97 self.chain_info
100 .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
101 }
102}
103
104impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
105where
106 CI: ChainInfo<OwnedBeaconChainBlock>,
107{
108 fn ancestor_header_proof_of_time(
109 &self,
110 ancestor_block_number: BlockNumber,
111 descendant_block_root: &BlockRoot,
112 ) -> Option<PotOutput> {
113 if let Some(header) = self
114 .chain_info
115 .ancestor_header(ancestor_block_number, descendant_block_root)
116 {
117 return Some(header.header().consensus_info.proof_of_time);
118 }
119
120 let mut current_block_root = *descendant_block_root;
121 loop {
122 let Some(importing_entry) = self.importing_blocks.get(¤t_block_root) else {
123 break;
124 };
125 let header = importing_entry.header();
126
127 if header.header().prefix.number == ancestor_block_number {
128 return Some(header.header().consensus_info.proof_of_time);
129 }
130
131 current_block_root = *header.header().root();
132 }
133
134 self.chain_info
137 .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
138 }
139}
140
141#[derive(Debug)]
142pub struct BeaconChainBlockImport<PosTable, CI, BV> {
143 chain_info: CI,
144 block_verification: BV,
145 importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
146 block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
147 super_segments_sender: AsyncMutex<mpsc::Sender<SuperSegment>>,
148 block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
149 _pos_table: PhantomData<PosTable>,
150}
151
152impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
153 for BeaconChainBlockImport<PosTable, CI, BV>
154where
155 PosTable: Table,
156 CI: BeaconChainInfoWrite,
157 BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
158{
159 fn import(
160 &self,
161 block: OwnedBeaconChainBlock,
162 origin: BlockOrigin,
163 ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
164 let parent_root = &block.header.header().prefix.parent_root;
165
166 let (parent_header, parent_block_mmr, parent_block_import_status) =
167 if let Some((parent_header, parent_block_details)) =
168 self.chain_info.header_with_details(parent_root)
169 {
170 (
171 parent_header,
172 parent_block_details.mmr_with_block,
173 ParentBlockImportStatus::Imported {
174 system_contract_states: parent_block_details.system_contract_states,
175 },
176 )
177 } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
178 (
179 importing_entry.header().clone(),
180 Arc::clone(importing_entry.mmr()),
181 ParentBlockImportStatus::Importing {
182 entry: importing_entry,
183 },
184 )
185 } else {
186 return Err(BlockImportError::UnknownParentBlock {
187 block_root: *parent_root,
188 });
189 };
190
191 let parent_block_mmr_root = Blake3Hash::from(
192 parent_block_mmr
193 .root()
194 .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
195 );
196 let mut block_mmr = *parent_block_mmr;
197
198 if !block_mmr.add_leaf(&block.header.header().root()) {
199 return Err(BlockImportError::CantExtendMmr);
200 }
201
202 let importing_handle = self
203 .importing_blocks
204 .insert(block.header.clone(), Arc::new(block_mmr))
205 .ok_or(BlockImportError::AlreadyImporting)?;
206
207 if self
208 .chain_info
209 .header(&block.header.header().root())
210 .is_some()
211 {
212 return Err(BlockImportError::AlreadyImported);
213 }
214
215 Ok(self.import(
216 parent_header,
217 parent_block_mmr_root,
218 block,
219 origin,
220 importing_handle,
221 parent_block_import_status,
222 ))
223 }
224}
225
226impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
227where
228 PosTable: Table,
229 CI: BeaconChainInfoWrite,
230 BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
231{
232 #[inline(always)]
234 pub fn new(
235 chain_info: CI,
236 block_verification: BV,
237 block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
238 super_segments_sender: mpsc::Sender<SuperSegment>,
239 block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
240 ) -> Self {
241 Self {
242 chain_info,
243 block_verification,
244 importing_blocks: ImportingBlocks::new(),
245 block_importing_notification_sender,
246 super_segments_sender: AsyncMutex::new(super_segments_sender),
247 block_import_notification_sender,
248 _pos_table: PhantomData,
249 }
250 }
251
252 async fn import(
253 &self,
254 parent_header: OwnedBeaconChainHeader,
255 parent_block_mmr_root: Blake3Hash,
256 block: OwnedBeaconChainBlock,
257 origin: BlockOrigin,
258 importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
259 parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
260 ) -> Result<(), BlockImportError> {
261 let parent_header = parent_header.header();
262 let header = block.header.header();
263 let body = block.body.body();
264
265 let log_block_import = match origin {
266 BlockOrigin::LocalBlockBuilder { .. } => true,
267 BlockOrigin::Sync => false,
268 BlockOrigin::Broadcast => true,
269 };
270
271 self.block_verification
274 .verify_concurrent(
275 parent_header,
276 &parent_block_mmr_root,
277 header,
278 body,
279 &origin,
280 &VerificationChainInfo {
281 chain_info: &self.chain_info,
282 importing_blocks: &self.importing_blocks,
283 },
284 )
285 .send()
286 .await
287 .map_err(BeaconChainBlockImportError::from)?;
288
289 let Some(system_contract_states) = parent_block_import_status.wait().await else {
290 return Err(BlockImportError::ParentBlockImportFailed);
291 };
292
293 let maybe_super_segment = self
296 .block_verification
297 .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
298 .send()
299 .await
300 .map_err(BeaconChainBlockImportError::from)?;
301
302 let global_state = GlobalState::new(&system_contract_states);
303
304 let state_root = global_state.root();
307
308 if header.result.state_root != state_root {
309 return Err(BlockImportError::InvalidStateRoot {
310 expected: state_root,
311 actual: header.result.state_root,
312 });
313 }
314
315 let system_contract_states = global_state.to_system_contract_states();
316
317 let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
318 if let Err(error) = self
319 .block_importing_notification_sender
320 .clone()
321 .send(BlockImportingNotification {
322 block_number: header.prefix.number,
323 acknowledgement_sender,
324 })
325 .await
326 {
327 warn!(%error, "Failed to send block importing notification");
328 }
329
330 while acknowledgement_receiver.next().await.is_some() {
331 }
333
334 let number = header.prefix.number;
335 let root = *header.root();
336
337 if let Some(super_segment) = maybe_super_segment {
338 let mut super_segment_sender = self.super_segments_sender.lock().await;
339 if self
340 .chain_info
341 .persist_super_segment_header(super_segment.header)
342 .await
343 .map_err(
344 |error| BeaconChainBlockImportError::PersistSuperSegmentHeaders { error },
345 )?
346 && let Err(error) = super_segment_sender.send(super_segment).await
347 {
348 warn!(%error, "Failed to send a super segment notification");
349 }
350 }
351
352 self.chain_info
353 .persist_block(
354 block.clone(),
355 BlockDetails {
356 mmr_with_block: Arc::clone(importing_handle.mmr()),
357 system_contract_states: StdArc::clone(&system_contract_states),
358 },
359 )
360 .await?;
361
362 importing_handle.set_success(system_contract_states);
363
364 if let Err(error) = self
365 .block_import_notification_sender
366 .clone()
367 .send(block)
368 .await
369 {
370 warn!(
371 %error,
372 block_number = %number,
373 block_root = %root,
374 "Failed to send block import notification"
375 );
376 }
377
378 if log_block_import {
379 info!(
380 %number,
381 %root,
382 "🏆 Imported block",
383 );
384 }
385
386 Ok(())
387 }
388}