ab_client_block_import/
beacon_chain.rs

1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{BlockDetails, BlockOrigin, ChainInfoWrite};
4use ab_client_block_verification::{BlockVerification, BlockVerificationError};
5use ab_client_consensus_common::BlockImportingNotification;
6use ab_client_consensus_common::state::GlobalState;
7use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
8use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
9use ab_core_primitives::hashes::Blake3Hash;
10use ab_proof_of_space::Table;
11use futures::channel::mpsc;
12use futures::prelude::*;
13use rclite::Arc;
14use send_future::SendFuture;
15use std::marker::PhantomData;
16use std::sync::Arc as StdArc;
17use tracing::{info, warn};
18
19/// Errors for [`BeaconChainBlockImport`]
20#[derive(Debug, thiserror::Error)]
21pub enum BeaconChainBlockImportError {
22    /// Block verification error
23    #[error("Block verification error: {error}")]
24    VerificationError {
25        /// Block verification error
26        #[from]
27        error: BlockVerificationError,
28    },
29}
30
31impl From<BeaconChainBlockImportError> for BlockImportError {
32    #[inline(always)]
33    fn from(error: BeaconChainBlockImportError) -> Self {
34        Self::Custom {
35            error: error.into(),
36        }
37    }
38}
39
40#[derive(Debug)]
41pub struct BeaconChainBlockImport<PosTable, CI, BV> {
42    chain_info: CI,
43    block_verification: BV,
44    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
45    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
46    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
47    _pos_table: PhantomData<PosTable>,
48}
49
50impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
51    for BeaconChainBlockImport<PosTable, CI, BV>
52where
53    PosTable: Table,
54    CI: ChainInfoWrite<OwnedBeaconChainBlock>,
55    BV: BlockVerification<OwnedBeaconChainBlock>,
56{
57    fn import(
58        &self,
59        block: OwnedBeaconChainBlock,
60        origin: BlockOrigin,
61    ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
62        let parent_root = &block.header.header().prefix.parent_root;
63
64        let (parent_header, parent_block_mmr, parent_block_import_status) =
65            if let Some((parent_header, parent_block_details)) =
66                self.chain_info.header_with_details(parent_root)
67            {
68                (
69                    parent_header,
70                    parent_block_details.mmr_with_block,
71                    ParentBlockImportStatus::Imported {
72                        system_contract_states: parent_block_details.system_contract_states,
73                    },
74                )
75            } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
76                (
77                    importing_entry.header().clone(),
78                    Arc::clone(importing_entry.mmr()),
79                    ParentBlockImportStatus::Importing {
80                        entry: importing_entry,
81                    },
82                )
83            } else {
84                return Err(BlockImportError::UnknownParentBlock {
85                    block_root: *parent_root,
86                });
87            };
88
89        let parent_block_mmr_root = Blake3Hash::from(
90            parent_block_mmr
91                .root()
92                .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
93        );
94        let mut block_mmr = *parent_block_mmr;
95
96        if !block_mmr.add_leaf(&block.header.header().root()) {
97            return Err(BlockImportError::CantExtendMmr);
98        }
99
100        let importing_handle = self
101            .importing_blocks
102            .insert(block.header.clone(), Arc::new(block_mmr))
103            .ok_or(BlockImportError::AlreadyImporting)?;
104
105        if self
106            .chain_info
107            .header(&block.header.header().root())
108            .is_some()
109        {
110            return Err(BlockImportError::AlreadyImported);
111        }
112
113        Ok(self.import(
114            parent_header,
115            parent_block_mmr_root,
116            block,
117            origin,
118            importing_handle,
119            parent_block_import_status,
120        ))
121    }
122}
123
124impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
125where
126    PosTable: Table,
127    CI: ChainInfoWrite<OwnedBeaconChainBlock>,
128    BV: BlockVerification<OwnedBeaconChainBlock>,
129{
130    /// Create a new instance
131    #[inline(always)]
132    pub fn new(
133        chain_info: CI,
134        block_verification: BV,
135        block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
136        block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
137    ) -> Self {
138        Self {
139            chain_info,
140            block_verification,
141            importing_blocks: ImportingBlocks::new(),
142            block_importing_notification_sender,
143            block_import_notification_sender,
144            _pos_table: PhantomData,
145        }
146    }
147
148    async fn import(
149        &self,
150        parent_header: OwnedBeaconChainHeader,
151        parent_block_mmr_root: Blake3Hash,
152        block: OwnedBeaconChainBlock,
153        origin: BlockOrigin,
154        importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
155        parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
156    ) -> Result<(), BlockImportError> {
157        let parent_header = parent_header.header();
158        let header = block.header.header();
159        let body = block.body.body();
160
161        let log_block_import = match origin {
162            BlockOrigin::LocalBlockBuilder { .. } => true,
163            BlockOrigin::Sync => false,
164            BlockOrigin::Broadcast => true,
165        };
166
167        // TODO: `.send()` is a hack for compiler bug, see:
168        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
169        self.block_verification
170            .verify(parent_header, &parent_block_mmr_root, header, body, origin)
171            .send()
172            .await
173            .map_err(BeaconChainBlockImportError::from)?;
174
175        if parent_block_import_status.has_failed() {
176            // Early exit to avoid extra work
177            return Err(BlockImportError::ParentBlockImportFailed);
178        }
179
180        let Some(system_contract_states) = parent_block_import_status.wait().await else {
181            return Err(BlockImportError::ParentBlockImportFailed);
182        };
183
184        let global_state = GlobalState::new(&system_contract_states);
185
186        // TODO: Execute block
187
188        let state_root = global_state.root();
189
190        if header.result.state_root != state_root {
191            return Err(BlockImportError::InvalidStateRoot {
192                expected: state_root,
193                actual: header.result.state_root,
194            });
195        }
196
197        let system_contract_states = global_state.to_system_contract_states();
198
199        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
200        if let Err(error) = self
201            .block_importing_notification_sender
202            .clone()
203            .send(BlockImportingNotification {
204                block_number: header.prefix.number,
205                acknowledgement_sender,
206            })
207            .await
208        {
209            warn!(%error, "Failed to send block importing notification");
210        }
211
212        while acknowledgement_receiver.next().await.is_some() {
213            // Wait for all the acknowledgements to arrive
214        }
215
216        let number = header.prefix.number;
217        let root = *header.root();
218
219        self.chain_info
220            .persist_block(
221                block.clone(),
222                BlockDetails {
223                    mmr_with_block: Arc::clone(importing_handle.mmr()),
224                    system_contract_states: StdArc::clone(&system_contract_states),
225                },
226            )
227            .await?;
228
229        importing_handle.set_success(system_contract_states);
230
231        if let Err(error) = self
232            .block_import_notification_sender
233            .clone()
234            .send(block)
235            .await
236        {
237            warn!(
238                %error,
239                block_number = %number,
240                block_root = %root,
241                "Failed to send block import notification"
242            );
243        }
244
245        if log_block_import {
246            info!(
247                %number,
248                %root,
249                "🏆 Imported block",
250            );
251        }
252
253        Ok(())
254    }
255}