1use ab_client_api::{BlockOrigin, ChainInfo, ChainSyncStatus};
5use ab_client_archiving::segment_headers_store::SegmentHeadersStore;
6use ab_client_block_builder::BlockBuilder;
7use ab_client_block_import::BlockImport;
8use ab_client_consensus_common::ConsensusConstants;
9use ab_client_proof_of_time::PotNextSlotInput;
10use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
11use ab_client_proof_of_time::verifier::PotVerifier;
12use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
13use ab_core_primitives::block::header::{
14 BeaconChainHeader, BlockHeaderConsensusInfo, GenericBlockHeader, OwnedBlockHeaderSeal,
15 SharedBlockHeader,
16};
17use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
18use ab_core_primitives::block::{BlockNumber, BlockRoot};
19use ab_core_primitives::hashes::Blake3Hash;
20use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
21use ab_core_primitives::sectors::SectorId;
22use ab_core_primitives::segments::HistorySize;
23use ab_core_primitives::solutions::{
24 Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
25 SolutionVerifyPieceCheckParams,
26};
27use ab_proof_of_space::Table;
28use futures::StreamExt;
29use futures::channel::{mpsc, oneshot};
30use std::collections::BTreeMap;
31use std::marker::PhantomData;
32use tokio::sync::broadcast;
33use tracing::{debug, error, info, trace, warn};
34
/// Capacity of the bounded channel that is created for every slot to carry solutions submitted
/// for that slot.
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
37
38#[derive(Debug, Copy, Clone)]
40pub struct NewSlotInfo {
41 pub slot: SlotNumber,
43 pub proof_of_time: PotOutput,
45 pub solution_range: SolutionRange,
47}
48
49#[derive(Debug, Clone)]
51pub struct NewSlotNotification {
52 pub new_slot_info: NewSlotInfo,
54 pub solution_sender: mpsc::Sender<Solution>,
56}
57#[derive(Debug)]
60pub struct BlockSealNotification {
61 pub pre_seal_hash: Blake3Hash,
63 pub public_key_hash: Blake3Hash,
65 pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
67}
68
69#[derive(Debug)]
70pub struct ClaimedSlot {
71 pub consensus_info: BlockHeaderConsensusInfo,
73 pub checkpoints: Vec<PotCheckpoints>,
76}
77
78#[derive(Debug)]
80pub struct SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS> {
81 pub block_builder: BB,
83 pub block_import: BI,
85 pub beacon_chain_info: BCI,
87 pub chain_info: CI,
89 pub chain_sync_status: CSS,
91 pub force_authoring: bool,
93 pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
95 pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
97 pub segment_headers_store: SegmentHeadersStore,
100 pub consensus_constants: ConsensusConstants,
102 pub pot_verifier: PotVerifier,
104}
105
106#[derive(Debug)]
108pub struct SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS> {
109 block_builder: BB,
110 block_import: BI,
111 beacon_chain_info: BCI,
112 chain_info: CI,
113 chain_sync_status: CSS,
114 force_authoring: bool,
115 new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
116 block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
117 segment_headers_store: SegmentHeadersStore,
118 pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
121 pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
123 consensus_constants: ConsensusConstants,
124 pot_verifier: PotVerifier,
125 _pos_table: PhantomData<(PosTable, Block)>,
126}
127
128impl<PosTable, Block, BB, BI, BCI, CI, CSS>
129 SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS>
130where
131 PosTable: Table,
132 Block: GenericOwnedBlock,
133 BB: BlockBuilder<Block>,
134 BI: BlockImport<Block>,
135 BCI: ChainInfo<OwnedBeaconChainBlock>,
136 CI: ChainInfo<Block>,
137 CSS: ChainSyncStatus,
138{
139 async fn claim_slot(
140 &mut self,
141 _parent_block_root: &BlockRoot,
142 parent_header: &SharedBlockHeader<'_>,
143 parent_beacon_chain_header: &BeaconChainHeader<'_>,
144 slot: SlotNumber,
145 ) -> Option<ClaimedSlot> {
146 let parent_number = parent_header.prefix.number;
147 let parent_slot = parent_header.consensus_info.slot;
148
149 if slot <= parent_slot {
150 debug!(
151 "Skipping claiming slot {slot} it must be higher than parent slot {parent_slot}",
152 );
153
154 return None;
155 } else {
156 debug!(%slot, "Attempting to claim slot");
157 }
158
159 let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
160
161 let solution_range = parent_consensus_parameters
162 .next_solution_range
163 .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
164
165 let parent_pot_parameters_change = parent_consensus_parameters
166 .pot_parameters_change
167 .copied()
168 .map(PotParametersChange::from);
169 let parent_future_slot = if parent_number == BlockNumber::ZERO {
170 parent_slot
171 } else {
172 parent_slot + self.consensus_constants.block_authoring_delay
173 };
174
175 let (proof_of_time, future_proof_of_time, checkpoints) = {
176 self.pot_checkpoints
178 .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
179
180 let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
181
182 let future_slot = slot + self.consensus_constants.block_authoring_delay;
184
185 let pot_input = if parent_number == BlockNumber::ZERO {
186 PotNextSlotInput {
187 slot: parent_slot + SlotNumber::ONE,
188 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
189 seed: self.pot_verifier.genesis_seed(),
190 }
191 } else {
192 PotNextSlotInput::derive(
193 parent_consensus_parameters.fixed_parameters.slot_iterations,
194 parent_slot,
195 parent_header.consensus_info.proof_of_time,
196 &parent_pot_parameters_change,
197 )
198 };
199
200 if !self.pot_verifier.is_output_valid(
202 pot_input,
203 slot - parent_slot,
204 proof_of_time,
205 parent_pot_parameters_change,
206 ) {
207 warn!(
208 %slot,
209 ?pot_input,
210 consensus_info = ?parent_header.consensus_info,
211 "Proof of time is invalid, skipping block authoring at slot"
212 );
213 return None;
214 }
215
216 let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
217 PotNextSlotInput {
218 slot: parent_slot + SlotNumber::ONE,
219 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
220 seed: self.pot_verifier.genesis_seed(),
221 }
222 } else {
223 PotNextSlotInput::derive(
224 parent_consensus_parameters.fixed_parameters.slot_iterations,
225 parent_future_slot,
226 parent_header.consensus_info.future_proof_of_time,
227 &parent_pot_parameters_change,
228 )
229 };
230
231 let mut checkpoints =
232 Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
233
234 for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
235 let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
236 checkpoints_pot_input.slot_iterations,
237 checkpoints_pot_input.seed,
238 );
239 let Some(slot_checkpoints) = maybe_slot_checkpoints else {
240 warn!("Proving failed during block authoring");
241 return None;
242 };
243
244 checkpoints.push(slot_checkpoints);
245
246 checkpoints_pot_input = PotNextSlotInput::derive(
247 checkpoints_pot_input.slot_iterations,
248 slot,
249 slot_checkpoints.output(),
250 &parent_pot_parameters_change,
251 );
252 }
253
254 let future_proof_of_time = checkpoints
255 .last()
256 .expect("Never empty, there is at least one slot between blocks; qed")
257 .output();
258
259 (proof_of_time, future_proof_of_time, checkpoints)
260 };
261
262 let mut solution_receiver = {
263 self.pending_solutions
265 .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
266
267 let mut solution_receiver = self.pending_solutions.remove(&slot)?;
268 solution_receiver.close();
270 solution_receiver
271 };
272
273 let mut maybe_consensus_info = None;
274
275 while let Some(solution) = solution_receiver.next().await {
277 let sector_id = SectorId::new(
278 &solution.public_key_hash,
279 solution.sector_index,
280 solution.history_size,
281 );
282
283 let history_size = HistorySize::ONE;
287 let max_pieces_in_sector = 1000;
288
289 let segment_index = sector_id
290 .derive_piece_index(
291 solution.piece_offset,
292 solution.history_size,
293 max_pieces_in_sector,
294 self.consensus_constants.recent_segments,
295 self.consensus_constants.recent_history_fraction,
296 )
297 .segment_index();
298 let maybe_segment_root = self
299 .segment_headers_store
300 .get_segment_header(segment_index)
301 .map(|segment_header| segment_header.segment_root);
302
303 let segment_root = match maybe_segment_root {
304 Some(segment_root) => segment_root,
305 None => {
306 warn!(
307 %slot,
308 %segment_index,
309 "Segment root not found",
310 );
311 continue;
312 }
313 };
314 let sector_expiration_check_segment_index = match solution
315 .history_size
316 .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
317 {
318 Some(sector_expiration_check) => sector_expiration_check.segment_index(),
319 None => {
320 continue;
321 }
322 };
323 let sector_expiration_check_segment_root = self
324 .segment_headers_store
325 .get_segment_header(sector_expiration_check_segment_index)
326 .map(|segment_header| segment_header.segment_root);
327
328 let solution_verification_result = solution.verify::<PosTable>(
329 slot,
330 &SolutionVerifyParams {
331 proof_of_time,
332 solution_range,
333 piece_check_params: Some(SolutionVerifyPieceCheckParams {
334 max_pieces_in_sector,
335 segment_root,
336 recent_segments: self.consensus_constants.recent_segments,
337 recent_history_fraction: self.consensus_constants.recent_history_fraction,
338 min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
339 current_history_size: history_size,
340 sector_expiration_check_segment_root,
341 }),
342 },
343 );
344
345 match solution_verification_result {
346 Ok(()) => {
347 if maybe_consensus_info.is_none() {
348 info!(%slot, "🚜 Claimed slot");
349 maybe_consensus_info.replace(BlockHeaderConsensusInfo {
350 slot,
351 proof_of_time,
352 future_proof_of_time,
353 solution,
354 });
355 } else {
356 info!(
357 %slot,
358 "Skipping solution that has quality sufficient for block because \
359 slot has already been claimed",
360 );
361 }
362 }
363 Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
364 if parent_consensus_parameters.next_solution_range.is_some() {
367 debug!(
368 %slot,
369 %error,
370 "Invalid solution received",
371 );
372 } else {
373 warn!(
374 %slot,
375 %error,
376 "Invalid solution received",
377 );
378 }
379 }
380 Err(error) => {
381 warn!(
382 %slot,
383 %error,
384 "Invalid solution received",
385 );
386 }
387 }
388 }
389
390 maybe_consensus_info.map(|consensus_info| ClaimedSlot {
391 consensus_info,
392 checkpoints,
393 })
394 }
395
396 pub fn new(
398 SubspaceSlotWorkerOptions {
399 block_builder,
400 block_import,
401 beacon_chain_info,
402 chain_info,
403 chain_sync_status,
404 force_authoring,
405 new_slot_notification_sender,
406 block_sealing_notification_sender,
407 segment_headers_store,
408 consensus_constants,
409 pot_verifier,
410 }: SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS>,
411 ) -> Self {
412 Self {
413 block_builder,
414 block_import,
415 beacon_chain_info,
416 chain_info,
417 chain_sync_status,
418 force_authoring,
419 new_slot_notification_sender,
420 block_sealing_notification_sender,
421 segment_headers_store,
422 pending_solutions: BTreeMap::new(),
423 pot_checkpoints: BTreeMap::new(),
424 consensus_constants,
425 pot_verifier,
426 _pos_table: PhantomData,
427 }
428 }
429
430 pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
432 let mut maybe_last_processed_slot = None;
433
434 loop {
435 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
436 Ok(slot_info) => slot_info,
437 Err(error) => match error {
438 broadcast::error::RecvError::Closed => {
439 info!("No Slot info senders available. Exiting slot worker.");
440 return;
441 }
442 broadcast::error::RecvError::Lagged(skipped_notifications) => {
443 debug!(
444 "Slot worker is lagging. Skipped {} slot notification(s)",
445 skipped_notifications
446 );
447 continue;
448 }
449 },
450 };
451 if let Some(last_processed_slot) = maybe_last_processed_slot
452 && last_processed_slot >= slot
453 {
454 continue;
456 }
457 maybe_last_processed_slot.replace(slot);
458
459 self.store_checkpoints(slot, checkpoints);
460
461 let best_beacon_chain_header = self.beacon_chain_info.best_header();
462 let best_beacon_chain_header = best_beacon_chain_header.header();
463 let best_header = self.chain_info.best_header();
464 let best_header = best_header.header();
465 let best_root = &*best_header.root();
466
467 self.on_new_slot(slot, checkpoints, best_root, best_beacon_chain_header);
468
469 if self.chain_sync_status.is_syncing() {
470 debug!(%slot, "Skipping proposal slot due to sync");
471 continue;
472 }
473
474 let Some(slot_to_claim) =
476 slot.checked_sub(self.consensus_constants.block_authoring_delay)
477 else {
478 trace!("Skipping very early slot during chain start");
479 continue;
480 };
481
482 let Some(block) = self
483 .produce_block(
484 slot_to_claim,
485 best_root,
486 best_header,
487 best_beacon_chain_header,
488 )
489 .await
490 else {
491 continue;
492 };
493
494 let block_import_fut = match self.block_import.import(block, BlockOrigin::Local) {
495 Ok(block_import_fut) => block_import_fut,
496 Err(error) => {
497 error!(%best_root, %error, "Failed to queue newly produced block for import");
498 continue;
499 }
500 };
501
502 match block_import_fut.await {
503 Ok(()) => {
504 }
506 Err(error) => {
507 warn!(%best_root, %error, "Failed to import newly produced block");
508 }
509 }
510 }
511 }
512
513 fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
515 self.pot_checkpoints
517 .retain(|&stored_slot, _checkpoints| stored_slot < slot);
518
519 self.pot_checkpoints.insert(slot, checkpoints);
520 }
521
522 fn on_new_slot(
524 &mut self,
525 slot: SlotNumber,
526 checkpoints: PotCheckpoints,
527 _best_root: &BlockRoot,
528 best_beacon_chain_header: &BeaconChainHeader<'_>,
529 ) {
530 if self.chain_sync_status.is_syncing() {
531 debug!("Skipping farming slot {slot} due to sync");
532 return;
533 }
534
535 let proof_of_time = checkpoints.output();
536
537 let solution_range = best_beacon_chain_header
540 .consensus_parameters()
541 .next_solution_range
542 .unwrap_or(
543 best_beacon_chain_header
544 .consensus_parameters()
545 .fixed_parameters
546 .solution_range,
547 );
548 let new_slot_info = NewSlotInfo {
549 slot,
550 proof_of_time,
551 solution_range,
552 };
553 let (solution_sender, solution_receiver) =
554 mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
555
556 if let Err(error) = self
557 .new_slot_notification_sender
558 .try_send(NewSlotNotification {
559 new_slot_info,
560 solution_sender,
561 })
562 {
563 warn!(%error, "Failed to send new slot notification");
564 }
565
566 self.pending_solutions.insert(slot, solution_receiver);
567 }
568
569 async fn produce_block(
571 &mut self,
572 slot: SlotNumber,
573 parent_block_root: &BlockRoot,
574 parent_header: &<Block::Header as GenericOwnedBlockHeader>::Header<'_>,
575 parent_beacon_chain_header: &BeaconChainHeader<'_>,
576 ) -> Option<Block> {
577 if !self.force_authoring && self.chain_sync_status.is_offline() {
578 debug!("Skipping slot, waiting for the network");
579
580 return None;
581 }
582
583 let claimed_slot = self
584 .claim_slot(
585 parent_block_root,
586 parent_header,
587 parent_beacon_chain_header,
588 slot,
589 )
590 .await?;
591
592 debug!(%slot, "Starting block authorship");
593
594 let seal_block = {
595 let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
596 let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
597
598 move |pre_seal_hash| async move {
599 let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
600
601 if let Err(error) =
602 block_sealing_notification_sender.try_send(BlockSealNotification {
603 pre_seal_hash,
604 public_key_hash,
605 seal_sender,
606 })
607 {
608 warn!(%error, "Failed to send block sealing notification");
609 }
610
611 seal_receiver.await.ok()
612 }
613 };
614 let block = match self
615 .block_builder
616 .build(
617 parent_block_root,
618 parent_header,
619 &claimed_slot.consensus_info,
620 &claimed_slot.checkpoints,
621 seal_block,
622 )
623 .await
624 {
625 Ok(block) => block,
626 Err(error) => {
627 error!(%slot, %parent_block_root, %error, "Failed to build block");
628 return None;
629 }
630 };
631
632 let header = block.header().header();
633 info!(
634 number = %header.prefix.number,
635 root = %&*header.root(),
636 "🔖 Built new block",
637 );
638
639 Some(block)
640 }
641}