ab_client_block_authoring/
slot_worker.rs

1//! Slot worker drives block and vote production based on slots produced in
2//! [`ab_client_proof_of_time`].
3
4use ab_client_api::{BlockOrigin, ChainInfo, ChainSyncStatus};
5use ab_client_archiving::segment_headers_store::SegmentHeadersStore;
6use ab_client_block_builder::BlockBuilder;
7use ab_client_block_import::BlockImport;
8use ab_client_consensus_common::ConsensusConstants;
9use ab_client_proof_of_time::PotNextSlotInput;
10use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
11use ab_client_proof_of_time::verifier::PotVerifier;
12use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
13use ab_core_primitives::block::header::{
14    BeaconChainHeader, BlockHeaderConsensusInfo, GenericBlockHeader, OwnedBlockHeaderSeal,
15    SharedBlockHeader,
16};
17use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
18use ab_core_primitives::block::{BlockNumber, BlockRoot};
19use ab_core_primitives::hashes::Blake3Hash;
20use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
21use ab_core_primitives::sectors::SectorId;
22use ab_core_primitives::segments::HistorySize;
23use ab_core_primitives::solutions::{
24    Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
25    SolutionVerifyPieceCheckParams,
26};
27use ab_proof_of_space::Table;
28use futures::StreamExt;
29use futures::channel::{mpsc, oneshot};
30use std::collections::BTreeMap;
31use std::marker::PhantomData;
32use tokio::sync::broadcast;
33use tracing::{debug, error, info, trace, warn};
34
35/// Large enough size for any practical purposes, there shouldn't be even this many solutions.
36const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
37
38/// Information about new slot that just arrived
39#[derive(Debug, Copy, Clone)]
40pub struct NewSlotInfo {
41    /// Slot number
42    pub slot: SlotNumber,
43    /// The PoT output for `slot`
44    pub proof_of_time: PotOutput,
45    /// Acceptable solution range for block authoring
46    pub solution_range: SolutionRange,
47}
48
49/// New slot notification with slot information and sender for solution for the slot.
50#[derive(Debug, Clone)]
51pub struct NewSlotNotification {
52    /// New slot information.
53    pub new_slot_info: NewSlotInfo,
54    /// Sender that can be used to send solutions for the slot.
55    pub solution_sender: mpsc::Sender<Solution>,
56}
57/// Notification with a pre-seal hash that needs to be sealed (signed) to create a block and receive
58/// a block reward
59#[derive(Debug)]
60pub struct BlockSealNotification {
61    /// Hash to be signed.
62    pub pre_seal_hash: Blake3Hash,
63    /// Public key hash of the plot identity that should create signature
64    pub public_key_hash: Blake3Hash,
65    /// Sender that can be used to send the seal
66    pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
67}
68
69#[derive(Debug)]
70pub struct ClaimedSlot {
71    /// Consensus info for block header
72    pub consensus_info: BlockHeaderConsensusInfo,
73    /// Proof of time checkpoints from after future proof of parent block to current block's
74    /// future proof (inclusive)
75    pub checkpoints: Vec<PotCheckpoints>,
76}
77
78/// Parameters for [`SubspaceSlotWorker`]
79#[derive(Debug)]
80pub struct SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS> {
81    /// Builder that can create a new block
82    pub block_builder: BB,
83    /// Block import to import the block created by block builder
84    pub block_import: BI,
85    /// Beacon chain info
86    pub beacon_chain_info: BCI,
87    /// Chain info
88    pub chain_info: CI,
89    /// Chain sync status
90    pub chain_sync_status: CSS,
91    /// Force authoring of blocks even if we are offline
92    pub force_authoring: bool,
93    /// Sender for new slot notifications
94    pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
95    /// Sender for block sealing notifications
96    pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
97    // TODO: Should be super segments instead for verification purposes
98    /// Persistent storage of segment headers
99    pub segment_headers_store: SegmentHeadersStore,
100    /// Consensus constants
101    pub consensus_constants: ConsensusConstants,
102    /// Proof of time verifier
103    pub pot_verifier: PotVerifier,
104}
105
106/// Subspace slot worker responsible for block and vote production
107#[derive(Debug)]
108pub struct SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS> {
109    block_builder: BB,
110    block_import: BI,
111    beacon_chain_info: BCI,
112    chain_info: CI,
113    chain_sync_status: CSS,
114    force_authoring: bool,
115    new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
116    block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
117    segment_headers_store: SegmentHeadersStore,
118    /// Solution receivers for challenges that were sent to farmers and expected to be received
119    /// eventually
120    pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
121    /// Collection of PoT slots that can be retrieved later if needed by block production
122    pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
123    consensus_constants: ConsensusConstants,
124    pot_verifier: PotVerifier,
125    _pos_table: PhantomData<(PosTable, Block)>,
126}
127
128impl<PosTable, Block, BB, BI, BCI, CI, CSS>
129    SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS>
130where
131    PosTable: Table,
132    Block: GenericOwnedBlock,
133    BB: BlockBuilder<Block>,
134    BI: BlockImport<Block>,
135    BCI: ChainInfo<OwnedBeaconChainBlock>,
136    CI: ChainInfo<Block>,
137    CSS: ChainSyncStatus,
138{
139    async fn claim_slot(
140        &mut self,
141        _parent_block_root: &BlockRoot,
142        parent_header: &SharedBlockHeader<'_>,
143        parent_beacon_chain_header: &BeaconChainHeader<'_>,
144        slot: SlotNumber,
145    ) -> Option<ClaimedSlot> {
146        let parent_number = parent_header.prefix.number;
147        let parent_slot = parent_header.consensus_info.slot;
148
149        if slot <= parent_slot {
150            debug!(
151                "Skipping claiming slot {slot} it must be higher than parent slot {parent_slot}",
152            );
153
154            return None;
155        } else {
156            debug!(%slot, "Attempting to claim slot");
157        }
158
159        let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
160
161        let solution_range = parent_consensus_parameters
162            .next_solution_range
163            .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
164
165        let parent_pot_parameters_change = parent_consensus_parameters
166            .pot_parameters_change
167            .copied()
168            .map(PotParametersChange::from);
169        let parent_future_slot = if parent_number == BlockNumber::ZERO {
170            parent_slot
171        } else {
172            parent_slot + self.consensus_constants.block_authoring_delay
173        };
174
175        let (proof_of_time, future_proof_of_time, checkpoints) = {
176            // Remove checkpoints from old slots we will not need anymore
177            self.pot_checkpoints
178                .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
179
180            let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
181
182            // Future slot for which proof must be available before authoring block at this slot
183            let future_slot = slot + self.consensus_constants.block_authoring_delay;
184
185            let pot_input = if parent_number == BlockNumber::ZERO {
186                PotNextSlotInput {
187                    slot: parent_slot + SlotNumber::ONE,
188                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
189                    seed: self.pot_verifier.genesis_seed(),
190                }
191            } else {
192                PotNextSlotInput::derive(
193                    parent_consensus_parameters.fixed_parameters.slot_iterations,
194                    parent_slot,
195                    parent_header.consensus_info.proof_of_time,
196                    &parent_pot_parameters_change,
197                )
198            };
199
200            // Ensure proof of time is valid according to parent block
201            if !self.pot_verifier.is_output_valid(
202                pot_input,
203                slot - parent_slot,
204                proof_of_time,
205                parent_pot_parameters_change,
206            ) {
207                warn!(
208                    %slot,
209                    ?pot_input,
210                    consensus_info = ?parent_header.consensus_info,
211                    "Proof of time is invalid, skipping block authoring at slot"
212                );
213                return None;
214            }
215
216            let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
217                PotNextSlotInput {
218                    slot: parent_slot + SlotNumber::ONE,
219                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
220                    seed: self.pot_verifier.genesis_seed(),
221                }
222            } else {
223                PotNextSlotInput::derive(
224                    parent_consensus_parameters.fixed_parameters.slot_iterations,
225                    parent_future_slot,
226                    parent_header.consensus_info.future_proof_of_time,
227                    &parent_pot_parameters_change,
228                )
229            };
230
231            let mut checkpoints =
232                Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
233
234            for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
235                let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
236                    checkpoints_pot_input.slot_iterations,
237                    checkpoints_pot_input.seed,
238                );
239                let Some(slot_checkpoints) = maybe_slot_checkpoints else {
240                    warn!("Proving failed during block authoring");
241                    return None;
242                };
243
244                checkpoints.push(slot_checkpoints);
245
246                checkpoints_pot_input = PotNextSlotInput::derive(
247                    checkpoints_pot_input.slot_iterations,
248                    slot,
249                    slot_checkpoints.output(),
250                    &parent_pot_parameters_change,
251                );
252            }
253
254            let future_proof_of_time = checkpoints
255                .last()
256                .expect("Never empty, there is at least one slot between blocks; qed")
257                .output();
258
259            (proof_of_time, future_proof_of_time, checkpoints)
260        };
261
262        let mut solution_receiver = {
263            // Remove receivers for old slots we will not need anymore
264            self.pending_solutions
265                .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
266
267            let mut solution_receiver = self.pending_solutions.remove(&slot)?;
268            // Time is out, we will not accept any more solutions
269            solution_receiver.close();
270            solution_receiver
271        };
272
273        let mut maybe_consensus_info = None;
274
275        // TODO: Consider skipping most/all checks here and do them in block import instead
276        while let Some(solution) = solution_receiver.next().await {
277            let sector_id = SectorId::new(
278                &solution.public_key_hash,
279                solution.sector_index,
280                solution.history_size,
281            );
282
283            // TODO: Query it from an actual chain
284            // let history_size = runtime_api.history_size(parent_block_root).ok()?;
285            // let max_pieces_in_sector = runtime_api.max_pieces_in_sector(parent_block_root).ok()?;
286            let history_size = HistorySize::ONE;
287            let max_pieces_in_sector = 1000;
288
289            let segment_index = sector_id
290                .derive_piece_index(
291                    solution.piece_offset,
292                    solution.history_size,
293                    max_pieces_in_sector,
294                    self.consensus_constants.recent_segments,
295                    self.consensus_constants.recent_history_fraction,
296                )
297                .segment_index();
298            let maybe_segment_root = self
299                .segment_headers_store
300                .get_segment_header(segment_index)
301                .map(|segment_header| segment_header.segment_root);
302
303            let segment_root = match maybe_segment_root {
304                Some(segment_root) => segment_root,
305                None => {
306                    warn!(
307                        %slot,
308                        %segment_index,
309                        "Segment root not found",
310                    );
311                    continue;
312                }
313            };
314            let sector_expiration_check_segment_index = match solution
315                .history_size
316                .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
317            {
318                Some(sector_expiration_check) => sector_expiration_check.segment_index(),
319                None => {
320                    continue;
321                }
322            };
323            let sector_expiration_check_segment_root = self
324                .segment_headers_store
325                .get_segment_header(sector_expiration_check_segment_index)
326                .map(|segment_header| segment_header.segment_root);
327
328            let solution_verification_result = solution.verify::<PosTable>(
329                slot,
330                &SolutionVerifyParams {
331                    proof_of_time,
332                    solution_range,
333                    piece_check_params: Some(SolutionVerifyPieceCheckParams {
334                        max_pieces_in_sector,
335                        segment_root,
336                        recent_segments: self.consensus_constants.recent_segments,
337                        recent_history_fraction: self.consensus_constants.recent_history_fraction,
338                        min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
339                        current_history_size: history_size,
340                        sector_expiration_check_segment_root,
341                    }),
342                },
343            );
344
345            match solution_verification_result {
346                Ok(()) => {
347                    if maybe_consensus_info.is_none() {
348                        info!(%slot, "🚜 Claimed slot");
349                        maybe_consensus_info.replace(BlockHeaderConsensusInfo {
350                            slot,
351                            proof_of_time,
352                            future_proof_of_time,
353                            solution,
354                        });
355                    } else {
356                        info!(
357                            %slot,
358                            "Skipping solution that has quality sufficient for block because \
359                            slot has already been claimed",
360                        );
361                    }
362                }
363                Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
364                    // Solution range might have just adjusted, but when farmer was auditing they
365                    // didn't know about this, so downgrade warning to debug message
366                    if parent_consensus_parameters.next_solution_range.is_some() {
367                        debug!(
368                            %slot,
369                            %error,
370                            "Invalid solution received",
371                        );
372                    } else {
373                        warn!(
374                            %slot,
375                            %error,
376                            "Invalid solution received",
377                        );
378                    }
379                }
380                Err(error) => {
381                    warn!(
382                        %slot,
383                        %error,
384                        "Invalid solution received",
385                    );
386                }
387            }
388        }
389
390        maybe_consensus_info.map(|consensus_info| ClaimedSlot {
391            consensus_info,
392            checkpoints,
393        })
394    }
395
396    /// Create new Subspace slot worker
397    pub fn new(
398        SubspaceSlotWorkerOptions {
399            block_builder,
400            block_import,
401            beacon_chain_info,
402            chain_info,
403            chain_sync_status,
404            force_authoring,
405            new_slot_notification_sender,
406            block_sealing_notification_sender,
407            segment_headers_store,
408            consensus_constants,
409            pot_verifier,
410        }: SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS>,
411    ) -> Self {
412        Self {
413            block_builder,
414            block_import,
415            beacon_chain_info,
416            chain_info,
417            chain_sync_status,
418            force_authoring,
419            new_slot_notification_sender,
420            block_sealing_notification_sender,
421            segment_headers_store,
422            pending_solutions: BTreeMap::new(),
423            pot_checkpoints: BTreeMap::new(),
424            consensus_constants,
425            pot_verifier,
426            _pos_table: PhantomData,
427        }
428    }
429
430    /// Run slot worker
431    pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
432        let mut maybe_last_processed_slot = None;
433
434        loop {
435            let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
436                Ok(slot_info) => slot_info,
437                Err(error) => match error {
438                    broadcast::error::RecvError::Closed => {
439                        info!("No Slot info senders available. Exiting slot worker.");
440                        return;
441                    }
442                    broadcast::error::RecvError::Lagged(skipped_notifications) => {
443                        debug!(
444                            "Slot worker is lagging. Skipped {} slot notification(s)",
445                            skipped_notifications
446                        );
447                        continue;
448                    }
449                },
450            };
451            if let Some(last_processed_slot) = maybe_last_processed_slot
452                && last_processed_slot >= slot
453            {
454                // Already processed
455                continue;
456            }
457            maybe_last_processed_slot.replace(slot);
458
459            self.store_checkpoints(slot, checkpoints);
460
461            let best_beacon_chain_header = self.beacon_chain_info.best_header();
462            let best_beacon_chain_header = best_beacon_chain_header.header();
463            let best_header = self.chain_info.best_header();
464            let best_header = best_header.header();
465            let best_root = &*best_header.root();
466
467            self.on_new_slot(slot, checkpoints, best_root, best_beacon_chain_header);
468
469            if self.chain_sync_status.is_syncing() {
470                debug!(%slot, "Skipping proposal slot due to sync");
471                continue;
472            }
473
474            // Slots that we claim must be `block_authoring_delay` behind the best slot we know of
475            let Some(slot_to_claim) =
476                slot.checked_sub(self.consensus_constants.block_authoring_delay)
477            else {
478                trace!("Skipping very early slot during chain start");
479                continue;
480            };
481
482            let Some(block) = self
483                .produce_block(
484                    slot_to_claim,
485                    best_root,
486                    best_header,
487                    best_beacon_chain_header,
488                )
489                .await
490            else {
491                continue;
492            };
493
494            let block_import_fut = match self.block_import.import(block, BlockOrigin::Local) {
495                Ok(block_import_fut) => block_import_fut,
496                Err(error) => {
497                    error!(%best_root, %error, "Failed to queue newly produced block for import");
498                    continue;
499                }
500            };
501
502            match block_import_fut.await {
503                Ok(()) => {
504                    // Nothing else to do
505                }
506                Err(error) => {
507                    warn!(%best_root, %error, "Failed to import newly produced block");
508                }
509            }
510        }
511    }
512
513    /// Handle new slot: store checkpoints and generate notification for farmer
514    fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
515        // Remove checkpoints from future slots, if present they are out of date anyway
516        self.pot_checkpoints
517            .retain(|&stored_slot, _checkpoints| stored_slot < slot);
518
519        self.pot_checkpoints.insert(slot, checkpoints);
520    }
521
522    /// Handle new slot: store checkpoints and generate notification for farmer
523    fn on_new_slot(
524        &mut self,
525        slot: SlotNumber,
526        checkpoints: PotCheckpoints,
527        _best_root: &BlockRoot,
528        best_beacon_chain_header: &BeaconChainHeader<'_>,
529    ) {
530        if self.chain_sync_status.is_syncing() {
531            debug!("Skipping farming slot {slot} due to sync");
532            return;
533        }
534
535        let proof_of_time = checkpoints.output();
536
537        // NOTE: Best hash is not necessarily going to be the parent of corresponding block, but
538        // solution range shouldn't be too far off
539        let solution_range = best_beacon_chain_header
540            .consensus_parameters()
541            .next_solution_range
542            .unwrap_or(
543                best_beacon_chain_header
544                    .consensus_parameters()
545                    .fixed_parameters
546                    .solution_range,
547            );
548        let new_slot_info = NewSlotInfo {
549            slot,
550            proof_of_time,
551            solution_range,
552        };
553        let (solution_sender, solution_receiver) =
554            mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
555
556        if let Err(error) = self
557            .new_slot_notification_sender
558            .try_send(NewSlotNotification {
559                new_slot_info,
560                solution_sender,
561            })
562        {
563            warn!(%error, "Failed to send new slot notification");
564        }
565
566        self.pending_solutions.insert(slot, solution_receiver);
567    }
568
569    /// Called with slot for which block needs to be produced (if suitable solution was found)
570    async fn produce_block(
571        &mut self,
572        slot: SlotNumber,
573        parent_block_root: &BlockRoot,
574        parent_header: &<Block::Header as GenericOwnedBlockHeader>::Header<'_>,
575        parent_beacon_chain_header: &BeaconChainHeader<'_>,
576    ) -> Option<Block> {
577        if !self.force_authoring && self.chain_sync_status.is_offline() {
578            debug!("Skipping slot, waiting for the network");
579
580            return None;
581        }
582
583        let claimed_slot = self
584            .claim_slot(
585                parent_block_root,
586                parent_header,
587                parent_beacon_chain_header,
588                slot,
589            )
590            .await?;
591
592        debug!(%slot, "Starting block authorship");
593
594        let seal_block = {
595            let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
596            let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
597
598            move |pre_seal_hash| async move {
599                let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
600
601                if let Err(error) =
602                    block_sealing_notification_sender.try_send(BlockSealNotification {
603                        pre_seal_hash,
604                        public_key_hash,
605                        seal_sender,
606                    })
607                {
608                    warn!(%error, "Failed to send block sealing notification");
609                }
610
611                seal_receiver.await.ok()
612            }
613        };
614        let block = match self
615            .block_builder
616            .build(
617                parent_block_root,
618                parent_header,
619                &claimed_slot.consensus_info,
620                &claimed_slot.checkpoints,
621                seal_block,
622            )
623            .await
624        {
625            Ok(block) => block,
626            Err(error) => {
627                error!(%slot, %parent_block_root, %error, "Failed to build block");
628                return None;
629            }
630        };
631
632        let header = block.header().header();
633        info!(
634            number = %header.prefix.number,
635            root = %&*header.root(),
636            "🔖 Built new block",
637        );
638
639        Some(block)
640    }
641}