1use ab_client_api::{BlockDetails, BlockOrigin, ChainInfo, ChainSyncStatus};
5use ab_client_archiving::segment_headers_store::SegmentHeadersStore;
6use ab_client_block_builder::{BlockBuilder, BlockBuilderResult};
7use ab_client_block_import::BlockImport;
8use ab_client_consensus_common::ConsensusConstants;
9use ab_client_proof_of_time::PotNextSlotInput;
10use ab_client_proof_of_time::source::{PotSlotInfo, PotSlotInfoStream};
11use ab_client_proof_of_time::verifier::PotVerifier;
12use ab_core_primitives::block::BlockNumber;
13use ab_core_primitives::block::header::owned::GenericOwnedBlockHeader;
14use ab_core_primitives::block::header::{
15 BeaconChainHeader, BlockHeaderConsensusInfo, GenericBlockHeader, OwnedBlockHeaderSeal,
16 SharedBlockHeader,
17};
18use ab_core_primitives::block::owned::{GenericOwnedBlock, OwnedBeaconChainBlock};
19use ab_core_primitives::hashes::Blake3Hash;
20use ab_core_primitives::pot::{PotCheckpoints, PotOutput, PotParametersChange, SlotNumber};
21use ab_core_primitives::sectors::SectorId;
22use ab_core_primitives::segments::HistorySize;
23use ab_core_primitives::solutions::{
24 Solution, SolutionRange, SolutionVerifyError, SolutionVerifyParams,
25 SolutionVerifyPieceCheckParams,
26};
27use ab_proof_of_space::Table;
28use futures::StreamExt;
29use futures::channel::{mpsc, oneshot};
30use send_future::SendFuture;
31use std::collections::BTreeMap;
32use std::marker::PhantomData;
33use tokio::sync::broadcast;
34use tracing::{debug, error, info, trace, warn};
35
/// Capacity of the bounded channel created for every slot in order to receive solutions for that
/// slot.
const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10;
38
39#[derive(Debug, Copy, Clone)]
41pub struct NewSlotInfo {
42 pub slot: SlotNumber,
44 pub proof_of_time: PotOutput,
46 pub solution_range: SolutionRange,
48}
49
50#[derive(Debug, Clone)]
52pub struct NewSlotNotification {
53 pub new_slot_info: NewSlotInfo,
55 pub solution_sender: mpsc::Sender<Solution>,
57}
58#[derive(Debug)]
61pub struct BlockSealNotification {
62 pub pre_seal_hash: Blake3Hash,
64 pub public_key_hash: Blake3Hash,
66 pub seal_sender: oneshot::Sender<OwnedBlockHeaderSeal>,
68}
69
70#[derive(Debug)]
71pub struct ClaimedSlot {
72 pub consensus_info: BlockHeaderConsensusInfo,
74 pub checkpoints: Vec<PotCheckpoints>,
77}
78
79#[derive(Debug)]
81pub struct SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS> {
82 pub block_builder: BB,
84 pub block_import: BI,
86 pub beacon_chain_info: BCI,
88 pub chain_info: CI,
90 pub chain_sync_status: CSS,
92 pub force_authoring: bool,
94 pub new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
96 pub block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
98 pub segment_headers_store: SegmentHeadersStore,
101 pub consensus_constants: ConsensusConstants,
103 pub pot_verifier: PotVerifier,
105}
106
107#[derive(Debug)]
109pub struct SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS> {
110 block_builder: BB,
111 block_import: BI,
112 beacon_chain_info: BCI,
113 chain_info: CI,
114 chain_sync_status: CSS,
115 force_authoring: bool,
116 new_slot_notification_sender: mpsc::Sender<NewSlotNotification>,
117 block_sealing_notification_sender: mpsc::Sender<BlockSealNotification>,
118 segment_headers_store: SegmentHeadersStore,
119 pending_solutions: BTreeMap<SlotNumber, mpsc::Receiver<Solution>>,
122 pot_checkpoints: BTreeMap<SlotNumber, PotCheckpoints>,
124 consensus_constants: ConsensusConstants,
125 pot_verifier: PotVerifier,
126 _pos_table: PhantomData<(PosTable, Block)>,
127}
128
129impl<PosTable, Block, BB, BI, BCI, CI, CSS>
130 SubspaceSlotWorker<PosTable, Block, BB, BI, BCI, CI, CSS>
131where
132 PosTable: Table,
133 Block: GenericOwnedBlock,
134 BB: BlockBuilder<Block>,
135 BI: BlockImport<Block>,
136 BCI: ChainInfo<OwnedBeaconChainBlock>,
137 CI: ChainInfo<Block>,
138 CSS: ChainSyncStatus,
139{
140 async fn claim_slot(
141 &mut self,
142 parent_header: &SharedBlockHeader<'_>,
143 parent_beacon_chain_header: &BeaconChainHeader<'_>,
144 slot: SlotNumber,
145 ) -> Option<ClaimedSlot> {
146 let parent_number = parent_header.prefix.number;
147 let parent_slot = parent_header.consensus_info.slot;
148
149 if slot <= parent_slot {
150 debug!(
151 "Skipping claiming slot {slot}, it must be higher than parent slot {parent_slot}",
152 );
153
154 return None;
155 } else {
156 debug!(%slot, "Attempting to claim a slot");
157 }
158
159 let parent_consensus_parameters = parent_beacon_chain_header.consensus_parameters();
160
161 let solution_range = parent_consensus_parameters
162 .next_solution_range
163 .unwrap_or(parent_consensus_parameters.fixed_parameters.solution_range);
164
165 let parent_pot_parameters_change = parent_consensus_parameters
166 .pot_parameters_change
167 .copied()
168 .map(PotParametersChange::from);
169 let parent_future_slot = if parent_number == BlockNumber::ZERO {
170 parent_slot
171 } else {
172 parent_slot + self.consensus_constants.block_authoring_delay
173 };
174
175 let (proof_of_time, future_proof_of_time, checkpoints) = {
176 self.pot_checkpoints
178 .retain(|&stored_slot, _checkpoints| stored_slot > parent_slot);
179
180 let proof_of_time = self.pot_checkpoints.get(&slot)?.output();
181
182 let future_slot = slot + self.consensus_constants.block_authoring_delay;
184
185 let pot_input = if parent_number == BlockNumber::ZERO {
186 PotNextSlotInput {
187 slot: parent_slot + SlotNumber::ONE,
188 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
189 seed: self.pot_verifier.genesis_seed(),
190 }
191 } else {
192 PotNextSlotInput::derive(
193 parent_consensus_parameters.fixed_parameters.slot_iterations,
194 parent_slot,
195 parent_header.consensus_info.proof_of_time,
196 &parent_pot_parameters_change,
197 )
198 };
199
200 if !self.pot_verifier.is_output_valid(
202 pot_input,
203 slot - parent_slot,
204 proof_of_time,
205 parent_pot_parameters_change,
206 ) {
207 warn!(
208 %slot,
209 ?pot_input,
210 consensus_info = ?parent_header.consensus_info,
211 "Proof of time is invalid, skipping block authoring at the slot"
212 );
213 return None;
214 }
215
216 let mut checkpoints_pot_input = if parent_number == BlockNumber::ZERO {
217 PotNextSlotInput {
218 slot: parent_slot + SlotNumber::ONE,
219 slot_iterations: parent_consensus_parameters.fixed_parameters.slot_iterations,
220 seed: self.pot_verifier.genesis_seed(),
221 }
222 } else {
223 PotNextSlotInput::derive(
224 parent_consensus_parameters.fixed_parameters.slot_iterations,
225 parent_future_slot,
226 parent_header.consensus_info.future_proof_of_time,
227 &parent_pot_parameters_change,
228 )
229 };
230
231 let mut checkpoints =
232 Vec::with_capacity((future_slot - parent_future_slot).as_u64() as usize);
233
234 for slot in parent_future_slot + SlotNumber::ONE..=future_slot {
235 let maybe_slot_checkpoints = self.pot_verifier.get_checkpoints(
236 checkpoints_pot_input.slot_iterations,
237 checkpoints_pot_input.seed,
238 );
239 let Some(slot_checkpoints) = maybe_slot_checkpoints else {
240 warn!("Proving failed during block authoring");
241 return None;
242 };
243
244 checkpoints.push(slot_checkpoints);
245
246 checkpoints_pot_input = PotNextSlotInput::derive(
247 checkpoints_pot_input.slot_iterations,
248 slot,
249 slot_checkpoints.output(),
250 &parent_pot_parameters_change,
251 );
252 }
253
254 let future_proof_of_time = checkpoints
255 .last()
256 .expect("Never empty, there is at least one slot between blocks; qed")
257 .output();
258
259 (proof_of_time, future_proof_of_time, checkpoints)
260 };
261
262 let mut solution_receiver = {
263 self.pending_solutions
265 .retain(|&stored_slot, _solution_receiver| stored_slot >= slot);
266
267 let mut solution_receiver = self.pending_solutions.remove(&slot)?;
268 solution_receiver.close();
270 solution_receiver
271 };
272
273 let mut maybe_consensus_info = None;
274
275 while let Some(solution) = solution_receiver.next().await {
277 let sector_id = SectorId::new(
278 &solution.public_key_hash,
279 solution.sector_index,
280 solution.history_size,
281 );
282
283 let history_size = HistorySize::ONE;
287 let max_pieces_in_sector = 1000;
288
289 let segment_index = sector_id
290 .derive_piece_index(
291 solution.piece_offset,
292 solution.history_size,
293 max_pieces_in_sector,
294 self.consensus_constants.recent_segments,
295 self.consensus_constants.recent_history_fraction,
296 )
297 .segment_index();
298 let maybe_segment_root = self
299 .segment_headers_store
300 .get_segment_header(segment_index)
301 .map(|segment_header| segment_header.segment_root);
302
303 let segment_root = match maybe_segment_root {
304 Some(segment_root) => segment_root,
305 None => {
306 warn!(
307 %slot,
308 %segment_index,
309 "Segment root not found",
310 );
311 continue;
312 }
313 };
314 let sector_expiration_check_segment_index = match solution
315 .history_size
316 .sector_expiration_check(self.consensus_constants.min_sector_lifetime)
317 {
318 Some(sector_expiration_check) => sector_expiration_check.segment_index(),
319 None => {
320 continue;
321 }
322 };
323 let sector_expiration_check_segment_root = self
324 .segment_headers_store
325 .get_segment_header(sector_expiration_check_segment_index)
326 .map(|segment_header| segment_header.segment_root);
327
328 let solution_verification_result = solution.verify::<PosTable>(
329 slot,
330 &SolutionVerifyParams {
331 proof_of_time,
332 solution_range,
333 piece_check_params: Some(SolutionVerifyPieceCheckParams {
334 max_pieces_in_sector,
335 segment_root,
336 recent_segments: self.consensus_constants.recent_segments,
337 recent_history_fraction: self.consensus_constants.recent_history_fraction,
338 min_sector_lifetime: self.consensus_constants.min_sector_lifetime,
339 current_history_size: history_size,
340 sector_expiration_check_segment_root,
341 }),
342 },
343 );
344
345 match solution_verification_result {
346 Ok(()) => {
347 if maybe_consensus_info.is_none() {
348 info!(%slot, "🚜 Claimed slot");
349 maybe_consensus_info.replace(BlockHeaderConsensusInfo {
350 slot,
351 proof_of_time,
352 future_proof_of_time,
353 solution,
354 });
355 } else {
356 info!(
357 %slot,
358 "Skipping a solution that has quality sufficient for block because \
359 slot has already been claimed",
360 );
361 }
362 }
363 Err(error @ SolutionVerifyError::OutsideSolutionRange { .. }) => {
364 if parent_consensus_parameters.next_solution_range.is_some() {
367 debug!(
368 %slot,
369 %error,
370 "Invalid solution received",
371 );
372 } else {
373 warn!(
374 %slot,
375 %error,
376 "Invalid solution received",
377 );
378 }
379 }
380 Err(error) => {
381 warn!(
382 %slot,
383 %error,
384 "Invalid solution received",
385 );
386 }
387 }
388 }
389
390 maybe_consensus_info.map(|consensus_info| ClaimedSlot {
391 consensus_info,
392 checkpoints,
393 })
394 }
395
396 pub fn new(
398 SubspaceSlotWorkerOptions {
399 block_builder,
400 block_import,
401 beacon_chain_info,
402 chain_info,
403 chain_sync_status,
404 force_authoring,
405 new_slot_notification_sender,
406 block_sealing_notification_sender,
407 segment_headers_store,
408 consensus_constants,
409 pot_verifier,
410 }: SubspaceSlotWorkerOptions<BB, BI, BCI, CI, CSS>,
411 ) -> Self {
412 Self {
413 block_builder,
414 block_import,
415 beacon_chain_info,
416 chain_info,
417 chain_sync_status,
418 force_authoring,
419 new_slot_notification_sender,
420 block_sealing_notification_sender,
421 segment_headers_store,
422 pending_solutions: BTreeMap::new(),
423 pot_checkpoints: BTreeMap::new(),
424 consensus_constants,
425 pot_verifier,
426 _pos_table: PhantomData,
427 }
428 }
429
430 pub async fn run(mut self, mut slot_info_stream: PotSlotInfoStream) {
432 let mut maybe_last_processed_slot = None;
433
434 loop {
435 let PotSlotInfo { slot, checkpoints } = match slot_info_stream.recv().await {
436 Ok(slot_info) => slot_info,
437 Err(error) => match error {
438 broadcast::error::RecvError::Closed => {
439 info!("No Slot info senders available. Exiting slot worker.");
440 return;
441 }
442 broadcast::error::RecvError::Lagged(skipped_notifications) => {
443 debug!(
444 "Slot worker is lagging. Skipped {} slot notification(s)",
445 skipped_notifications
446 );
447 continue;
448 }
449 },
450 };
451 if let Some(last_processed_slot) = maybe_last_processed_slot
452 && last_processed_slot >= slot
453 {
454 continue;
456 }
457 maybe_last_processed_slot.replace(slot);
458
459 self.store_checkpoints(slot, checkpoints);
460
461 let best_beacon_chain_header = self.beacon_chain_info.best_header();
462 let best_beacon_chain_header = best_beacon_chain_header.header();
463 let (best_header, best_block_details) = self.chain_info.best_header_with_details();
464 let best_header = best_header.header();
465
466 self.on_new_slot(slot, checkpoints, best_beacon_chain_header);
467
468 if self.chain_sync_status.is_syncing() {
469 debug!(%slot, "Skipping proposal slot due to sync");
470 continue;
471 }
472
473 let Some(slot_to_claim) =
475 slot.checked_sub(self.consensus_constants.block_authoring_delay)
476 else {
477 trace!("Skipping a very early slot during chain start");
478 continue;
479 };
480
481 let Some(block_builder_result) = self
484 .produce_block(
485 slot_to_claim,
486 best_header,
487 &best_block_details,
488 best_beacon_chain_header,
489 )
490 .send()
491 .await
492 else {
493 continue;
494 };
495
496 let block_import_fut = match self.block_import.import(
497 block_builder_result.block,
498 BlockOrigin::LocalBlockBuilder {
499 block_details: block_builder_result.block_details,
500 },
501 ) {
502 Ok(block_import_fut) => block_import_fut,
503 Err(error) => {
504 error!(
505 best_root = %*best_header.root(),
506 %error,
507 "Failed to queue a newly produced block for import"
508 );
509 continue;
510 }
511 };
512
513 match block_import_fut.await {
514 Ok(()) => {
515 }
517 Err(error) => {
518 warn!(
519 best_root = %*best_header.root(),
520 %error,
521 "Failed to import a newly produced block"
522 );
523 }
524 }
525 }
526 }
527
528 fn store_checkpoints(&mut self, slot: SlotNumber, checkpoints: PotCheckpoints) {
530 self.pot_checkpoints
532 .retain(|&stored_slot, _checkpoints| stored_slot < slot);
533
534 self.pot_checkpoints.insert(slot, checkpoints);
535 }
536
537 fn on_new_slot(
539 &mut self,
540 slot: SlotNumber,
541 checkpoints: PotCheckpoints,
542 best_beacon_chain_header: &BeaconChainHeader<'_>,
543 ) {
544 if self.chain_sync_status.is_syncing() {
545 debug!("Skipping farming slot {slot} due to sync");
546 return;
547 }
548
549 let proof_of_time = checkpoints.output();
550
551 let solution_range = best_beacon_chain_header
554 .consensus_parameters()
555 .next_solution_range
556 .unwrap_or(
557 best_beacon_chain_header
558 .consensus_parameters()
559 .fixed_parameters
560 .solution_range,
561 );
562 let new_slot_info = NewSlotInfo {
563 slot,
564 proof_of_time,
565 solution_range,
566 };
567 let (solution_sender, solution_receiver) =
568 mpsc::channel(PENDING_SOLUTIONS_CHANNEL_CAPACITY);
569
570 if let Err(error) = self
571 .new_slot_notification_sender
572 .try_send(NewSlotNotification {
573 new_slot_info,
574 solution_sender,
575 })
576 {
577 warn!(%error, "Failed to send a new slot notification");
578 }
579
580 self.pending_solutions.insert(slot, solution_receiver);
581 }
582
583 async fn produce_block(
585 &mut self,
586 slot: SlotNumber,
587 parent_header: &<Block::Header as GenericOwnedBlockHeader>::Header<'_>,
588 parent_block_details: &BlockDetails,
589 parent_beacon_chain_header: &BeaconChainHeader<'_>,
590 ) -> Option<BlockBuilderResult<Block>> {
591 if !self.force_authoring && self.chain_sync_status.is_offline() {
592 debug!("Skipping slot, waiting for the network");
593
594 return None;
595 }
596
597 let claimed_slot = self
598 .claim_slot(parent_header, parent_beacon_chain_header, slot)
599 .await?;
600
601 debug!(%slot, "Starting block authorship");
602
603 let seal_block = {
604 let block_sealing_notification_sender = &mut self.block_sealing_notification_sender;
605 let public_key_hash = claimed_slot.consensus_info.solution.public_key_hash;
606
607 move |pre_seal_hash| async move {
608 let (seal_sender, seal_receiver) = oneshot::channel::<OwnedBlockHeaderSeal>();
609
610 if let Err(error) =
611 block_sealing_notification_sender.try_send(BlockSealNotification {
612 pre_seal_hash,
613 public_key_hash,
614 seal_sender,
615 })
616 {
617 warn!(%error, "Failed to send block sealing notification");
618 }
619
620 seal_receiver.await.ok()
621 }
622 };
623
624 let parent_block_root = *parent_header.root();
625
626 let block_builder_result = match self
629 .block_builder
630 .build(
631 &parent_block_root,
632 parent_header,
633 parent_block_details,
634 &claimed_slot.consensus_info,
635 &claimed_slot.checkpoints,
636 seal_block,
637 )
638 .send()
639 .await
640 {
641 Ok(block_builder_result) => block_builder_result,
642 Err(error) => {
643 error!(%slot, %parent_block_root, %error, "Failed to build a block");
644 return None;
645 }
646 };
647
648 let header = block_builder_result.block.header().header();
649 info!(
650 number = %header.prefix.number,
651 root = %&*header.root(),
652 "🔖 Built new block",
653 );
654
655 Some(block_builder_result)
656 }
657}