ab_client_block_authoring/
slot_worker.rs

1//! Slot worker drives block and vote production based on slots produced in
2//! [`ab_client_proof_of_time`].
3
4use crate::{BlockProducer, ClaimedSlot};
5use ab_client_api::{ChainInfo, ChainSyncStatus};
6use ab_client_consensus_common::ConsensusConstants;
7use ab_client_proof_of_time::PotNextSlotInput;
8use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
9use ab_client_proof_of_time::verifier::PotVerifier;
10use ab_core_primitives::block::BlockNumber;
11use ab_core_primitives::block::header::{
12    BeaconChainHeader, BlockHeaderConsensusInfo, OwnedBlockHeaderSeal,
13};
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
17use ab_core_primitives::sectors::SectorId;
18use ab_core_primitives::segments::HistorySize;
19use ab_core_primitives::solutions::{
20    Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
21    SolutionVerifyPieceCheckParams,
22};
23use ab_proof_of_space::Table;
24use futures::StreamExt;
25use futures::channel::{mpsc, oneshot};
26use send_future::SendFuture;
27use std::collections::BTreeMap;
28use std::marker::PhantomData;
29use std::time::Duration;
30use tokio::sync::broadcast;
31use tracing::{debug, info, trace, warn};
32
33/// Large enough size for any practical purposes, there shouldn't be even this many solutions.
34const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
35const BLOCK_SEALING_TIMEOUT: Duration = Duration::from_millis(500);
36
37/// Information about a new slot that just arrived
38#[derive(Debug, Copy, Clone)]
39pub struct NewSlotInfo {
40    /// Slot number
41    pub slot: SlotNumber,
42    /// The PoT output for `slot`
43    pub proof_of_time: PotOutput,
44    /// Acceptable solution range for block authoring
45    pub solution_range: SolutionRange,
46}
47
48/// New slot notification with slot information and sender for a solution for the slot.
49#[derive(Debug, Clone)]
50pub struct NewSlotNotification {
51    /// New slot information.
52    pub new_slot_info: NewSlotInfo,
53    /// Sender that can be used to send solutions for the slot.
54    pub solution_sender: mpsc::Sender<Solution>,
55}
56/// Notification with a pre-seal hash that needs to be sealed (signed) to create a block and receive
57/// a block reward
58#[derive(Debug)]
59pub struct BlockSealNotification {
60    /// Hash to be signed.
61    pub pre_seal_hash: Blake3Hash,
62    /// Public key hash of the plot identity that should create signature
63    pub public_key_hash: Blake3Hash,
64    /// Sender that can be used to send the seal
65    pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
66}
67
68/// Options for [`SlotWorker`]
69#[derive(Debug)]
70pub struct SlotWorkerOptions<BP, BCI, CSS> {
71    /// Producer of a new block
72    pub block_producer: BP,
73    /// Beacon chain info
74    pub beacon_chain_info: BCI,
75    /// Chain sync status
76    pub chain_sync_status: CSS,
77    /// Force authoring of blocks even if we are offline
78    pub force_authoring: bool,
79    /// Sender for new slot notifications
80    pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
81    /// Sender for block sealing notifications
82    pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
83    /// Consensus constants
84    pub consensus_constants: ConsensusConstants,
85    /// Proof of time verifier
86    pub pot_verifier: PotVerifier,
87}
88
89/// Slot worker responsible for block production
90#[derive(Debug)]
91pub struct SlotWorker<PosTable, BP, BCI, CSS> {
92    block_producer: BP,
93    beacon_chain_info: BCI,
94    chain_sync_status: CSS,
95    force_authoring: bool,
96    new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
97    block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
98    /// Solution receivers for challenges that were sent to farmers and expected to be received
99    /// eventually
100    pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
101    /// Collection of PoT slots that can be retrieved later if needed by block production
102    pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
103    consensus_constants: ConsensusConstants,
104    pot_verifier: PotVerifier,
105    _pos_table: PhantomData<PosTable>,
106}
107
108impl<PosTable, BP, BCI, CSS> SlotWorker<PosTable, BP, BCI, CSS>
109where
110    PosTable: Table,
111    BP: BlockProducer,
112    BCI: ChainInfo<OwnedBeaconChainBlock>,
113    CSS: ChainSyncStatus,
114{
115    /// Create a new slot worker
116    pub fn new(
117        SlotWorkerOptions {
118            block_producer,
119            beacon_chain_info,
120            chain_sync_status,
121            force_authoring,
122            new_slot_notification_sender,
123            block_sealing_notification_sender,
124            consensus_constants,
125            pot_verifier,
126        }: SlotWorkerOptions<BP, BCI, CSS>,
127    ) -> Self {
128        Self {
129            block_producer,
130            beacon_chain_info,
131            chain_sync_status,
132            force_authoring,
133            new_slot_notification_sender,
134            block_sealing_notification_sender,
135            pending_solutions: BTreeMap::new(),
136            pot_checkpoints: BTreeMap::new(),
137            consensus_constants,
138            pot_verifier,
139            _pos_table: PhantomData,
140        }
141    }
142
143    /// Run slot worker
144    pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
145        let mut maybe_last_processed_slot = None;
146
147        loop {
148            let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
149                Ok(slot_info) => slot_info,
150                Err(error) => match error {
151                    broadcast::error::RecvError::Closed => {
152                        info!("No Slot info senders available. Exiting slot worker.");
153                        return;
154                    }
155                    broadcast::error::RecvError::Lagged(skipped_notifications) => {
156                        debug!(
157                            "Slot worker is lagging. Skipped {} slot notification(s)",
158                            skipped_notifications
159                        );
160                        continue;
161                    }
162                },
163            };
164            if let Some(last_processed_slot) = maybe_last_processed_slot
165                && last_processed_slot >= slot
166            {
167                // Already processed
168                continue;
169            }
170            maybe_last_processed_slot.replace(slot);
171
172            self.store_checkpoints(slot, checkpoints);
173
174            let best_beacon_chain_header = self.beacon_chain_info.best_header();
175            let best_beacon_chain_header = best_beacon_chain_header.header();
176            self.on_new_slot(slot, checkpoints, best_beacon_chain_header);
177
178            if self.chain_sync_status.is_syncing() {
179                debug!(%slot, "Skipping proposal slot due to sync");
180                continue;
181            }
182
183            // Slots that we claim must be `block_authoring_delay` behind the best slot we know of
184            let Some(slot_to_claim) =
185                slot.checked_sub(self.consensus_constants.block_authoring_delay)
186            else {
187                trace!("Skipping a very early slot during chain start");
188                continue;
189            };
190
191            if !self.force_authoring && self.chain_sync_status.is_offline() {
192                debug!("Skipping slot, waiting for the network");
193
194                continue;
195            }
196
197            let Some(claimed_slot) = self
198                .claim_slot(best_beacon_chain_header, slot_to_claim)
199                .await
200            else {
201                continue;
202            };
203
204            debug!(
205                slot = %claimed_slot.consensus_info.slot,
206                "Starting block authorship"
207            );
208
209            let seal_block = {
210                let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
211                let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
212
213                move |pre_seal_hash| async move {
214                    let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
215
216                    if let Err(error) =
217                        block_sealing_notification_sender.try_send(BlockSealNotification {
218                            pre_seal_hash,
219                            public_key_hash,
220                            seal_sender,
221                        })
222                    {
223                        warn!(%error, "Failed to send block sealing notification");
224                    }
225
226                    match tokio::time::timeout(BLOCK_SEALING_TIMEOUT, seal_receiver).await {
227                        Ok(Ok(seal)) => Some(seal),
228                        _ => None,
229                    }
230                }
231            };
232
233            // TODO: `.send()` is a hack for compiler bug, see:
234            //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
235            self.block_producer
236                .produce_block(claimed_slot, best_beacon_chain_header, seal_block)
237                .send()
238                .await;
239        }
240    }
241
242    /// Handle new slot: store checkpoints and generate notification for a farmer
243    fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
244        // Remove checkpoints from future slots, if present they are out of date anyway
245        self.pot_checkpoints
246            .retain(|&stored_slot, _checkpoints| stored_slot < slot);
247
248        self.pot_checkpoints.insert(slot, checkpoints);
249    }
250
251    /// Handle new slot: store checkpoints and generate notification for a farmer
252    fn on_new_slot(
253        &mut self,
254        slot: SlotNumber,
255        checkpoints: PotCheckpoints,
256        best_beacon_chain_header: &BeaconChainHeader<'_>,
257    ) {
258        if self.chain_sync_status.is_syncing() {
259            debug!("Skipping farming slot {slot} due to sync");
260            return;
261        }
262
263        let proof_of_time = checkpoints.output();
264
265        // NOTE: Best hash is not necessarily going to be the parent of the corresponding block, but
266        // solution range shouldn't be too far off
267        let solution_range = best_beacon_chain_header
268            .consensus_parameters()
269            .next_solution_range
270            .unwrap_or(
271                best_beacon_chain_header
272                    .consensus_parameters()
273                    .fixed_parameters
274                    .solution_range,
275            );
276        let new_slot_info = NewSlotInfo {
277            slot,
278            proof_of_time,
279            solution_range,
280        };
281        let (solution_sender, solution_receiver) =
282            mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
283
284        if let Err(error) = self
285            .new_slot_notification_sender
286            .try_send(NewSlotNotification {
287                new_slot_info,
288                solution_sender,
289            })
290        {
291            warn!(%error, "Failed to send a new slot notification");
292        }
293
294        self.pending_solutions.insert(slot, solution_receiver);
295    }
296    async fn claim_slot(
297        &mut self,
298        parent_beacon_chain_header: &BeaconChainHeader<'_>,
299        slot: SlotNumber,
300    ) -> Option<ClaimedSlot> {
301        let parent_number = parent_beacon_chain_header.prefix.number;
302        let parent_slot = parent_beacon_chain_header.consensus_info.slot;
303
304        if slot <= parent_slot {
305            debug!(
306                "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
307            );
308
309            return None;
310        } else {
311            debug!(%slot, "Attempting to claim a slot");
312        }
313
314        let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
315
316        let solution_range = parent_consensus_parameters
317            .next_solution_range
318            .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
319
320        let parent_pot_parameters_change = parent_consensus_parameters
321            .pot_parameters_change
322            .copied()
323            .map(PotParametersChange::from);
324        let parent_future_slot = if parent_number == BlockNumber::ZERO {
325            parent_slot
326        } else {
327            parent_slot + self.consensus_constants.block_authoring_delay
328        };
329
330        let (proof_of_time, future_proof_of_time, checkpoints) = {
331            // Remove checkpoints from old slots we will not need anymore
332            self.pot_checkpoints
333                .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
334
335            let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
336
337            // Future slot for which proof must be available before authoring a block at this slot
338            let future_slot = slot + self.consensus_constants.block_authoring_delay;
339
340            let pot_input = if parent_number == BlockNumber::ZERO {
341                PotNextSlotInput {
342                    slot: parent_slot + SlotNumber::ONE,
343                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
344                    seed: self.pot_verifier.genesis_seed(),
345                }
346            } else {
347                PotNextSlotInput::derive(
348                    parent_consensus_parameters.fixed_parameters.slot_iterations,
349                    parent_slot,
350                    parent_beacon_chain_header.consensus_info.proof_of_time,
351                    &parent_pot_parameters_change,
352                )
353            };
354
355            // Ensure proof of time is valid, according to parent block
356            if !self.pot_verifier.is_output_valid(
357                pot_input,
358                slot - parent_slot,
359                proof_of_time,
360                parent_pot_parameters_change,
361            ) {
362                warn!(
363                    %slot,
364                    ?pot_input,
365                    consensus_info = ?parent_beacon_chain_header.consensus_info,
366                    "Proof of time is invalid, skipping block authoring at the slot"
367                );
368                return None;
369            }
370
371            let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
372                PotNextSlotInput {
373                    slot: parent_slot + SlotNumber::ONE,
374                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
375                    seed: self.pot_verifier.genesis_seed(),
376                }
377            } else {
378                PotNextSlotInput::derive(
379                    parent_consensus_parameters.fixed_parameters.slot_iterations,
380                    parent_future_slot,
381                    parent_beacon_chain_header
382                        .consensus_info
383                        .future_proof_of_time,
384                    &parent_pot_parameters_change,
385                )
386            };
387
388            let mut checkpoints =
389                Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
390
391            for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
392                let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
393                    checkpoints_pot_input.slot_iterations,
394                    checkpoints_pot_input.seed,
395                );
396                let Some(slot_checkpoints) = maybe_slot_checkpoints else {
397                    warn!("Proving failed during block authoring");
398                    return None;
399                };
400
401                checkpoints.push(slot_checkpoints);
402
403                checkpoints_pot_input = PotNextSlotInput::derive(
404                    checkpoints_pot_input.slot_iterations,
405                    slot,
406                    slot_checkpoints.output(),
407                    &parent_pot_parameters_change,
408                );
409            }
410
411            let future_proof_of_time = checkpoints
412                .last()
413                .expect("Never empty, there is at least one slot between blocks; qed")
414                .output();
415
416            (proof_of_time, future_proof_of_time, checkpoints)
417        };
418
419        let mut solution_receiver = {
420            // Remove receivers for old slots we will not need anymore
421            self.pending_solutions
422                .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
423
424            let mut solution_receiver = self.pending_solutions.remove(&slot)?;
425            // Time is out, we will not accept any more solutions
426            solution_receiver.close();
427            solution_receiver
428        };
429
430        let mut maybe_consensus_info = None;
431
432        // TODO: Consider skipping most/all checks here and do them in block import instead
433        while let Some(solution) = solution_receiver.next().await {
434            let sector_id = SectorId::new(
435                &solution.public_key_hash,
436                solution.sector_index,
437                solution.history_size,
438            );
439
440            // TODO: Query it from an actual chain
441            // let history_size = runtime_api.history_size(parent_block_root).ok()?;
442            // let max_pieces_in_sector = runtime_api.max_pieces_in_sector(parent_block_root).ok()?;
443            let history_size = HistorySize::ONE;
444            let max_pieces_in_sector = 1000;
445
446            let segment_index = sector_id
447                .derive_piece_index(
448                    solution.piece_offset,
449                    solution.history_size,
450                    max_pieces_in_sector,
451                    self.consensus_constants.recent_segments,
452                    self.consensus_constants.recent_history_fraction,
453                )
454                .segment_index();
455            let maybe_segment_root = self
456                .beacon_chain_info
457                .get_segment_header(segment_index)
458                .map(|segment_header| segment_header.segment_root);
459
460            let segment_root = match maybe_segment_root {
461                Some(segment_root) => segment_root,
462                None => {
463                    warn!(
464                        %slot,
465                        %segment_index,
466                        "Segment root not found",
467                    );
468                    continue;
469                }
470            };
471            let sector_expiration_check_segment_index = match solution
472                .history_size
473                .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
474            {
475                Some(sector_expiration_check) => sector_expiration_check.segment_index(),
476                None => {
477                    continue;
478                }
479            };
480            let sector_expiration_check_segment_root = self
481                .beacon_chain_info
482                .get_segment_header(sector_expiration_check_segment_index)
483                .map(|segment_header| segment_header.segment_root);
484
485            let solution_verification_result = solution.verify::<PosTable>(
486                slot,
487                &SolutionVerifyParams {
488                    proof_of_time,
489                    solution_range,
490                    piece_check_params: Some(SolutionVerifyPieceCheckParams {
491                        max_pieces_in_sector,
492                        segment_root,
493                        recent_segments: self.consensus_constants.recent_segments,
494                        recent_history_fraction: self.consensus_constants.recent_history_fraction,
495                        min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
496                        current_history_size: history_size,
497                        sector_expiration_check_segment_root,
498                    }),
499                },
500            );
501
502            match solution_verification_result {
503                Ok(()) => {
504                    if maybe_consensus_info.is_none() {
505                        debug!(%slot, "🚜 Claimed slot");
506                        maybe_consensus_info.replace(BlockHeaderConsensusInfo {
507                            slot,
508                            proof_of_time,
509                            future_proof_of_time,
510                            solution,
511                        });
512                    } else {
513                        debug!(
514                            %slot,
515                            "Skipping a solution that has quality sufficient for block because \
516                            slot has already been claimed",
517                        );
518                    }
519                }
520                Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
521                    // Solution range might have just adjusted, but when a farmer was auditing it
522                    // didn't know about this, so downgrade the warning to a debug message
523                    if parent_consensus_parameters.next_solution_range.is_some() {
524                        debug!(
525                            %slot,
526                            %error,
527                            "Invalid solution received",
528                        );
529                    } else {
530                        warn!(
531                            %slot,
532                            %error,
533                            "Invalid solution received",
534                        );
535                    }
536                }
537                Err(error) => {
538                    warn!(
539                        %slot,
540                        %error,
541                        "Invalid solution received",
542                    );
543                }
544            }
545        }
546
547        maybe_consensus_info.map(|consensus_info| ClaimedSlot {
548            consensus_info,
549            checkpoints,
550        })
551    }
552}