ab_client_block_authoring/
slot_worker.rs

1//! Slot worker drives block and vote production based on slots produced in
2//! [`ab_client_proof_of_time`].
3
4use crate::{BlockProducer, ClaimedSlot};
5use ab_client_api::{ChainInfo, ChainSyncStatus};
6use ab_client_consensus_common::ConsensusConstants;
7use ab_client_consensus_common::consensus_parameters::shard_membership_entropy_source;
8use ab_client_proof_of_time::PotNextSlotInput;
9use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
10use ab_client_proof_of_time::verifier::PotVerifier;
11use ab_core_primitives::block::BlockNumber;
12use ab_core_primitives::block::header::{
13    BeaconChainHeader, BlockHeaderConsensusInfo, OwnedBlockHeaderSeal,
14};
15use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
18use ab_core_primitives::shard::NumShards;
19use ab_core_primitives::solutions::{ShardMembershipEntropy, Solution, SolutionRange};
20use ab_proof_of_space::Table;
21use futures::StreamExt;
22use futures::channel::{mpsc, oneshot};
23use send_future::SendFuture;
24use std::collections::BTreeMap;
25use std::marker::PhantomData;
26use std::time::Duration;
27use tokio::sync::broadcast;
28use tracing::{debug, error, info, trace, warn};
29
/// Capacity of the per-slot channel through which farmers send solutions back.
///
/// Large enough size for any practical purposes, there shouldn't be even this many solutions for a
/// single slot.
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
/// How long to wait for a seal after sending a [`BlockSealNotification`] before giving up on the
/// block (the seal is then treated as missing).
const BLOCK_SEALING_TIMEOUT: Duration = Duration::from_millis(500);
33
/// Information about a new slot that just arrived
///
/// Sent to farmers (inside [`NewSlotNotification`]) so they can search for a solution for `slot`.
#[derive(Debug, Copy, Clone)]
pub struct NewSlotInfo {
    /// Slot number
    pub slot: SlotNumber,
    /// The PoT output for `slot`
    pub proof_of_time: PotOutput,
    /// Acceptable solution range for block authoring.
    ///
    /// Derived from the best beacon chain block known at the time of the slot, which is not
    /// necessarily the parent of the block that will eventually be produced.
    pub solution_range: SolutionRange,
    /// Shard membership entropy
    pub shard_membership_entropy: ShardMembershipEntropy,
    /// The number of shards in the network
    pub num_shards: NumShards,
}
48
/// New slot notification with slot information and sender for a solution for the slot.
#[derive(Debug, Clone)]
pub struct NewSlotNotification {
    /// New slot information.
    pub new_slot_info: NewSlotInfo,
    /// Sender that can be used to send solutions for the slot.
    ///
    /// The channel is bounded; once the slot worker attempts to claim the slot the receiving side
    /// is closed and further solutions are no longer accepted.
    pub solution_sender: mpsc::Sender<Solution>,
}
/// Notification with a pre-seal hash that needs to be sealed (signed) to create a block and receive
/// a block reward
#[derive(Debug)]
pub struct BlockSealNotification {
    /// Hash to be signed.
    pub pre_seal_hash: Blake3Hash,
    /// Public key hash of the plot identity that should create signature
    pub public_key_hash: Blake3Hash,
    /// Sender that can be used to send the seal.
    ///
    /// The slot worker only waits a limited amount of time for the seal; a seal sent after that
    /// (or never sent) results in the block not being sealed.
    pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
}
68
/// Options for [`SlotWorker`]
#[derive(Debug)]
pub struct SlotWorkerOptions<BP, BCI, CSS> {
    /// Producer of a new block
    pub block_producer: BP,
    /// Beacon chain info, used to look up the best header (and by shard membership entropy
    /// derivation)
    pub beacon_chain_info: BCI,
    /// Chain sync status, used to skip slots while syncing or offline
    pub chain_sync_status: CSS,
    /// Force authoring of blocks even if we are offline
    pub force_authoring: bool,
    /// Sender for new slot notifications (sent with `try_send`, so notifications are dropped with
    /// a warning when the channel is full)
    pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
    /// Sender for block sealing notifications (sent with `try_send`, so notifications are dropped
    /// with a warning when the channel is full)
    pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Proof of time verifier
    pub pot_verifier: PotVerifier,
}
89
/// Slot worker responsible for block production
///
/// Created with [`SlotWorker::new`] and driven by [`SlotWorker::run`].
#[derive(Debug)]
pub struct SlotWorker<PosTable, BP, BCI, CSS> {
    block_producer: BP,
    beacon_chain_info: BCI,
    chain_sync_status: CSS,
    /// Author blocks even when the chain sync status reports being offline
    force_authoring: bool,
    new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
    block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
    /// Solution receivers for challenges that were sent to farmers and expected to be received
    /// eventually, keyed by slot
    pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
    /// Collection of PoT slots that can be retrieved later if needed by block production, keyed
    /// by slot
    pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
    consensus_constants: ConsensusConstants,
    pot_verifier: PotVerifier,
    /// The proof-of-space table type is only used at the type level
    _pos_table: PhantomData<PosTable>,
}
108
109impl<PosTable, BP, BCI, CSS> SlotWorker<PosTable, BP, BCI, CSS>
110where
111    PosTable: Table,
112    BP: BlockProducer,
113    BCI: ChainInfo<OwnedBeaconChainBlock>,
114    CSS: ChainSyncStatus,
115{
116    /// Create a new slot worker
117    pub fn new(
118        SlotWorkerOptions {
119            block_producer,
120            beacon_chain_info,
121            chain_sync_status,
122            force_authoring,
123            new_slot_notification_sender,
124            block_sealing_notification_sender,
125            consensus_constants,
126            pot_verifier,
127        }: SlotWorkerOptions<BP, BCI, CSS>,
128    ) -> Self {
129        Self {
130            block_producer,
131            beacon_chain_info,
132            chain_sync_status,
133            force_authoring,
134            new_slot_notification_sender,
135            block_sealing_notification_sender,
136            pending_solutions: BTreeMap::new(),
137            pot_checkpoints: BTreeMap::new(),
138            consensus_constants,
139            pot_verifier,
140            _pos_table: PhantomData,
141        }
142    }
143
144    /// Run slot worker
145    pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
146        let mut last_processed_slot = SlotNumber::ZERO;
147
148        loop {
149            let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
150                Ok(slot_info) => slot_info,
151                Err(error) => match error {
152                    broadcast::error::RecvError::Closed => {
153                        info!("No Slot info senders available. Exiting slot worker.");
154                        return;
155                    }
156                    broadcast::error::RecvError::Lagged(skipped_notifications) => {
157                        debug!(
158                            "Slot worker is lagging. Skipped {} slot notification(s)",
159                            skipped_notifications
160                        );
161                        continue;
162                    }
163                },
164            };
165
166            if last_processed_slot >= slot {
167                // Already processed
168                continue;
169            }
170            last_processed_slot = slot;
171
172            let best_beacon_chain_header = self.beacon_chain_info.best_header();
173            let best_beacon_chain_header = best_beacon_chain_header.header();
174
175            // Store checkpoints
176            {
177                // Remove checkpoints from future slots, if present they are out of date anyway
178                self.pot_checkpoints
179                    .retain(|&stored_slot, _checkpoints| stored_slot < slot);
180
181                self.pot_checkpoints.insert(slot, checkpoints);
182            }
183
184            if self.chain_sync_status.is_syncing() {
185                debug!(%slot, "Skipping farming due to syncing");
186                return;
187            }
188
189            // TODO: Maybe handle the boundary in some way, like checking already received
190            //  solutions (which are waiting for future PoT to produce a block) or send both entropy
191            //  sources at the interval boundary
192            // NOTE: Beacon chain block number may change before the next block is produced,
193            // rendering the entropy source invalid, but it should not happen often.
194            let shard_membership_entropy = match shard_membership_entropy_source(
195                best_beacon_chain_header.prefix.number + BlockNumber::ONE,
196                best_beacon_chain_header,
197                self.consensus_constants.shard_rotation_interval,
198                self.consensus_constants.shard_rotation_delay,
199                &self.beacon_chain_info,
200            ) {
201                Ok(shard_membership_entropy) => shard_membership_entropy,
202                Err(error) => {
203                    error!(%error, "Failed to find shard membership entropy");
204                    break;
205                }
206            };
207
208            let proof_of_time = checkpoints.output();
209
210            // Send slot notification to farmers
211            {
212                let consensus_parameters = best_beacon_chain_header.consensus_parameters();
213                // NOTE: Best bock is not necessarily going to be the parent of the corresponding
214                // block once it is created, but solution range and number of shards should be the
215                // same most of the time
216                let solution_range = consensus_parameters
217                    .next_solution_range
218                    .unwrap_or(consensus_parameters.fixed_parameters.solution_range);
219                let new_slot_info = NewSlotInfo {
220                    slot,
221                    proof_of_time,
222                    solution_range,
223                    shard_membership_entropy,
224                    num_shards: consensus_parameters.fixed_parameters.num_shards,
225                };
226                let (solution_sender, solution_receiver) =
227                    mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
228
229                if let Err(error) =
230                    self.new_slot_notification_sender
231                        .try_send(NewSlotNotification {
232                            new_slot_info,
233                            solution_sender,
234                        })
235                {
236                    warn!(%error, "Failed to send a new slot notification");
237                }
238
239                self.pending_solutions.insert(slot, solution_receiver);
240            }
241
242            // Slots that we claim must be `block_authoring_delay` behind the best slot we know of
243            let Some(slot_to_claim) =
244                slot.checked_sub(self.consensus_constants.block_authoring_delay)
245            else {
246                trace!("Skipping a very early slot during chain start");
247                continue;
248            };
249
250            if !self.force_authoring && self.chain_sync_status.is_offline() {
251                debug!("Skipping slot, waiting for the network");
252
253                continue;
254            }
255
256            let Some(claimed_slot) = self
257                .claim_slot(best_beacon_chain_header, slot_to_claim)
258                .await
259            else {
260                continue;
261            };
262
263            debug!(
264                slot = %claimed_slot.consensus_info.slot,
265                "Starting block authorship"
266            );
267
268            let seal_block = {
269                let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
270                let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
271
272                move |pre_seal_hash| async move {
273                    let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
274
275                    if let Err(error) =
276                        block_sealing_notification_sender.try_send(BlockSealNotification {
277                            pre_seal_hash,
278                            public_key_hash,
279                            seal_sender,
280                        })
281                    {
282                        warn!(%error, "Failed to send block sealing notification");
283                    }
284
285                    match tokio::time::timeout(BLOCK_SEALING_TIMEOUT, seal_receiver).await {
286                        Ok(Ok(seal)) => Some(seal),
287                        _ => None,
288                    }
289                }
290            };
291
292            // TODO: `.send()` is a hack for compiler bug, see:
293            //  https://github.com/rust-lang/rust/issues/100013#issuecomment-2210995259
294            self.block_producer
295                .produce_block(claimed_slot, best_beacon_chain_header, seal_block)
296                .send()
297                .await;
298        }
299    }
300
301    async fn claim_slot(
302        &mut self,
303        parent_beacon_chain_header: &BeaconChainHeader<'_>,
304        slot: SlotNumber,
305    ) -> Option<ClaimedSlot> {
306        let parent_number = parent_beacon_chain_header.prefix.number;
307        let parent_slot = parent_beacon_chain_header.consensus_info.slot;
308
309        if slot <= parent_slot {
310            debug!(
311                "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
312            );
313
314            return None;
315        } else {
316            debug!(%slot, "Attempting to claim a slot");
317        }
318
319        let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
320
321        let parent_pot_parameters_change = parent_consensus_parameters
322            .pot_parameters_change
323            .copied()
324            .map(PotParametersChange::from);
325        let parent_future_slot = if parent_number == BlockNumber::ZERO {
326            parent_slot
327        } else {
328            parent_slot + self.consensus_constants.block_authoring_delay
329        };
330
331        let (proof_of_time, future_proof_of_time, checkpoints) = {
332            // Remove checkpoints from old slots we will not need anymore
333            self.pot_checkpoints
334                .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
335
336            let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
337
338            // Future slot for which proof must be available before authoring a block at this slot
339            let future_slot = slot + self.consensus_constants.block_authoring_delay;
340
341            let pot_input = if parent_number == BlockNumber::ZERO {
342                PotNextSlotInput {
343                    slot: parent_slot + SlotNumber::ONE,
344                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
345                    seed: self.pot_verifier.genesis_seed(),
346                }
347            } else {
348                PotNextSlotInput::derive(
349                    parent_consensus_parameters.fixed_parameters.slot_iterations,
350                    parent_slot,
351                    parent_beacon_chain_header.consensus_info.proof_of_time,
352                    &parent_pot_parameters_change,
353                )
354            };
355
356            // Ensure proof of time is valid, according to parent block
357            if !self.pot_verifier.is_output_valid(
358                pot_input,
359                slot - parent_slot,
360                proof_of_time,
361                parent_pot_parameters_change,
362            ) {
363                warn!(
364                    %slot,
365                    ?pot_input,
366                    consensus_info = ?parent_beacon_chain_header.consensus_info,
367                    "Proof of time is invalid, skipping block authoring at the slot"
368                );
369                return None;
370            }
371
372            let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
373                PotNextSlotInput {
374                    slot: parent_slot + SlotNumber::ONE,
375                    slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
376                    seed: self.pot_verifier.genesis_seed(),
377                }
378            } else {
379                PotNextSlotInput::derive(
380                    parent_consensus_parameters.fixed_parameters.slot_iterations,
381                    parent_future_slot,
382                    parent_beacon_chain_header
383                        .consensus_info
384                        .future_proof_of_time,
385                    &parent_pot_parameters_change,
386                )
387            };
388
389            let mut checkpoints =
390                Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
391
392            for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
393                let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
394                    checkpoints_pot_input.slot_iterations,
395                    checkpoints_pot_input.seed,
396                );
397                let Some(slot_checkpoints) = maybe_slot_checkpoints else {
398                    warn!("Proving failed during block authoring");
399                    return None;
400                };
401
402                checkpoints.push(slot_checkpoints);
403
404                checkpoints_pot_input = PotNextSlotInput::derive(
405                    checkpoints_pot_input.slot_iterations,
406                    slot,
407                    slot_checkpoints.output(),
408                    &parent_pot_parameters_change,
409                );
410            }
411
412            let future_proof_of_time = checkpoints
413                .last()
414                .expect("Never empty, there is at least one slot between blocks; qed")
415                .output();
416
417            (proof_of_time, future_proof_of_time, checkpoints)
418        };
419
420        let mut solution_receiver = {
421            // Remove receivers for old slots we will not need anymore
422            self.pending_solutions
423                .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
424
425            let mut solution_receiver = self.pending_solutions.remove(&slot)?;
426            // Time is out, we will not accept any more solutions
427            solution_receiver.close();
428            solution_receiver
429        };
430
431        let mut maybe_consensus_info = None;
432
433        while let Some(solution) = solution_receiver.next().await {
434            if maybe_consensus_info.is_none() {
435                debug!(%slot, "🚜 Claimed slot");
436                maybe_consensus_info.replace(BlockHeaderConsensusInfo {
437                    slot,
438                    proof_of_time,
439                    future_proof_of_time,
440                    solution,
441                });
442            } else {
443                debug!(
444                    %slot,
445                    "Skipping a solution that has quality sufficient for block because \
446                    slot has already been claimed",
447                );
448            }
449        }
450
451        maybe_consensus_info.map(|consensus_info| ClaimedSlot {
452            consensus_info,
453            checkpoints,
454        })
455    }
456}