1use crate::{BlockProducer, ClaimedSlot};
5use ab_client_api::{ChainInfo, ChainSyncStatus};
6use ab_client_consensus_common::ConsensusConstants;
7use ab_client_proof_of_time::PotNextSlotInput;
8use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
9use ab_client_proof_of_time::verifier::PotVerifier;
10use ab_core_primitives::block::BlockNumber;
11use ab_core_primitives::block::header::{
12 BeaconChainHeader, BlockHeaderConsensusInfo, OwnedBlockHeaderSeal,
13};
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
17use ab_core_primitives::sectors::SectorId;
18use ab_core_primitives::segments::HistorySize;
19use ab_core_primitives::solutions::{
20 Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
21 SolutionVerifyPieceCheckParams,
22};
23use ab_proof_of_space::Table;
24use futures::StreamExt;
25use futures::channel::{mpsc, oneshot};
26use send_future::SendFuture;
27use std::collections::BTreeMap;
28use std::marker::PhantomData;
29use std::time::Duration;
30use tokio::sync::broadcast;
31use tracing::{debug, info, trace, warn};
32
/// Capacity of the per-slot channel through which solutions are delivered to the slot worker.
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
/// Maximum time to wait for a block seal after a sealing notification was sent.
const BLOCK_SEALING_TIMEOUT: Duration = Duration::from_millis(500);
36
37#[derive(Debug, Copy, Clone)]
39pub struct NewSlotInfo {
40 pub slot: SlotNumber,
42 pub proof_of_time: PotOutput,
44 pub solution_range: SolutionRange,
46}
47
48#[derive(Debug, Clone)]
50pub struct NewSlotNotification {
51 pub new_slot_info: NewSlotInfo,
53 pub solution_sender: mpsc::Sender<Solution>,
55}
56#[derive(Debug)]
59pub struct BlockSealNotification {
60 pub pre_seal_hash: Blake3Hash,
62 pub public_key_hash: Blake3Hash,
64 pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
66}
67
68#[derive(Debug)]
70pub struct SlotWorkerOptions<BP, BCI, CSS> {
71 pub block_producer: BP,
73 pub beacon_chain_info: BCI,
75 pub chain_sync_status: CSS,
77 pub force_authoring: bool,
79 pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
81 pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
83 pub consensus_constants: ConsensusConstants,
85 pub pot_verifier: PotVerifier,
87}
88
89#[derive(Debug)]
91pub struct SlotWorker<PosTable, BP, BCI, CSS> {
92 block_producer: BP,
93 beacon_chain_info: BCI,
94 chain_sync_status: CSS,
95 force_authoring: bool,
96 new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
97 block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
98 pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
101 pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
103 consensus_constants: ConsensusConstants,
104 pot_verifier: PotVerifier,
105 _pos_table: PhantomData<PosTable>,
106}
107
108impl<PosTable, BP, BCI, CSS> SlotWorker<PosTable, BP, BCI, CSS>
109where
110 PosTable: Table,
111 BP: BlockProducer,
112 BCI: ChainInfo<OwnedBeaconChainBlock>,
113 CSS: ChainSyncStatus,
114{
115 pub fn new(
117 SlotWorkerOptions {
118 block_producer,
119 beacon_chain_info,
120 chain_sync_status,
121 force_authoring,
122 new_slot_notification_sender,
123 block_sealing_notification_sender,
124 consensus_constants,
125 pot_verifier,
126 }: SlotWorkerOptions<BP, BCI, CSS>,
127 ) -> Self {
128 Self {
129 block_producer,
130 beacon_chain_info,
131 chain_sync_status,
132 force_authoring,
133 new_slot_notification_sender,
134 block_sealing_notification_sender,
135 pending_solutions: BTreeMap::new(),
136 pot_checkpoints: BTreeMap::new(),
137 consensus_constants,
138 pot_verifier,
139 _pos_table: PhantomData,
140 }
141 }
142
143 pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
145 let mut maybe_last_processed_slot = None;
146
147 loop {
148 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
149 Ok(slot_info) => slot_info,
150 Err(error) => match error {
151 broadcast::error::RecvError::Closed => {
152 info!("No Slot info senders available. Exiting slot worker.");
153 return;
154 }
155 broadcast::error::RecvError::Lagged(skipped_notifications) => {
156 debug!(
157 "Slot worker is lagging. Skipped {} slot notification(s)",
158 skipped_notifications
159 );
160 continue;
161 }
162 },
163 };
164 if let Some(last_processed_slot) = maybe_last_processed_slot
165 && last_processed_slot >= slot
166 {
167 continue;
169 }
170 maybe_last_processed_slot.replace(slot);
171
172 self.store_checkpoints(slot, checkpoints);
173
174 let best_beacon_chain_header = self.beacon_chain_info.best_header();
175 let best_beacon_chain_header = best_beacon_chain_header.header();
176 self.on_new_slot(slot, checkpoints, best_beacon_chain_header);
177
178 if self.chain_sync_status.is_syncing() {
179 debug!(%slot, "Skipping proposal slot due to sync");
180 continue;
181 }
182
183 let Some(slot_to_claim) =
185 slot.checked_sub(self.consensus_constants.block_authoring_delay)
186 else {
187 trace!("Skipping a very early slot during chain start");
188 continue;
189 };
190
191 if !self.force_authoring && self.chain_sync_status.is_offline() {
192 debug!("Skipping slot, waiting for the network");
193
194 continue;
195 }
196
197 let Some(claimed_slot) = self
198 .claim_slot(best_beacon_chain_header, slot_to_claim)
199 .await
200 else {
201 continue;
202 };
203
204 debug!(
205 slot = %claimed_slot.consensus_info.slot,
206 "Starting block authorship"
207 );
208
209 let seal_block = {
210 let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
211 let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
212
213 move |pre_seal_hash| async move {
214 let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
215
216 if let Err(error) =
217 block_sealing_notification_sender.try_send(BlockSealNotification {
218 pre_seal_hash,
219 public_key_hash,
220 seal_sender,
221 })
222 {
223 warn!(%error, "Failed to send block sealing notification");
224 }
225
226 match tokio::time::timeout(BLOCK_SEALING_TIMEOUT, seal_receiver).await {
227 Ok(Ok(seal)) => Some(seal),
228 _ => None,
229 }
230 }
231 };
232
233 self.block_producer
236 .produce_block(claimed_slot, best_beacon_chain_header, seal_block)
237 .send()
238 .await;
239 }
240 }
241
242 fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
244 self.pot_checkpoints
246 .retain(|&stored_slot, _checkpoints| stored_slot < slot);
247
248 self.pot_checkpoints.insert(slot, checkpoints);
249 }
250
251 fn on_new_slot(
253 &mut self,
254 slot: SlotNumber,
255 checkpoints: PotCheckpoints,
256 best_beacon_chain_header: &BeaconChainHeader<'_>,
257 ) {
258 if self.chain_sync_status.is_syncing() {
259 debug!("Skipping farming slot {slot} due to sync");
260 return;
261 }
262
263 let proof_of_time = checkpoints.output();
264
265 let solution_range = best_beacon_chain_header
268 .consensus_parameters()
269 .next_solution_range
270 .unwrap_or(
271 best_beacon_chain_header
272 .consensus_parameters()
273 .fixed_parameters
274 .solution_range,
275 );
276 let new_slot_info = NewSlotInfo {
277 slot,
278 proof_of_time,
279 solution_range,
280 };
281 let (solution_sender, solution_receiver) =
282 mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
283
284 if let Err(error) = self
285 .new_slot_notification_sender
286 .try_send(NewSlotNotification {
287 new_slot_info,
288 solution_sender,
289 })
290 {
291 warn!(%error, "Failed to send a new slot notification");
292 }
293
294 self.pending_solutions.insert(slot, solution_receiver);
295 }
296 async fn claim_slot(
297 &mut self,
298 parent_beacon_chain_header: &BeaconChainHeader<'_>,
299 slot: SlotNumber,
300 ) -> Option<ClaimedSlot> {
301 let parent_number = parent_beacon_chain_header.prefix.number;
302 let parent_slot = parent_beacon_chain_header.consensus_info.slot;
303
304 if slot <= parent_slot {
305 debug!(
306 "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
307 );
308
309 return None;
310 } else {
311 debug!(%slot, "Attempting to claim a slot");
312 }
313
314 let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
315
316 let solution_range = parent_consensus_parameters
317 .next_solution_range
318 .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
319
320 let parent_pot_parameters_change = parent_consensus_parameters
321 .pot_parameters_change
322 .copied()
323 .map(PotParametersChange::from);
324 let parent_future_slot = if parent_number == BlockNumber::ZERO {
325 parent_slot
326 } else {
327 parent_slot + self.consensus_constants.block_authoring_delay
328 };
329
330 let (proof_of_time, future_proof_of_time, checkpoints) = {
331 self.pot_checkpoints
333 .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
334
335 let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
336
337 let future_slot = slot + self.consensus_constants.block_authoring_delay;
339
340 let pot_input = if parent_number == BlockNumber::ZERO {
341 PotNextSlotInput {
342 slot: parent_slot + SlotNumber::ONE,
343 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
344 seed: self.pot_verifier.genesis_seed(),
345 }
346 } else {
347 PotNextSlotInput::derive(
348 parent_consensus_parameters.fixed_parameters.slot_iterations,
349 parent_slot,
350 parent_beacon_chain_header.consensus_info.proof_of_time,
351 &parent_pot_parameters_change,
352 )
353 };
354
355 if !self.pot_verifier.is_output_valid(
357 pot_input,
358 slot - parent_slot,
359 proof_of_time,
360 parent_pot_parameters_change,
361 ) {
362 warn!(
363 %slot,
364 ?pot_input,
365 consensus_info = ?parent_beacon_chain_header.consensus_info,
366 "Proof of time is invalid, skipping block authoring at the slot"
367 );
368 return None;
369 }
370
371 let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
372 PotNextSlotInput {
373 slot: parent_slot + SlotNumber::ONE,
374 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
375 seed: self.pot_verifier.genesis_seed(),
376 }
377 } else {
378 PotNextSlotInput::derive(
379 parent_consensus_parameters.fixed_parameters.slot_iterations,
380 parent_future_slot,
381 parent_beacon_chain_header
382 .consensus_info
383 .future_proof_of_time,
384 &parent_pot_parameters_change,
385 )
386 };
387
388 let mut checkpoints =
389 Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
390
391 for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
392 let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
393 checkpoints_pot_input.slot_iterations,
394 checkpoints_pot_input.seed,
395 );
396 let Some(slot_checkpoints) = maybe_slot_checkpoints else {
397 warn!("Proving failed during block authoring");
398 return None;
399 };
400
401 checkpoints.push(slot_checkpoints);
402
403 checkpoints_pot_input = PotNextSlotInput::derive(
404 checkpoints_pot_input.slot_iterations,
405 slot,
406 slot_checkpoints.output(),
407 &parent_pot_parameters_change,
408 );
409 }
410
411 let future_proof_of_time = checkpoints
412 .last()
413 .expect("Never empty, there is at least one slot between blocks; qed")
414 .output();
415
416 (proof_of_time, future_proof_of_time, checkpoints)
417 };
418
419 let mut solution_receiver = {
420 self.pending_solutions
422 .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
423
424 let mut solution_receiver = self.pending_solutions.remove(&slot)?;
425 solution_receiver.close();
427 solution_receiver
428 };
429
430 let mut maybe_consensus_info = None;
431
432 while let Some(solution) = solution_receiver.next().await {
434 let sector_id = SectorId::new(
435 &solution.public_key_hash,
436 solution.sector_index,
437 solution.history_size,
438 );
439
440 let history_size = HistorySize::ONE;
444 let max_pieces_in_sector = 1000;
445
446 let segment_index = sector_id
447 .derive_piece_index(
448 solution.piece_offset,
449 solution.history_size,
450 max_pieces_in_sector,
451 self.consensus_constants.recent_segments,
452 self.consensus_constants.recent_history_fraction,
453 )
454 .segment_index();
455 let maybe_segment_root = self
456 .beacon_chain_info
457 .get_segment_header(segment_index)
458 .map(|segment_header| segment_header.segment_root);
459
460 let segment_root = match maybe_segment_root {
461 Some(segment_root) => segment_root,
462 None => {
463 warn!(
464 %slot,
465 %segment_index,
466 "Segment root not found",
467 );
468 continue;
469 }
470 };
471 let sector_expiration_check_segment_index = match solution
472 .history_size
473 .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
474 {
475 Some(sector_expiration_check) => sector_expiration_check.segment_index(),
476 None => {
477 continue;
478 }
479 };
480 let sector_expiration_check_segment_root = self
481 .beacon_chain_info
482 .get_segment_header(sector_expiration_check_segment_index)
483 .map(|segment_header| segment_header.segment_root);
484
485 let solution_verification_result = solution.verify::<PosTable>(
486 slot,
487 &SolutionVerifyParams {
488 proof_of_time,
489 solution_range,
490 piece_check_params: Some(SolutionVerifyPieceCheckParams {
491 max_pieces_in_sector,
492 segment_root,
493 recent_segments: self.consensus_constants.recent_segments,
494 recent_history_fraction: self.consensus_constants.recent_history_fraction,
495 min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
496 current_history_size: history_size,
497 sector_expiration_check_segment_root,
498 }),
499 },
500 );
501
502 match solution_verification_result {
503 Ok(()) => {
504 if maybe_consensus_info.is_none() {
505 debug!(%slot, "🚜 Claimed slot");
506 maybe_consensus_info.replace(BlockHeaderConsensusInfo {
507 slot,
508 proof_of_time,
509 future_proof_of_time,
510 solution,
511 });
512 } else {
513 debug!(
514 %slot,
515 "Skipping a solution that has quality sufficient for block because \
516 slot has already been claimed",
517 );
518 }
519 }
520 Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
521 if parent_consensus_parameters.next_solution_range.is_some() {
524 debug!(
525 %slot,
526 %error,
527 "Invalid solution received",
528 );
529 } else {
530 warn!(
531 %slot,
532 %error,
533 "Invalid solution received",
534 );
535 }
536 }
537 Err(error) => {
538 warn!(
539 %slot,
540 %error,
541 "Invalid solution received",
542 );
543 }
544 }
545 }
546
547 maybe_consensus_info.map(|consensus_info| ClaimedSlot {
548 consensus_info,
549 checkpoints,
550 })
551 }
552}