1use crate::{BlockProducer, ClaimedSlot};
5use ab_client_api::{ChainInfo, ChainSyncStatus};
6use ab_client_consensus_common::ConsensusConstants;
7use ab_client_consensus_common::consensus_parameters::shard_membership_entropy_source;
8use ab_client_proof_of_time::PotNextSlotInput;
9use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
10use ab_client_proof_of_time::verifier::PotVerifier;
11use ab_core_primitives::block::BlockNumber;
12use ab_core_primitives::block::header::{
13 BeaconChainHeader, BlockHeaderConsensusInfo, OwnedBlockHeaderSeal,
14};
15use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
16use ab_core_primitives::hashes::Blake3Hash;
17use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
18use ab_core_primitives::shard::NumShards;
19use ab_core_primitives::solutions::{ShardMembershipEntropy, Solution, SolutionRange};
20use ab_proof_of_space::Table;
21use futures::StreamExt;
22use futures::channel::{mpsc, oneshot};
23use send_future::SendFuture;
24use std::collections::BTreeMap;
25use std::marker::PhantomData;
26use std::time::Duration;
27use tokio::sync::broadcast;
28use tracing::{debug, error, info, trace, warn};
29
/// Capacity of the channel created for every slot through which solutions for that slot are
/// sent back to the slot worker.
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
/// Maximum amount of time to wait for a block seal to arrive after a block sealing notification
/// was sent; if no seal arrives in time, sealing yields no seal.
const BLOCK_SEALING_TIMEOUT: Duration = Duration::from_millis(500);
33
/// Information about a new slot, sent to subscribers as part of [`NewSlotNotification`].
#[derive(Debug, Copy, Clone)]
pub struct NewSlotInfo {
    /// Slot number the rest of the fields correspond to
    pub slot: SlotNumber,
    /// Proof of time output for this slot (output of the slot's checkpoints)
    pub proof_of_time: PotOutput,
    /// Solution range taken from the best beacon chain header: the next solution range if it is
    /// present there, otherwise the current one from fixed parameters
    pub solution_range: SolutionRange,
    /// Shard membership entropy derived for the block that would be built on top of the best
    /// beacon chain header
    pub shard_membership_entropy: ShardMembershipEntropy,
    /// Number of shards, taken from fixed consensus parameters of the best beacon chain header
    pub num_shards: NumShards,
}
48
/// Notification about a new slot, sent by the slot worker for every slot it processes while not
/// syncing.
#[derive(Debug, Clone)]
pub struct NewSlotNotification {
    /// Information about the new slot
    pub new_slot_info: NewSlotInfo,
    /// Sender for solutions found for this slot. The slot worker keeps the receiving side and
    /// closes it when it attempts to claim the slot, after which no more solutions are accepted.
    pub solution_sender: mpsc::Sender<Solution>,
}
/// Notification with a request to seal a block that is being produced.
#[derive(Debug)]
pub struct BlockSealNotification {
    /// Hash of the block before the seal is applied
    pub pre_seal_hash: Blake3Hash,
    /// Public key hash from the solution that was used to claim the slot
    pub public_key_hash: Blake3Hash,
    /// Sender through which the seal must be returned. The slot worker waits on the receiving
    /// side for a limited time only and proceeds without a seal if nothing arrives.
    pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
}
68
/// Options used to construct a [`SlotWorker`].
#[derive(Debug)]
pub struct SlotWorkerOptions<BP, BCI, CSS> {
    /// Block producer that is asked to produce a block once a slot has been claimed
    pub block_producer: BP,
    /// Beacon chain info, used to retrieve the best header and to derive shard membership entropy
    pub beacon_chain_info: BCI,
    /// Chain sync status, used to skip slots while syncing or (without force authoring) offline
    pub chain_sync_status: CSS,
    /// When `true`, slots are claimed even if the chain sync status reports being offline
    pub force_authoring: bool,
    /// Sender for notifications about new slots
    pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
    /// Sender for block sealing requests
    pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
    /// Consensus constants (block authoring delay, shard rotation interval and delay are read)
    pub consensus_constants: ConsensusConstants,
    /// Proof of time verifier, used to validate outputs and to obtain checkpoints
    pub pot_verifier: PotVerifier,
}
89
/// Slot worker: reacts to new slots, notifies subscribers about them, and tries to claim slots
/// and produce blocks using solutions received back from subscribers.
#[derive(Debug)]
pub struct SlotWorker<PosTable, BP, BCI, CSS> {
    block_producer: BP,
    beacon_chain_info: BCI,
    chain_sync_status: CSS,
    force_authoring: bool,
    new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
    block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
    /// Receivers of solutions keyed by the slot they were created for; an entry is removed when
    /// an attempt to claim that slot is made, and entries for older slots are pruned
    pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
    /// Proof of time checkpoints keyed by slot; entries at or below the parent block's slot are
    /// pruned when claiming a slot
    pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
    consensus_constants: ConsensusConstants,
    pot_verifier: PotVerifier,
    /// Marker for the proof of space table type; no value of that type is stored
    _pos_table: PhantomData<PosTable>,
}
108
109impl<PosTable, BP, BCI, CSS> SlotWorker<PosTable, BP, BCI, CSS>
110where
111 PosTable: Table,
112 BP: BlockProducer,
113 BCI: ChainInfo<OwnedBeaconChainBlock>,
114 CSS: ChainSyncStatus,
115{
116 pub fn new(
118 SlotWorkerOptions {
119 block_producer,
120 beacon_chain_info,
121 chain_sync_status,
122 force_authoring,
123 new_slot_notification_sender,
124 block_sealing_notification_sender,
125 consensus_constants,
126 pot_verifier,
127 }: SlotWorkerOptions<BP, BCI, CSS>,
128 ) -> Self {
129 Self {
130 block_producer,
131 beacon_chain_info,
132 chain_sync_status,
133 force_authoring,
134 new_slot_notification_sender,
135 block_sealing_notification_sender,
136 pending_solutions: BTreeMap::new(),
137 pot_checkpoints: BTreeMap::new(),
138 consensus_constants,
139 pot_verifier,
140 _pos_table: PhantomData,
141 }
142 }
143
144 pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
146 let mut last_processed_slot = SlotNumber::ZERO;
147
148 loop {
149 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
150 Ok(slot_info) => slot_info,
151 Err(error) => match error {
152 broadcast::error::RecvError::Closed => {
153 info!("No Slot info senders available. Exiting slot worker.");
154 return;
155 }
156 broadcast::error::RecvError::Lagged(skipped_notifications) => {
157 debug!(
158 "Slot worker is lagging. Skipped {} slot notification(s)",
159 skipped_notifications
160 );
161 continue;
162 }
163 },
164 };
165
166 if last_processed_slot >= slot {
167 continue;
169 }
170 last_processed_slot = slot;
171
172 let best_beacon_chain_header = self.beacon_chain_info.best_header();
173 let best_beacon_chain_header = best_beacon_chain_header.header();
174
175 {
177 self.pot_checkpoints
179 .retain(|&stored_slot, _checkpoints| stored_slot < slot);
180
181 self.pot_checkpoints.insert(slot, checkpoints);
182 }
183
184 if self.chain_sync_status.is_syncing() {
185 debug!(%slot, "Skipping farming due to syncing");
186 return;
187 }
188
189 let shard_membership_entropy = match shard_membership_entropy_source(
195 best_beacon_chain_header.prefix.number + BlockNumber::ONE,
196 best_beacon_chain_header,
197 self.consensus_constants.shard_rotation_interval,
198 self.consensus_constants.shard_rotation_delay,
199 &self.beacon_chain_info,
200 ) {
201 Ok(shard_membership_entropy) => shard_membership_entropy,
202 Err(error) => {
203 error!(%error, "Failed to find shard membership entropy");
204 break;
205 }
206 };
207
208 let proof_of_time = checkpoints.output();
209
210 {
212 let consensus_parameters = best_beacon_chain_header.consensus_parameters();
213 let solution_range = consensus_parameters
217 .next_solution_range
218 .unwrap_or(consensus_parameters.fixed_parameters.solution_range);
219 let new_slot_info = NewSlotInfo {
220 slot,
221 proof_of_time,
222 solution_range,
223 shard_membership_entropy,
224 num_shards: consensus_parameters.fixed_parameters.num_shards,
225 };
226 let (solution_sender, solution_receiver) =
227 mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
228
229 if let Err(error) =
230 self.new_slot_notification_sender
231 .try_send(NewSlotNotification {
232 new_slot_info,
233 solution_sender,
234 })
235 {
236 warn!(%error, "Failed to send a new slot notification");
237 }
238
239 self.pending_solutions.insert(slot, solution_receiver);
240 }
241
242 let Some(slot_to_claim) =
244 slot.checked_sub(self.consensus_constants.block_authoring_delay)
245 else {
246 trace!("Skipping a very early slot during chain start");
247 continue;
248 };
249
250 if !self.force_authoring && self.chain_sync_status.is_offline() {
251 debug!("Skipping slot, waiting for the network");
252
253 continue;
254 }
255
256 let Some(claimed_slot) = self
257 .claim_slot(best_beacon_chain_header, slot_to_claim)
258 .await
259 else {
260 continue;
261 };
262
263 debug!(
264 slot = %claimed_slot.consensus_info.slot,
265 "Starting block authorship"
266 );
267
268 let seal_block = {
269 let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
270 let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
271
272 move |pre_seal_hash| async move {
273 let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
274
275 if let Err(error) =
276 block_sealing_notification_sender.try_send(BlockSealNotification {
277 pre_seal_hash,
278 public_key_hash,
279 seal_sender,
280 })
281 {
282 warn!(%error, "Failed to send block sealing notification");
283 }
284
285 match tokio::time::timeout(BLOCK_SEALING_TIMEOUT, seal_receiver).await {
286 Ok(Ok(seal)) => Some(seal),
287 _ => None,
288 }
289 }
290 };
291
292 self.block_producer
295 .produce_block(claimed_slot, best_beacon_chain_header, seal_block)
296 .send()
297 .await;
298 }
299 }
300
301 async fn claim_slot(
302 &mut self,
303 parent_beacon_chain_header: &BeaconChainHeader<'_>,
304 slot: SlotNumber,
305 ) -> Option<ClaimedSlot> {
306 let parent_number = parent_beacon_chain_header.prefix.number;
307 let parent_slot = parent_beacon_chain_header.consensus_info.slot;
308
309 if slot <= parent_slot {
310 debug!(
311 "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
312 );
313
314 return None;
315 } else {
316 debug!(%slot, "Attempting to claim a slot");
317 }
318
319 let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
320
321 let parent_pot_parameters_change = parent_consensus_parameters
322 .pot_parameters_change
323 .copied()
324 .map(PotParametersChange::from);
325 let parent_future_slot = if parent_number == BlockNumber::ZERO {
326 parent_slot
327 } else {
328 parent_slot + self.consensus_constants.block_authoring_delay
329 };
330
331 let (proof_of_time, future_proof_of_time, checkpoints) = {
332 self.pot_checkpoints
334 .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
335
336 let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
337
338 let future_slot = slot + self.consensus_constants.block_authoring_delay;
340
341 let pot_input = if parent_number == BlockNumber::ZERO {
342 PotNextSlotInput {
343 slot: parent_slot + SlotNumber::ONE,
344 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
345 seed: self.pot_verifier.genesis_seed(),
346 }
347 } else {
348 PotNextSlotInput::derive(
349 parent_consensus_parameters.fixed_parameters.slot_iterations,
350 parent_slot,
351 parent_beacon_chain_header.consensus_info.proof_of_time,
352 &parent_pot_parameters_change,
353 )
354 };
355
356 if !self.pot_verifier.is_output_valid(
358 pot_input,
359 slot - parent_slot,
360 proof_of_time,
361 parent_pot_parameters_change,
362 ) {
363 warn!(
364 %slot,
365 ?pot_input,
366 consensus_info = ?parent_beacon_chain_header.consensus_info,
367 "Proof of time is invalid, skipping block authoring at the slot"
368 );
369 return None;
370 }
371
372 let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
373 PotNextSlotInput {
374 slot: parent_slot + SlotNumber::ONE,
375 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
376 seed: self.pot_verifier.genesis_seed(),
377 }
378 } else {
379 PotNextSlotInput::derive(
380 parent_consensus_parameters.fixed_parameters.slot_iterations,
381 parent_future_slot,
382 parent_beacon_chain_header
383 .consensus_info
384 .future_proof_of_time,
385 &parent_pot_parameters_change,
386 )
387 };
388
389 let mut checkpoints =
390 Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
391
392 for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
393 let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
394 checkpoints_pot_input.slot_iterations,
395 checkpoints_pot_input.seed,
396 );
397 let Some(slot_checkpoints) = maybe_slot_checkpoints else {
398 warn!("Proving failed during block authoring");
399 return None;
400 };
401
402 checkpoints.push(slot_checkpoints);
403
404 checkpoints_pot_input = PotNextSlotInput::derive(
405 checkpoints_pot_input.slot_iterations,
406 slot,
407 slot_checkpoints.output(),
408 &parent_pot_parameters_change,
409 );
410 }
411
412 let future_proof_of_time = checkpoints
413 .last()
414 .expect("Never empty, there is at least one slot between blocks; qed")
415 .output();
416
417 (proof_of_time, future_proof_of_time, checkpoints)
418 };
419
420 let mut solution_receiver = {
421 self.pending_solutions
423 .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
424
425 let mut solution_receiver = self.pending_solutions.remove(&slot)?;
426 solution_receiver.close();
428 solution_receiver
429 };
430
431 let mut maybe_consensus_info = None;
432
433 while let Some(solution) = solution_receiver.next().await {
434 if maybe_consensus_info.is_none() {
435 debug!(%slot, "🚜 Claimed slot");
436 maybe_consensus_info.replace(BlockHeaderConsensusInfo {
437 slot,
438 proof_of_time,
439 future_proof_of_time,
440 solution,
441 });
442 } else {
443 debug!(
444 %slot,
445 "Skipping a solution that has quality sufficient for block because \
446 slot has already been claimed",
447 );
448 }
449 }
450
451 maybe_consensus_info.map(|consensus_info| ClaimedSlot {
452 consensus_info,
453 checkpoints,
454 })
455 }
456}