ab_client_block_authoring/
slot_worker.rs

1//! Slot worker drives block and vote production based on slots produced in
2//! [`ab_client_proof_of_time`].
3
4use ab_client_api::{BlockDetails, BlockOrigin, ChainInfo, ChainSyncStatus};
5use ab_client_archiving::segment_headers_store::SegmentHeadersStore;
6use ab_client_block_builder::{BlockBuilder, BlockBuilderResult};
7use ab_client_block_import::BlockImport;
8use ab_client_consensus_common::ConsensusConstants;
9use ab_client_proof_of_time::PotNextSlotInput;
10use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
11use ab_client_proof_of_time::verifier::PotVerifier;
12use ab_core_primitives::block::BlockNumber;
13use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
14use ab_core_primitives::block::header::{
15    BeaconChainHeader, BlockHeaderConsensusInfo, GenericBlockHeader, OwnedBlockHeaderSeal,
16    SharedBlockHeader,
17};
18use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
19use ab_core_primitives::hashes::Blake3Hash;
20use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
21use ab_core_primitives::sectors::SectorId;
22use ab_core_primitives::segments::HistorySize;
23use ab_core_primitives::solutions::{
24    Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
25    SolutionVerifyPieceCheckParams,
26};
27use ab_proof_of_space::Table;
28use futures::StreamExt;
29use futures::channel::{mpsc, oneshot};
30use send_future::SendFuture;
31use std::collections::BTreeMap;
32use std::marker::PhantomData;
33use tokio::sync::broadcast;
34use tracing::{debug, error, info, trace, warn};
35
36/// Large enough size for any practical purposes, there shouldn't be even this many solutions.
37const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
38
39/// Information about a new slot that just arrived
40#[derive(Debug, Copy, Clone)]
41pub struct NewSlotInfo {
42    /// Slot number
43    pub slot: SlotNumber,
44    /// The PoT output for `slot`
45    pub proof_of_time: PotOutput,
46    /// Acceptable solution range for block authoring
47    pub solution_range: SolutionRange,
48}
49
50/// New slot notification with slot information and sender for a solution for the slot.
51#[derive(Debug, Clone)]
52pub struct NewSlotNotification {
53    /// New slot information.
54    pub new_slot_info: NewSlotInfo,
55    /// Sender that can be used to send solutions for the slot.
56    pub solution_sender: mpsc::Sender<Solution>,
57}
58/// Notification with a pre-seal hash that needs to be sealed (signed) to create a block and receive
59/// a block reward
60#[derive(Debug)]
61pub struct BlockSealNotification {
62    /// Hash to be signed.
63    pub pre_seal_hash: Blake3Hash,
64    /// Public key hash of the plot identity that should create signature
65    pub public_key_hash: Blake3Hash,
66    /// Sender that can be used to send the seal
67    pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
68}
69
70#[derive(Debug)]
71pub struct ClaimedSlot {
72    /// Consensus info for a block header
73    pub consensus_info: BlockHeaderConsensusInfo,
74    /// Proof of time checkpoints from after future proof of parent block to current block's
75    /// future proof (inclusive)
76    pub checkpoints: Vec<PotCheckpoints>,
77}
78
79/// Parameters for [`SubspaceSlotWorker`]
80#[derive(Debug)]
81pub struct SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS> {
82    /// Builder that can create a new block
83    pub block_builder: BB,
84    /// Block import to import the block created by a block builder
85    pub block_import: BI,
86    /// Beacon chain info
87    pub beacon_chain_info: BCI,
88    /// Chain info
89    pub chain_info: CI,
90    /// Chain sync status
91    pub chain_sync_status: CSS,
92    /// Force authoring of blocks even if we are offline
93    pub force_authoring: bool,
94    /// Sender for new slot notifications
95    pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
96    /// Sender for block sealing notifications
97    pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
98    // TODO: Should be super segments instead for verification purposes
99    /// Persistent storage of segment headers
100    pub segment_headers_store: SegmentHeadersStore,
101    /// Consensus constants
102    pub consensus_constants: ConsensusConstants,
103    /// Proof of time verifier
104    pub pot_verifier: PotVerifier,
105}
106
107/// Subspace slot worker responsible for block and vote production
108#[derive(Debug)]
109pub struct SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS> {
110    block_builder: BB,
111    block_import: BI,
112    beacon_chain_info: BCI,
113    chain_info: CI,
114    chain_sync_status: CSS,
115    force_authoring: bool,
116    new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
117    block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
118    segment_headers_store: SegmentHeadersStore,
119    /// Solution receivers for challenges that were sent to farmers and expected to be received
120    /// eventually
121    pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
122    /// Collection of PoT slots that can be retrieved later if needed by block production
123    pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
124    consensus_constants: ConsensusConstants,
125    pot_verifier: PotVerifier,
126    _pos_table: PhantomData<(PosTable, Block)>,
127}
128
129impl<PosTable, Block, BB, BI, BCI, CI, CSS>
130    SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS>
131where
132    PosTable: Table,
133    Block: GenericOwnedBlock,
134    BB: BlockBuilder<Block>,
135    BI: BlockImport<Block>,
136    BCI: ChainInfo<OwnedBeaconChainBlock>,
137    CI: ChainInfo<Block>,
138    CSS: ChainSyncStatus,
139{
140    async fn claim_slot(
141        &mut self,
142        parent_header: &SharedBlockHeader<'_>,
143        parent_beacon_chain_header: &BeaconChainHeader<'_>,
144        slot: SlotNumber,
145    ) -> Option<ClaimedSlot> {
146        let parent_number = parent_header.prefix.number;
147        let parent_slot = parent_header.consensus_info.slot;
148
149        if slot <= parent_slot {
150            debug!(
151                "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
152            );
153
154            return None;
155        } else {
156            debug!(%slot, "Attempting to claim a slot");
157        }
158
159        let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
160
161        let solution_range = parent_consensus_parameters
162            .next_solution_range
163            .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
164
165        let parent_pot_parameters_change = parent_consensus_parameters
166            .pot_parameters_change
167            .copied()
168            .map(PotParametersChange::from);
169        let parent_future_slot = if parent_number == BlockNumber::ZERO {
170            parent_slot
171        } else {
172            parent_slot + self.consensus_constants.block_authoring_delay
173        };
174
175        let (proof_of_time, future_proof_of_time, checkpoints) = {
176            // Remove checkpoints from old slots we will not need anymore
177            self.pot_checkpoints
178                .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
179
180            let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
181
182            // Future slot for which proof must be available before authoring a block at this slot
183            let future_slot = slot + self.consensus_constants.block_authoring_delay;
184
185            let pot_input = if parent_number == BlockNumber::ZERO {
186                PotNextSlotInput {
187                    slot: parent_slot + SlotNumber::ONE,
188                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
189                    seed: self.pot_verifier.genesis_seed(),
190                }
191            } else {
192                PotNextSlotInput::derive(
193                    parent_consensus_parameters.fixed_parameters.slot_iterations,
194                    parent_slot,
195                    parent_header.consensus_info.proof_of_time,
196                    &parent_pot_parameters_change,
197                )
198            };
199
200            // Ensure proof of time is valid, according to parent block
201            if !self.pot_verifier.is_output_valid(
202                pot_input,
203                slot - parent_slot,
204                proof_of_time,
205                parent_pot_parameters_change,
206            ) {
207                warn!(
208                    %slot,
209                    ?pot_input,
210                    consensus_info = ?parent_header.consensus_info,
211                    "Proof of time is invalid, skipping block authoring at the slot"
212                );
213                return None;
214            }
215
216            let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
217                PotNextSlotInput {
218                    slot: parent_slot + SlotNumber::ONE,
219                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
220                    seed: self.pot_verifier.genesis_seed(),
221                }
222            } else {
223                PotNextSlotInput::derive(
224                    parent_consensus_parameters.fixed_parameters.slot_iterations,
225                    parent_future_slot,
226                    parent_header.consensus_info.future_proof_of_time,
227                    &parent_pot_parameters_change,
228                )
229            };
230
231            let mut checkpoints =
232                Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
233
234            for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
235                let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
236                    checkpoints_pot_input.slot_iterations,
237                    checkpoints_pot_input.seed,
238                );
239                let Some(slot_checkpoints) = maybe_slot_checkpoints else {
240                    warn!("Proving failed during block authoring");
241                    return None;
242                };
243
244                checkpoints.push(slot_checkpoints);
245
246                checkpoints_pot_input = PotNextSlotInput::derive(
247                    checkpoints_pot_input.slot_iterations,
248                    slot,
249                    slot_checkpoints.output(),
250                    &parent_pot_parameters_change,
251                );
252            }
253
254            let future_proof_of_time = checkpoints
255                .last()
256                .expect("Never empty, there is at least one slot between blocks; qed")
257                .output();
258
259            (proof_of_time, future_proof_of_time, checkpoints)
260        };
261
262        let mut solution_receiver = {
263            // Remove receivers for old slots we will not need anymore
264            self.pending_solutions
265                .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
266
267            let mut solution_receiver = self.pending_solutions.remove(&slot)?;
268            // Time is out, we will not accept any more solutions
269            solution_receiver.close();
270            solution_receiver
271        };
272
273        let mut maybe_consensus_info = None;
274
275        // TODO: Consider skipping most/all checks here and do them in block import instead
276        while let Some(solution) = solution_receiver.next().await {
277            let sector_id = SectorId::new(
278                &solution.public_key_hash,
279                solution.sector_index,
280                solution.history_size,
281            );
282
283            // TODO: Query it from an actual chain
284            // let history_size = runtime_api.history_size(parent_block_root).ok()?;
285            // let max_pieces_in_sector = runtime_api.max_pieces_in_sector(parent_block_root).ok()?;
286            let history_size = HistorySize::ONE;
287            let max_pieces_in_sector = 1000;
288
289            let segment_index = sector_id
290                .derive_piece_index(
291                    solution.piece_offset,
292                    solution.history_size,
293                    max_pieces_in_sector,
294                    self.consensus_constants.recent_segments,
295                    self.consensus_constants.recent_history_fraction,
296                )
297                .segment_index();
298            let maybe_segment_root = self
299                .segment_headers_store
300                .get_segment_header(segment_index)
301                .map(|segment_header| segment_header.segment_root);
302
303            let segment_root = match maybe_segment_root {
304                Some(segment_root) => segment_root,
305                None => {
306                    warn!(
307                        %slot,
308                        %segment_index,
309                        "Segment root not found",
310                    );
311                    continue;
312                }
313            };
314            let sector_expiration_check_segment_index = match solution
315                .history_size
316                .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
317            {
318                Some(sector_expiration_check) => sector_expiration_check.segment_index(),
319                None => {
320                    continue;
321                }
322            };
323            let sector_expiration_check_segment_root = self
324                .segment_headers_store
325                .get_segment_header(sector_expiration_check_segment_index)
326                .map(|segment_header| segment_header.segment_root);
327
328            let solution_verification_result = solution.verify::<PosTable>(
329                slot,
330                &SolutionVerifyParams {
331                    proof_of_time,
332                    solution_range,
333                    piece_check_params: Some(SolutionVerifyPieceCheckParams {
334                        max_pieces_in_sector,
335                        segment_root,
336                        recent_segments: self.consensus_constants.recent_segments,
337                        recent_history_fraction: self.consensus_constants.recent_history_fraction,
338                        min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
339                        current_history_size: history_size,
340                        sector_expiration_check_segment_root,
341                    }),
342                },
343            );
344
345            match solution_verification_result {
346                Ok(()) => {
347                    if maybe_consensus_info.is_none() {
348                        info!(%slot, "🚜 Claimed slot");
349                        maybe_consensus_info.replace(BlockHeaderConsensusInfo {
350                            slot,
351                            proof_of_time,
352                            future_proof_of_time,
353                            solution,
354                        });
355                    } else {
356                        info!(
357                            %slot,
358                            "Skipping a solution that has quality sufficient for block because \
359                            slot has already been claimed",
360                        );
361                    }
362                }
363                Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
364                    // Solution range might have just adjusted, but when a farmer was auditing it
365                    // didn't know about this, so downgrade the warning to a debug message
366                    if parent_consensus_parameters.next_solution_range.is_some() {
367                        debug!(
368                            %slot,
369                            %error,
370                            "Invalid solution received",
371                        );
372                    } else {
373                        warn!(
374                            %slot,
375                            %error,
376                            "Invalid solution received",
377                        );
378                    }
379                }
380                Err(error) => {
381                    warn!(
382                        %slot,
383                        %error,
384                        "Invalid solution received",
385                    );
386                }
387            }
388        }
389
390        maybe_consensus_info.map(|consensus_info| ClaimedSlot {
391            consensus_info,
392            checkpoints,
393        })
394    }
395
396    /// Create a new Subspace slot worker
397    pub fn new(
398        SubspaceSlotWorkerOptions {
399            block_builder,
400            block_import,
401            beacon_chain_info,
402            chain_info,
403            chain_sync_status,
404            force_authoring,
405            new_slot_notification_sender,
406            block_sealing_notification_sender,
407            segment_headers_store,
408            consensus_constants,
409            pot_verifier,
410        }: SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS>,
411    ) -> Self {
412        Self {
413            block_builder,
414            block_import,
415            beacon_chain_info,
416            chain_info,
417            chain_sync_status,
418            force_authoring,
419            new_slot_notification_sender,
420            block_sealing_notification_sender,
421            segment_headers_store,
422            pending_solutions: BTreeMap::new(),
423            pot_checkpoints: BTreeMap::new(),
424            consensus_constants,
425            pot_verifier,
426            _pos_table: PhantomData,
427        }
428    }
429
430    /// Run slot worker
431    pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
432        let mut maybe_last_processed_slot = None;
433
434        loop {
435            let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
436                Ok(slot_info) => slot_info,
437                Err(error) => match error {
438                    broadcast::error::RecvError::Closed => {
439                        info!("No Slot info senders available. Exiting slot worker.");
440                        return;
441                    }
442                    broadcast::error::RecvError::Lagged(skipped_notifications) => {
443                        debug!(
444                            "Slot worker is lagging. Skipped {} slot notification(s)",
445                            skipped_notifications
446                        );
447                        continue;
448                    }
449                },
450            };
451            if let Some(last_processed_slot) = maybe_last_processed_slot
452                && last_processed_slot >= slot
453            {
454                // Already processed
455                continue;
456            }
457            maybe_last_processed_slot.replace(slot);
458
459            self.store_checkpoints(slot, checkpoints);
460
461            let best_beacon_chain_header = self.beacon_chain_info.best_header();
462            let best_beacon_chain_header = best_beacon_chain_header.header();
463            let (best_header, best_block_details) = self.chain_info.best_header_with_details();
464            let best_header = best_header.header();
465
466            self.on_new_slot(slot, checkpoints, best_beacon_chain_header);
467
468            if self.chain_sync_status.is_syncing() {
469                debug!(%slot, "Skipping proposal slot due to sync");
470                continue;
471            }
472
473            // Slots that we claim must be `block_authoring_delay` behind the best slot we know of
474            let Some(slot_to_claim) =
475                slot.checked_sub(self.consensus_constants.block_authoring_delay)
476            else {
477                trace!("Skipping a very early slot during chain start");
478                continue;
479            };
480
481            // TODO: `.send()` is a hack for compiler bug, see:
482            //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
483            let Some(block_builder_result) = self
484                .produce_block(
485                    slot_to_claim,
486                    best_header,
487                    &best_block_details,
488                    best_beacon_chain_header,
489                )
490                .send()
491                .await
492            else {
493                continue;
494            };
495
496            let block_import_fut = match self.block_import.import(
497                block_builder_result.block,
498                BlockOrigin::LocalBlockBuilder {
499                    block_details: block_builder_result.block_details,
500                },
501            ) {
502                Ok(block_import_fut) => block_import_fut,
503                Err(error) => {
504                    error!(
505                        best_root = %*best_header.root(),
506                        %error,
507                        "Failed to queue a newly produced block for import"
508                    );
509                    continue;
510                }
511            };
512
513            match block_import_fut.await {
514                Ok(()) => {
515                    // Nothing else to do
516                }
517                Err(error) => {
518                    warn!(
519                        best_root = %*best_header.root(),
520                        %error,
521                        "Failed to import a newly produced block"
522                    );
523                }
524            }
525        }
526    }
527
528    /// Handle new slot: store checkpoints and generate notification for a farmer
529    fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
530        // Remove checkpoints from future slots, if present they are out of date anyway
531        self.pot_checkpoints
532            .retain(|&stored_slot, _checkpoints| stored_slot < slot);
533
534        self.pot_checkpoints.insert(slot, checkpoints);
535    }
536
537    /// Handle new slot: store checkpoints and generate notification for a farmer
538    fn on_new_slot(
539        &mut self,
540        slot: SlotNumber,
541        checkpoints: PotCheckpoints,
542        best_beacon_chain_header: &BeaconChainHeader<'_>,
543    ) {
544        if self.chain_sync_status.is_syncing() {
545            debug!("Skipping farming slot {slot} due to sync");
546            return;
547        }
548
549        let proof_of_time = checkpoints.output();
550
551        // NOTE: Best hash is not necessarily going to be the parent of the corresponding block, but
552        // solution range shouldn't be too far off
553        let solution_range = best_beacon_chain_header
554            .consensus_parameters()
555            .next_solution_range
556            .unwrap_or(
557                best_beacon_chain_header
558                    .consensus_parameters()
559                    .fixed_parameters
560                    .solution_range,
561            );
562        let new_slot_info = NewSlotInfo {
563            slot,
564            proof_of_time,
565            solution_range,
566        };
567        let (solution_sender, solution_receiver) =
568            mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
569
570        if let Err(error) = self
571            .new_slot_notification_sender
572            .try_send(NewSlotNotification {
573                new_slot_info,
574                solution_sender,
575            })
576        {
577            warn!(%error, "Failed to send a new slot notification");
578        }
579
580        self.pending_solutions.insert(slot, solution_receiver);
581    }
582
583    /// Called with slot for which block needs to be produced (if a suitable solution was found)
584    async fn produce_block(
585        &mut self,
586        slot: SlotNumber,
587        parent_header: &<Block::Header as GenericOwnedBlockHeader>::Header<'_>,
588        parent_block_details: &BlockDetails,
589        parent_beacon_chain_header: &BeaconChainHeader<'_>,
590    ) -> Option<BlockBuilderResult<Block>> {
591        if !self.force_authoring && self.chain_sync_status.is_offline() {
592            debug!("Skipping slot, waiting for the network");
593
594            return None;
595        }
596
597        let claimed_slot = self
598            .claim_slot(parent_header, parent_beacon_chain_header, slot)
599            .await?;
600
601        debug!(%slot, "Starting block authorship");
602
603        let seal_block = {
604            let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
605            let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
606
607            move |pre_seal_hash| async move {
608                let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
609
610                if let Err(error) =
611                    block_sealing_notification_sender.try_send(BlockSealNotification {
612                        pre_seal_hash,
613                        public_key_hash,
614                        seal_sender,
615                    })
616                {
617                    warn!(%error, "Failed to send block sealing notification");
618                }
619
620                seal_receiver.await.ok()
621            }
622        };
623
624        let parent_block_root = *parent_header.root();
625
626        // TODO: `.send()` is a hack for compiler bug, see:
627        //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
628        let block_builder_result = match self
629            .block_builder
630            .build(
631                &parent_block_root,
632                parent_header,
633                parent_block_details,
634                &claimed_slot.consensus_info,
635                &claimed_slot.checkpoints,
636                seal_block,
637            )
638            .send()
639            .await
640        {
641            Ok(block_builder_result) => block_builder_result,
642            Err(error) => {
643                error!(%slot, %parent_block_root, %error, "Failed to build a block");
644                return None;
645            }
646        };
647
648        let header = block_builder_result.block.header().header();
649        info!(
650            number = %header.prefix.number,
651            root = %&*header.root(),
652            "🔖 Built new block",
653        );
654
655        Some(block_builder_result)
656    }
657}