Skip to main content

ab_client_block_import/
beacon_chain.rs

1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{
4    BeaconChainInfoWrite, BlockDetails, BlockOrigin, ChainInfo, PersistSuperSegmentHeadersError,
5};
6use ab_client_block_verification::{BlockVerification, BlockVerificationError};
7use ab_client_consensus_common::BlockImportingNotification;
8use ab_client_consensus_common::consensus_parameters::{
9    DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
10    ShardMembershipEntropySourceChainInfo,
11};
12use ab_client_consensus_common::state::GlobalState;
13use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::block::{BlockNumber, BlockRoot};
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::PotOutput;
18use ab_core_primitives::segments::SuperSegment;
19use ab_proof_of_space::Table;
20use futures::channel::mpsc;
21use futures::prelude::*;
22use rclite::Arc;
23use send_future::SendFuture;
24use std::marker::PhantomData;
25use std::sync::Arc as StdArc;
26use tracing::{info, warn};
27
/// Errors for [`BeaconChainBlockImport`]
///
/// These are beacon chain specific failures; they are surfaced to callers as
/// [`BlockImportError::Custom`] through the `From` implementation for [`BlockImportError`].
#[derive(Debug, thiserror::Error)]
pub enum BeaconChainBlockImportError {
    /// Block verification error (produced by either the concurrent or the sequential verification
    /// stage of the import)
    #[error("Block verification error: {error}")]
    VerificationError {
        /// Underlying block verification error
        #[from]
        error: BlockVerificationError,
    },
    /// Failed to persist a new super segment header
    #[error("Failed to persist a new super segment header: {error}")]
    PersistSuperSegmentHeaders {
        /// Low-level error returned by the chain info when persisting the header
        #[from]
        error: PersistSuperSegmentHeadersError,
    },
}
46
47impl From<BeaconChainBlockImportError> for BlockImportError {
48    #[inline(always)]
49    fn from(error: BeaconChainBlockImportError) -> Self {
50        Self::Custom {
51            error: error.into(),
52        }
53    }
54}
55
/// A custom wrapper that will query chain info from either already persisted blocks or blocks that
/// are currently queued for import
#[derive(Debug)]
struct VerificationChainInfo<'a, CI> {
    /// Chain info that is consulted first, and once more as a fallback after the importing blocks
    /// were searched
    chain_info: &'a CI,
    /// Blocks that are currently being imported, looked up by block root
    importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
}
63
64impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
65where
66    CI: ChainInfo<OwnedBeaconChainBlock>,
67{
68    fn ancestor_header_consensus_info(
69        &self,
70        ancestor_block_number: BlockNumber,
71        descendant_block_root: &BlockRoot,
72    ) -> Option<DeriveConsensusParametersConsensusInfo> {
73        if let Some(consensus_info) = self
74            .chain_info
75            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
76        {
77            return Some(consensus_info);
78        }
79
80        let mut current_block_root = *descendant_block_root;
81        loop {
82            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
83                break;
84            };
85            let header = importing_entry.header().header();
86
87            if header.prefix.number == ancestor_block_number {
88                return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
89                    header.consensus_info,
90                ));
91            }
92
93            current_block_root = *header.root();
94        }
95
96        // Query again in case of a race condition where previously importing block was imported in
97        // between iterations in the above loop
98        self.chain_info
99            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
100    }
101}
102
103impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
104where
105    CI: ChainInfo<OwnedBeaconChainBlock>,
106{
107    fn ancestor_header_proof_of_time(
108        &self,
109        ancestor_block_number: BlockNumber,
110        descendant_block_root: &BlockRoot,
111    ) -> Option<PotOutput> {
112        if let Some(header) = self
113            .chain_info
114            .ancestor_header(ancestor_block_number, descendant_block_root)
115        {
116            return Some(header.header().consensus_info.proof_of_time);
117        }
118
119        let mut current_block_root = *descendant_block_root;
120        loop {
121            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
122                break;
123            };
124            let header = importing_entry.header();
125
126            if header.header().prefix.number == ancestor_block_number {
127                return Some(header.header().consensus_info.proof_of_time);
128            }
129
130            current_block_root = *header.header().root();
131        }
132
133        // Query again in case of a race condition where previously importing block was imported in
134        // between iterations in the above loop
135        self.chain_info
136            .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
137    }
138}
139
/// Block import implementation for beacon chain blocks.
///
/// Implements [`BlockImport`] for [`OwnedBeaconChainBlock`]: a block is verified with `BV`,
/// persisted through `CI`, and notifications are sent over the configured channels along the way.
#[derive(Debug)]
pub struct BeaconChainBlockImport<PosTable, CI, BV> {
    /// Chain info used to look up already imported headers and to persist blocks and super segment
    /// headers
    chain_info: CI,
    /// Block verification (both concurrent and sequential stages)
    block_verification: BV,
    /// Blocks that are currently in the process of being imported
    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
    /// Channel used to announce a block right before it is persisted
    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
    /// Channel used to forward a super segment produced by sequential verification when persisting
    /// its header returned `true`
    super_segments_sender: mpsc::Sender<SuperSegment>,
    /// Channel used to announce a block after it was persisted
    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    /// Marker for the `PosTable` type parameter, no value of that type is stored
    _pos_table: PhantomData<PosTable>,
}
150
151impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
152    for BeaconChainBlockImport<PosTable, CI, BV>
153where
154    PosTable: Table,
155    CI: BeaconChainInfoWrite,
156    BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
157{
158    fn import(
159        &self,
160        block: OwnedBeaconChainBlock,
161        origin: BlockOrigin,
162    ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
163        let parent_root = &block.header.header().prefix.parent_root;
164
165        let (parent_header, parent_block_mmr, parent_block_import_status) =
166            if let Some((parent_header, parent_block_details)) =
167                self.chain_info.header_with_details(parent_root)
168            {
169                (
170                    parent_header,
171                    parent_block_details.mmr_with_block,
172                    ParentBlockImportStatus::Imported {
173                        system_contract_states: parent_block_details.system_contract_states,
174                    },
175                )
176            } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
177                (
178                    importing_entry.header().clone(),
179                    Arc::clone(importing_entry.mmr()),
180                    ParentBlockImportStatus::Importing {
181                        entry: importing_entry,
182                    },
183                )
184            } else {
185                return Err(BlockImportError::UnknownParentBlock {
186                    block_root: *parent_root,
187                });
188            };
189
190        let parent_block_mmr_root = Blake3Hash::from(
191            parent_block_mmr
192                .root()
193                .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
194        );
195        let mut block_mmr = *parent_block_mmr;
196
197        if !block_mmr.add_leaf(&block.header.header().root()) {
198            return Err(BlockImportError::CantExtendMmr);
199        }
200
201        let importing_handle = self
202            .importing_blocks
203            .insert(block.header.clone(), Arc::new(block_mmr))
204            .ok_or(BlockImportError::AlreadyImporting)?;
205
206        if self
207            .chain_info
208            .header(&block.header.header().root())
209            .is_some()
210        {
211            return Err(BlockImportError::AlreadyImported);
212        }
213
214        Ok(self.import(
215            parent_header,
216            parent_block_mmr_root,
217            block,
218            origin,
219            importing_handle,
220            parent_block_import_status,
221        ))
222    }
223}
224
impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
where
    PosTable: Table,
    CI: BeaconChainInfoWrite,
    BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
{
    /// Create a new instance
    ///
    /// The set of importing blocks starts out empty; the three senders are stored as is and cloned
    /// whenever a notification needs to be sent.
    #[inline(always)]
    pub fn new(
        chain_info: CI,
        block_verification: BV,
        block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
        super_segments_sender: mpsc::Sender<SuperSegment>,
        block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    ) -> Self {
        Self {
            chain_info,
            block_verification,
            importing_blocks: ImportingBlocks::new(),
            block_importing_notification_sender,
            super_segments_sender,
            block_import_notification_sender,
            _pos_table: PhantomData,
        }
    }

    /// Asynchronous part of the block import.
    ///
    /// Steps, in order:
    /// 1. Concurrent verification, which may consult both persisted and importing blocks
    /// 2. Waiting for the parent block import to finish
    /// 3. Sequential verification, which may produce a super segment
    /// 4. State root check against the block header
    /// 5. Sending the "block importing" notification and waiting for its acknowledgements
    /// 6. Persisting the super segment header (if any) and the block itself
    /// 7. Marking the import as successful and sending the "block imported" notification
    ///
    /// Failures to send notifications are only logged, they don't fail the import.
    ///
    /// # Errors
    ///
    /// Returns an error if verification fails, the parent block import failed, the state root
    /// doesn't match the header, or persisting the super segment header or the block fails.
    async fn import(
        &self,
        parent_header: OwnedBeaconChainHeader,
        parent_block_mmr_root: Blake3Hash,
        block: OwnedBeaconChainBlock,
        origin: BlockOrigin,
        importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
        parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
    ) -> Result<(), BlockImportError> {
        let parent_header = parent_header.header();
        let header = block.header.header();
        let body = block.body.body();

        // Blocks imported during sync are not logged individually
        let log_block_import = match origin {
            BlockOrigin::LocalBlockBuilder { .. } => true,
            BlockOrigin::Sync => false,
            BlockOrigin::Broadcast => true,
        };

        // TODO: `.send()` is a hack for compiler bug, see:
        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
        self.block_verification
            .verify_concurrent(
                parent_header,
                &parent_block_mmr_root,
                header,
                body,
                &origin,
                &VerificationChainInfo {
                    chain_info: &self.chain_info,
                    importing_blocks: &self.importing_blocks,
                },
            )
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // Sequential verification and everything after it only happen once the parent import has
        // completed; `None` here means the parent import didn't succeed
        let Some(system_contract_states) = parent_block_import_status.wait().await else {
            return Err(BlockImportError::ParentBlockImportFailed);
        };

        // TODO: `.send()` is a hack for compiler bug, see:
        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
        let maybe_super_segment = self
            .block_verification
            .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // The global state is built from the parent's system contract states
        let global_state = GlobalState::new(&system_contract_states);

        // TODO: Execute block

        let state_root = global_state.root();

        if header.result.state_root != state_root {
            return Err(BlockImportError::InvalidStateRoot {
                expected: state_root,
                actual: header.result.state_root,
            });
        }

        let system_contract_states = global_state.to_system_contract_states();

        // The acknowledgement sender travels inside the notification; the receiver below is
        // drained until it yields `None`
        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
        if let Err(error) = self
            .block_importing_notification_sender
            .clone()
            .send(BlockImportingNotification {
                block_number: header.prefix.number,
                acknowledgement_sender,
            })
            .await
        {
            warn!(%error, "Failed to send block importing notification");
        }

        while acknowledgement_receiver.next().await.is_some() {
            // Wait for all the acknowledgements to arrive
        }

        // Captured before `block` is moved into the notification below
        let number = header.prefix.number;
        let root = *header.root();

        // The super segment is only forwarded when persisting its header returned `true`; a
        // persistence error aborts the import, a notification error is only logged
        if let Some(super_segment) = maybe_super_segment
            && self
                .chain_info
                .persist_super_segment_header(super_segment.header)
                .await
                .map_err(
                    |error| BeaconChainBlockImportError::PersistSuperSegmentHeaders { error },
                )?
            && let Err(error) = self.super_segments_sender.clone().send(super_segment).await
        {
            warn!(%error, "Failed to send a super segment notification");
        }

        self.chain_info
            .persist_block(
                block.clone(),
                BlockDetails {
                    mmr_with_block: Arc::clone(importing_handle.mmr()),
                    system_contract_states: StdArc::clone(&system_contract_states),
                },
            )
            .await?;

        // Only reported as successful once the block was persisted
        importing_handle.set_success(system_contract_states);

        if let Err(error) = self
            .block_import_notification_sender
            .clone()
            .send(block)
            .await
        {
            warn!(
                %error,
                block_number = %number,
                block_root = %root,
                "Failed to send block import notification"
            );
        }

        if log_block_import {
            info!(
                %number,
                %root,
                "🏆 Imported block",
            );
        }

        Ok(())
    }
}