Skip to main content

ab_client_block_import/
beacon_chain.rs

1use crate::importing_blocks::{ImportingBlockHandle, ImportingBlocks, ParentBlockImportStatus};
2use crate::{BlockImport, BlockImportError};
3use ab_client_api::{
4    BeaconChainInfoWrite, BlockDetails, BlockOrigin, ChainInfo, PersistSuperSegmentHeadersError,
5};
6use ab_client_block_verification::{BlockVerification, BlockVerificationError};
7use ab_client_consensus_common::BlockImportingNotification;
8use ab_client_consensus_common::consensus_parameters::{
9    DeriveConsensusParametersChainInfo, DeriveConsensusParametersConsensusInfo,
10    ShardMembershipEntropySourceChainInfo,
11};
12use ab_client_consensus_common::state::GlobalState;
13use ab_core_primitives::block::header::owned::OwnedBeaconChainHeader;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::block::{BlockNumber, BlockRoot};
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::PotOutput;
18use ab_core_primitives::segments::SuperSegment;
19use ab_proof_of_space::Table;
20use async_lock::Mutex as AsyncMutex;
21use futures::channel::mpsc;
22use futures::prelude::*;
23use rclite::Arc;
24use send_future::SendFuture;
25use std::marker::PhantomData;
26use std::sync::Arc as StdArc;
27use tracing::{info, warn};
28
29/// Errors for [`BeaconChainBlockImport`]
30#[derive(Debug, thiserror::Error)]
31pub enum BeaconChainBlockImportError {
32    /// Block verification error
33    #[error("Block verification error: {error}")]
34    VerificationError {
35        /// Block verification error
36        #[from]
37        error: BlockVerificationError,
38    },
39    /// Failed to persist a new super segment header
40    #[error("Failed to persist a new super segment header: {error}")]
41    PersistSuperSegmentHeaders {
42        /// Low-level error
43        #[from]
44        error: PersistSuperSegmentHeadersError,
45    },
46}
47
48impl From<BeaconChainBlockImportError> for BlockImportError {
49    #[inline(always)]
50    fn from(error: BeaconChainBlockImportError) -> Self {
51        Self::Custom {
52            error: error.into(),
53        }
54    }
55}
56
57/// A custom wrapper that will query chain info from either already persisted blocks or blocks that
58/// are currently queued for import
59#[derive(Debug)]
60struct VerificationChainInfo<'a, CI> {
61    chain_info: &'a CI,
62    importing_blocks: &'a ImportingBlocks<OwnedBeaconChainHeader>,
63}
64
65impl<'a, CI> DeriveConsensusParametersChainInfo for VerificationChainInfo<'a, CI>
66where
67    CI: ChainInfo<OwnedBeaconChainBlock>,
68{
69    fn ancestor_header_consensus_info(
70        &self,
71        ancestor_block_number: BlockNumber,
72        descendant_block_root: &BlockRoot,
73    ) -> Option<DeriveConsensusParametersConsensusInfo> {
74        if let Some(consensus_info) = self
75            .chain_info
76            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
77        {
78            return Some(consensus_info);
79        }
80
81        let mut current_block_root = *descendant_block_root;
82        loop {
83            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
84                break;
85            };
86            let header = importing_entry.header().header();
87
88            if header.prefix.number == ancestor_block_number {
89                return Some(DeriveConsensusParametersConsensusInfo::from_consensus_info(
90                    header.consensus_info,
91                ));
92            }
93
94            current_block_root = *header.root();
95        }
96
97        // Query again in case of a race condition where previously importing block was imported in
98        // between iterations in the above loop
99        self.chain_info
100            .ancestor_header_consensus_info(ancestor_block_number, descendant_block_root)
101    }
102}
103
104impl<'a, CI> ShardMembershipEntropySourceChainInfo for VerificationChainInfo<'a, CI>
105where
106    CI: ChainInfo<OwnedBeaconChainBlock>,
107{
108    fn ancestor_header_proof_of_time(
109        &self,
110        ancestor_block_number: BlockNumber,
111        descendant_block_root: &BlockRoot,
112    ) -> Option<PotOutput> {
113        if let Some(header) = self
114            .chain_info
115            .ancestor_header(ancestor_block_number, descendant_block_root)
116        {
117            return Some(header.header().consensus_info.proof_of_time);
118        }
119
120        let mut current_block_root = *descendant_block_root;
121        loop {
122            let Some(importing_entry) = self.importing_blocks.get(&current_block_root) else {
123                break;
124            };
125            let header = importing_entry.header();
126
127            if header.header().prefix.number == ancestor_block_number {
128                return Some(header.header().consensus_info.proof_of_time);
129            }
130
131            current_block_root = *header.header().root();
132        }
133
134        // Query again in case of a race condition where previously importing block was imported in
135        // between iterations in the above loop
136        self.chain_info
137            .ancestor_header_proof_of_time(ancestor_block_number, descendant_block_root)
138    }
139}
140
141#[derive(Debug)]
142pub struct BeaconChainBlockImport<PosTable, CI, BV> {
143    chain_info: CI,
144    block_verification: BV,
145    importing_blocks: ImportingBlocks<OwnedBeaconChainHeader>,
146    block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
147    super_segments_sender: AsyncMutex<mpsc::Sender<SuperSegment>>,
148    block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
149    _pos_table: PhantomData<PosTable>,
150}
151
152impl<PosTable, CI, BV> BlockImport<OwnedBeaconChainBlock>
153    for BeaconChainBlockImport<PosTable, CI, BV>
154where
155    PosTable: Table,
156    CI: BeaconChainInfoWrite,
157    BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
158{
159    fn import(
160        &self,
161        block: OwnedBeaconChainBlock,
162        origin: BlockOrigin,
163    ) -> Result<impl Future<Output = Result<(), BlockImportError>> + Send, BlockImportError> {
164        let parent_root = &block.header.header().prefix.parent_root;
165
166        let (parent_header, parent_block_mmr, parent_block_import_status) =
167            if let Some((parent_header, parent_block_details)) =
168                self.chain_info.header_with_details(parent_root)
169            {
170                (
171                    parent_header,
172                    parent_block_details.mmr_with_block,
173                    ParentBlockImportStatus::Imported {
174                        system_contract_states: parent_block_details.system_contract_states,
175                    },
176                )
177            } else if let Some(importing_entry) = self.importing_blocks.get(parent_root) {
178                (
179                    importing_entry.header().clone(),
180                    Arc::clone(importing_entry.mmr()),
181                    ParentBlockImportStatus::Importing {
182                        entry: importing_entry,
183                    },
184                )
185            } else {
186                return Err(BlockImportError::UnknownParentBlock {
187                    block_root: *parent_root,
188                });
189            };
190
191        let parent_block_mmr_root = Blake3Hash::from(
192            parent_block_mmr
193                .root()
194                .ok_or(BlockImportError::ParentBlockMmrInvalid)?,
195        );
196        let mut block_mmr = *parent_block_mmr;
197
198        if !block_mmr.add_leaf(&block.header.header().root()) {
199            return Err(BlockImportError::CantExtendMmr);
200        }
201
202        let importing_handle = self
203            .importing_blocks
204            .insert(block.header.clone(), Arc::new(block_mmr))
205            .ok_or(BlockImportError::AlreadyImporting)?;
206
207        if self
208            .chain_info
209            .header(&block.header.header().root())
210            .is_some()
211        {
212            return Err(BlockImportError::AlreadyImported);
213        }
214
215        Ok(self.import(
216            parent_header,
217            parent_block_mmr_root,
218            block,
219            origin,
220            importing_handle,
221            parent_block_import_status,
222        ))
223    }
224}
225
226impl<PosTable, CI, BV> BeaconChainBlockImport<PosTable, CI, BV>
227where
228    PosTable: Table,
229    CI: BeaconChainInfoWrite,
230    BV: BlockVerification<OwnedBeaconChainBlock, Option<SuperSegment>>,
231{
232    /// Create a new instance
233    #[inline(always)]
234    pub fn new(
235        chain_info: CI,
236        block_verification: BV,
237        block_importing_notification_sender: mpsc::Sender<BlockImportingNotification>,
238        super_segments_sender: mpsc::Sender<SuperSegment>,
239        block_import_notification_sender: mpsc::Sender<OwnedBeaconChainBlock>,
240    ) -> Self {
241        Self {
242            chain_info,
243            block_verification,
244            importing_blocks: ImportingBlocks::new(),
245            block_importing_notification_sender,
246            super_segments_sender: AsyncMutex::new(super_segments_sender),
247            block_import_notification_sender,
248            _pos_table: PhantomData,
249        }
250    }
251
252    async fn import(
253        &self,
254        parent_header: OwnedBeaconChainHeader,
255        parent_block_mmr_root: Blake3Hash,
256        block: OwnedBeaconChainBlock,
257        origin: BlockOrigin,
258        importing_handle: ImportingBlockHandle<OwnedBeaconChainHeader>,
259        parent_block_import_status: ParentBlockImportStatus<OwnedBeaconChainHeader>,
260    ) -> Result<(), BlockImportError> {
261        let parent_header = parent_header.header();
262        let header = block.header.header();
263        let body = block.body.body();
264
265        let log_block_import = match origin {
266            BlockOrigin::LocalBlockBuilder { .. } => true,
267            BlockOrigin::Sync => false,
268            BlockOrigin::Broadcast => true,
269        };
270
271        // TODO: `.send()` is a hack for compiler bug, see:
272        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
273        self.block_verification
274            .verify_concurrent(
275                parent_header,
276                &parent_block_mmr_root,
277                header,
278                body,
279                &origin,
280                &VerificationChainInfo {
281                    chain_info: &self.chain_info,
282                    importing_blocks: &self.importing_blocks,
283                },
284            )
285            .send()
286            .await
287            .map_err(BeaconChainBlockImportError::from)?;
288
289        let Some(system_contract_states) = parent_block_import_status.wait().await else {
290            return Err(BlockImportError::ParentBlockImportFailed);
291        };
292
293        // TODO: `.send()` is a hack for compiler bug, see:
294        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
295        let maybe_super_segment = self
296            .block_verification
297            .verify_sequential(parent_header, &parent_block_mmr_root, header, body, &origin)
298            .send()
299            .await
300            .map_err(BeaconChainBlockImportError::from)?;
301
302        let global_state = GlobalState::new(&system_contract_states);
303
304        // TODO: Execute block
305
306        let state_root = global_state.root();
307
308        if header.result.state_root != state_root {
309            return Err(BlockImportError::InvalidStateRoot {
310                expected: state_root,
311                actual: header.result.state_root,
312            });
313        }
314
315        let system_contract_states = global_state.to_system_contract_states();
316
317        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
318        if let Err(error) = self
319            .block_importing_notification_sender
320            .clone()
321            .send(BlockImportingNotification {
322                block_number: header.prefix.number,
323                acknowledgement_sender,
324            })
325            .await
326        {
327            warn!(%error, "Failed to send block importing notification");
328        }
329
330        while acknowledgement_receiver.next().await.is_some() {
331            // Wait for all the acknowledgements to arrive
332        }
333
334        let number = header.prefix.number;
335        let root = *header.root();
336
337        if let Some(super_segment) = maybe_super_segment {
338            let mut super_segment_sender = self.super_segments_sender.lock().await;
339            if self
340                .chain_info
341                .persist_super_segment_header(super_segment.header)
342                .await
343                .map_err(
344                    |error| BeaconChainBlockImportError::PersistSuperSegmentHeaders { error },
345                )?
346                && let Err(error) = super_segment_sender.send(super_segment).await
347            {
348                warn!(%error, "Failed to send a super segment notification");
349            }
350        }
351
352        self.chain_info
353            .persist_block(
354                block.clone(),
355                BlockDetails {
356                    mmr_with_block: Arc::clone(importing_handle.mmr()),
357                    system_contract_states: StdArc::clone(&system_contract_states),
358                },
359            )
360            .await?;
361
362        importing_handle.set_success(system_contract_states);
363
364        if let Err(error) = self
365            .block_import_notification_sender
366            .clone()
367            .send(block)
368            .await
369        {
370            warn!(
371                %error,
372                block_number = %number,
373                block_root = %root,
374                "Failed to send block import notification"
375            );
376        }
377
378        if log_block_import {
379            info!(
380                %number,
381                %root,
382                "🏆 Imported block",
383            );
384        }
385
386        Ok(())
387    }
388}