ab_client_block_import/
beacon_chain.rs

1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{BlockDetails, BlockOrigin, ChainInfo, ChainInfoWrite};
4use ab_client_block_verification::{BlockVerification, BlockVerificationError};
5use ab_client_consensus_common::BlockImportingNotification;
6use ab_client_consensus_common::consensus_parameters::{
7    DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
8    ShardMembershipEntropySourceChainInfo,
9};
10use ab_client_consensus_common::state::GlobalState;
11use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
12use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
13use ab_core_primitives::block::{BlockNumber, BlockRoot};
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pot::PotOutput;
16use ab_proof_of_space::Table;
17use futures::channel::mpsc;
18use futures::prelude::*;
19use rclite::Arc;
20use send_future::SendFuture;
21use std::marker::PhantomData;
22use std::sync::Arc as StdArc;
23use tracing::{info, warn};
24
/// Errors for [`BeaconChainBlockImport`]
///
/// This is the beacon-chain-specific error type; it is converted into the
/// [`BlockImportError::Custom`] variant through the `From` implementation for [`BlockImportError`].
#[derive(Debug, thiserror::Error)]
pub enum BeaconChainBlockImportError {
    /// Block verification error
    #[error("Block verification error: {error}")]
    VerificationError {
        /// Block verification error
        // `#[from]` makes `thiserror` generate `From<BlockVerificationError>` for this enum
        #[from]
        error: BlockVerificationError,
    },
}
36
37impl From<BeaconChainBlockImportError> for BlockImportError {
38    #[inline(always)]
39    fn from(error: BeaconChainBlockImportError) -> Self {
40        Self::Custom {
41            error: error.into(),
42        }
43    }
44}
45
/// A custom wrapper that will query chain info from either already persisted blocks or blocks that
/// are currently queued for import
#[derive(Debug)]
struct VerificationChainInfo<'a, CI> {
    /// Source of information about blocks that are already persisted
    chain_info: &'a CI,
    /// Blocks that are currently being imported, looked up by block root
    importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
}
53
54impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
55where
56    CI: ChainInfo<OwnedBeaconChainBlock>,
57{
58    fn ancestor_header_consensus_info(
59        &self,
60        ancestor_block_number: BlockNumber,
61        descendant_block_root: &BlockRoot,
62    ) -> Option<DeriveConsensusParametersConsensusInfo> {
63        if let Some(consensus_info) = self
64            .chain_info
65            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
66        {
67            return Some(consensus_info);
68        }
69
70        let mut current_block_root = *descendant_block_root;
71        loop {
72            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
73                break;
74            };
75            let header = importing_entry.header().header();
76
77            if header.prefix.number == ancestor_block_number {
78                return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
79                    header.consensus_info,
80                ));
81            }
82
83            current_block_root = *header.root();
84        }
85
86        // Query again in case of a race condition where previously importing block was imported in
87        // between iterations in the above loop
88        self.chain_info
89            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
90    }
91}
92
93impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
94where
95    CI: ChainInfo<OwnedBeaconChainBlock>,
96{
97    fn ancestor_header_proof_of_time(
98        &self,
99        ancestor_block_number: BlockNumber,
100        descendant_block_root: &BlockRoot,
101    ) -> Option<PotOutput> {
102        if let Some(header) = self
103            .chain_info
104            .ancestor_header(ancestor_block_number, descendant_block_root)
105        {
106            return Some(header.header().consensus_info.proof_of_time);
107        }
108
109        let mut current_block_root = *descendant_block_root;
110        loop {
111            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
112                break;
113            };
114            let header = importing_entry.header();
115
116            if header.header().prefix.number == ancestor_block_number {
117                return Some(header.header().consensus_info.proof_of_time);
118            }
119
120            current_block_root = *header.header().root();
121        }
122
123        // Query again in case of a race condition where previously importing block was imported in
124        // between iterations in the above loop
125        self.chain_info
126            .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
127    }
128}
129
/// Block import implementation for [`OwnedBeaconChainBlock`].
///
/// `PosTable` is a type-level parameter only: no value of that type is stored.
#[derive(Debug)]
pub struct BeaconChainBlockImport<PosTable, CI, BV> {
    /// Access to persisted blocks; also used to persist successfully imported blocks
    chain_info: CI,
    /// Block verification implementation
    block_verification: BV,
    /// Blocks whose import has started, but not finished yet
    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
    /// Channel for notifications sent right before a block is persisted
    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
    /// Channel for notifications sent after a block was persisted
    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    /// Marker tying the otherwise unused `PosTable` type parameter to the struct
    _pos_table: PhantomData<PosTable>,
}
139
140impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
141    for BeaconChainBlockImport<PosTable, CI, BV>
142where
143    PosTable: Table,
144    CI: ChainInfoWrite<OwnedBeaconChainBlock>,
145    BV: BlockVerification<OwnedBeaconChainBlock>,
146{
147    fn import(
148        &self,
149        block: OwnedBeaconChainBlock,
150        origin: BlockOrigin,
151    ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
152        let parent_root = &block.header.header().prefix.parent_root;
153
154        let (parent_header, parent_block_mmr, parent_block_import_status) =
155            if let Some((parent_header, parent_block_details)) =
156                self.chain_info.header_with_details(parent_root)
157            {
158                (
159                    parent_header,
160                    parent_block_details.mmr_with_block,
161                    ParentBlockImportStatus::Imported {
162                        system_contract_states: parent_block_details.system_contract_states,
163                    },
164                )
165            } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
166                (
167                    importing_entry.header().clone(),
168                    Arc::clone(importing_entry.mmr()),
169                    ParentBlockImportStatus::Importing {
170                        entry: importing_entry,
171                    },
172                )
173            } else {
174                return Err(BlockImportError::UnknownParentBlock {
175                    block_root: *parent_root,
176                });
177            };
178
179        let parent_block_mmr_root = Blake3Hash::from(
180            parent_block_mmr
181                .root()
182                .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
183        );
184        let mut block_mmr = *parent_block_mmr;
185
186        if !block_mmr.add_leaf(&block.header.header().root()) {
187            return Err(BlockImportError::CantExtendMmr);
188        }
189
190        let importing_handle = self
191            .importing_blocks
192            .insert(block.header.clone(), Arc::new(block_mmr))
193            .ok_or(BlockImportError::AlreadyImporting)?;
194
195        if self
196            .chain_info
197            .header(&block.header.header().root())
198            .is_some()
199        {
200            return Err(BlockImportError::AlreadyImported);
201        }
202
203        Ok(self.import(
204            parent_header,
205            parent_block_mmr_root,
206            block,
207            origin,
208            importing_handle,
209            parent_block_import_status,
210        ))
211    }
212}
213
impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
where
    PosTable: Table,
    CI: ChainInfoWrite<OwnedBeaconChainBlock>,
    BV: BlockVerification<OwnedBeaconChainBlock>,
{
    /// Create a new instance
    ///
    /// The instance starts with an empty set of importing blocks. Notifications are sent through
    /// `block_importing_notification_sender` before a block is persisted and through
    /// `block_import_notification_sender` after it was persisted.
    #[inline(always)]
    pub fn new(
        chain_info: CI,
        block_verification: BV,
        block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
        block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
    ) -> Self {
        Self {
            chain_info,
            block_verification,
            importing_blocks: ImportingBlocks::new(),
            block_importing_notification_sender,
            block_import_notification_sender,
            _pos_table: PhantomData,
        }
    }

    /// Asynchronous part of block import.
    ///
    /// Steps, in order:
    /// 1. Concurrent verification of the block (started before the parent's import is awaited)
    /// 2. Waiting for the parent block import to finish
    /// 3. Sequential verification of the block
    /// 4. State root check against the global state built from the parent's system contract states
    /// 5. Sending the "block importing" notification and waiting for acknowledgements
    /// 6. Persisting the block and marking the importing handle as successful
    /// 7. Sending the "block imported" notification
    ///
    /// # Errors
    ///
    /// Returns an error if either verification step fails, if the parent block import did not
    /// succeed ([`BlockImportError::ParentBlockImportFailed`]), if the state root in the header
    /// does not match ([`BlockImportError::InvalidStateRoot`]) or if persisting the block fails.
    /// Failures to send notifications are only logged.
    async fn import(
        &self,
        parent_header: OwnedBeaconChainHeader,
        parent_block_mmr_root: Blake3Hash,
        block: OwnedBeaconChainBlock,
        origin: BlockOrigin,
        importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
        parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
    ) -> Result<(), BlockImportError> {
        let parent_header = parent_header.header();
        let header = block.header.header();
        let body = block.body.body();

        // Blocks that arrive through sync are not logged individually
        let log_block_import = match origin {
            BlockOrigin::LocalBlockBuilder { .. } => true,
            BlockOrigin::Sync => false,
            BlockOrigin::Broadcast => true,
        };

        // TODO: `.send()` is a hack for compiler bug, see:
        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
        self.block_verification
            .verify_concurrent(
                parent_header,
                &parent_block_mmr_root,
                header,
                body,
                &origin,
                // Lets verification see both persisted blocks and blocks that are still importing
                &VerificationChainInfo {
                    chain_info: &self.chain_info,
                    importing_blocks: &self.importing_blocks,
                },
            )
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // `None` means the parent's system contract states are not available, which is reported
        // as a failed parent import
        let Some(system_contract_states) = parent_block_import_status.wait().await else {
            return Err(BlockImportError::ParentBlockImportFailed);
        };

        // TODO: `.send()` is a hack for compiler bug, see:
        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
        self.block_verification
            .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
            .send()
            .await
            .map_err(BeaconChainBlockImportError::from)?;

        // Starts from the parent's system contract states
        let global_state = GlobalState::new(&system_contract_states);

        // TODO: Execute block

        let state_root = global_state.root();

        if header.result.state_root != state_root {
            return Err(BlockImportError::InvalidStateRoot {
                expected: state_root,
                actual: header.result.state_root,
            });
        }

        let system_contract_states = global_state.to_system_contract_states();

        // Receivers of the notification get `acknowledgement_sender` as part of the notification
        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
        if let Err(error) = self
            .block_importing_notification_sender
            .clone()
            .send(BlockImportingNotification {
                block_number: header.prefix.number,
                acknowledgement_sender,
            })
            .await
        {
            // Best effort: import continues even if nobody received the notification
            warn!(%error, "Failed to send block importing notification");
        }

        // The loop ends once the receiver yields `None`
        while acknowledgement_receiver.next().await.is_some() {
            // Wait for all the acknowledgements to arrive
        }

        // Copied out here because they are still needed for logging after `block` is moved into
        // the import notification below
        let number = header.prefix.number;
        let root = *header.root();

        self.chain_info
            .persist_block(
                block.clone(),
                BlockDetails {
                    mmr_with_block: Arc::clone(importing_handle.mmr()),
                    system_contract_states: StdArc::clone(&system_contract_states),
                },
            )
            .await?;

        // Only reached if the block was persisted successfully
        importing_handle.set_success(system_contract_states);

        if let Err(error) = self
            .block_import_notification_sender
            .clone()
            .send(block)
            .await
        {
            // Best effort: the block is already persisted at this point
            warn!(
                %error,
                block_number = %number,
                block_root = %root,
                "Failed to send block import notification"
            );
        }

        if log_block_import {
            info!(
                %number,
                %root,
                "🏆 Imported block",
            );
        }

        Ok(())
    }
}