1pub mod block_import;
2pub mod gossip;
3pub mod state;
4pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use crate::verifier::PotVerifier;
12use ab_client_api::ChainSyncStatus;
13use ab_core_primitives::block::BlockNumber;
14use ab_core_primitives::block::header::BeaconChainHeader;
15use ab_core_primitives::pot::{PotCheckpoints, PotParametersChange, SlotNumber};
16use derive_more::{Deref, DerefMut};
17use futures::channel::mpsc;
18use futures::{FutureExt, StreamExt, select};
19use rclite::Arc;
20use std::future;
21use tokio::sync::broadcast;
22use tracing::{debug, trace, warn};
23
/// Capacity of the broadcast channel over which [`PotSlotInfo`] notifications are delivered to
/// [`PotSlotInfoStream`] subscribers.
const SLOTS_CHANNEL_CAPACITY: usize = 10;
25
26pub fn init_pot_state(
28 best_beacon_chain_header: &BeaconChainHeader<'_>,
29 pot_verifier: PotVerifier,
30 block_authoring_delay: SlotNumber,
31) -> PotState {
32 let block_number = best_beacon_chain_header.prefix.number;
33 let consensus_info = best_beacon_chain_header.consensus_info;
34 let consensus_parameters = best_beacon_chain_header.consensus_parameters();
35
36 let parent_slot = if block_number == BlockNumber::ZERO {
37 SlotNumber::ZERO
38 } else {
39 consensus_info.slot + block_authoring_delay
41 };
42
43 let maybe_next_parameters_change = consensus_parameters
44 .pot_parameters_change
45 .copied()
46 .map(PotParametersChange::from);
47
48 let pot_input = if block_number == BlockNumber::ZERO {
49 PotNextSlotInput {
50 slot: parent_slot + SlotNumber::ONE,
51 slot_iterations: consensus_parameters.fixed_parameters.slot_iterations,
52 seed: pot_verifier.genesis_seed(),
53 }
54 } else {
55 PotNextSlotInput::derive(
56 consensus_parameters.fixed_parameters.slot_iterations,
57 parent_slot,
58 consensus_info.future_proof_of_time,
59 &maybe_next_parameters_change,
60 )
61 };
62
63 PotState::new(pot_input, maybe_next_parameters_change, pot_verifier)
64}
65
66#[derive(Debug, Copy, Clone)]
68pub struct PotSlotInfo {
69 pub slot: SlotNumber,
71 pub checkpoints: PotCheckpoints,
73}
74
75#[derive(Debug, Deref, DerefMut)]
77pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
78
79impl Clone for PotSlotInfoStream {
80 #[inline]
81 fn clone(&self) -> Self {
82 Self(self.0.resubscribe())
83 }
84}
85
86#[derive(Debug)]
91#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
92pub struct PotSourceWorker<CSS> {
93 chain_sync_status: CSS,
94 timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
95 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
96 from_gossip_receiver: mpsc::Receiver<GossipProof>,
97 best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
98 last_slot_sent: SlotNumber,
99 slot_sender: broadcast::Sender<PotSlotInfo>,
100 pot_state: Arc<PotState>,
101}
102
103impl<CSS> PotSourceWorker<CSS>
104where
105 CSS: ChainSyncStatus,
106{
107 pub fn new(
108 timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
109 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
110 from_gossip_receiver: mpsc::Receiver<GossipProof>,
111 best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
112 chain_sync_status: CSS,
113 pot_state: Arc<PotState>,
114 ) -> (Self, PotSlotInfoStream) {
115 let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
116
117 let source_worker = Self {
118 chain_sync_status,
119 timekeeper_proof_receiver,
120 to_gossip_sender,
121 from_gossip_receiver,
122 best_block_pot_info_receiver,
123 last_slot_sent: SlotNumber::ZERO,
124 slot_sender,
125 pot_state,
126 };
127
128 let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
129
130 (source_worker, pot_slot_info_stream)
131 }
132
133 pub async fn run(mut self) {
135 loop {
136 let timekeeper_proof = async {
137 if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
138 timekeeper_proof_receiver.next().await
139 } else {
140 future::pending().await
141 }
142 };
143
144 select! {
145 maybe_timekeeper_proof = timekeeper_proof.fuse() => {
146 if let Some(timekeeper_proof) = maybe_timekeeper_proof {
147 self.handle_timekeeper_proof(timekeeper_proof);
148 } else {
149 debug!("Timekeeper proof stream ended, exiting");
150 return;
151 }
152 }
153 maybe_gossip_proof = self.from_gossip_receiver.next() => {
154 if let Some(gossip_proof) = maybe_gossip_proof {
155 self.handle_gossip_proof(gossip_proof);
156 } else {
157 debug!("Incoming gossip messages stream ended, exiting");
158 return;
159 }
160 }
161 maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
162 if let Some(best_block_pot_info) = maybe_best_block_pot_info {
163 self.handle_best_block_pot_info(best_block_pot_info);
164 } else {
165 debug!("Import notifications stream ended, exiting");
166 return;
167 }
168 }
169 }
170 }
171 }
172
173 fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
174 let TimekeeperProof {
175 slot,
176 seed,
177 slot_iterations,
178 checkpoints,
179 } = proof;
180
181 if self.chain_sync_status.is_syncing() {
182 trace!(
183 ?slot,
184 %seed,
185 %slot_iterations,
186 output = %checkpoints.output(),
187 "Ignore timekeeper proof due to major syncing",
188 );
189
190 return;
191 }
192
193 debug!(
194 ?slot,
195 %seed,
196 %slot_iterations,
197 output = %checkpoints.output(),
198 "Received timekeeper proof",
199 );
200
201 if self
202 .to_gossip_sender
203 .try_send(ToGossipMessage::Proof(GossipProof {
204 slot,
205 seed,
206 slot_iterations,
207 checkpoints,
208 }))
209 .is_err()
210 {
211 debug!(
212 %slot,
213 "Gossip is not able to keep-up with slot production (timekeeper)",
214 );
215 }
216
217 if slot > self.last_slot_sent {
218 self.last_slot_sent = slot;
219
220 let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
223 }
224 }
225
226 fn handle_gossip_proof(&mut self, proof: GossipProof) {
229 let expected_next_slot_input = PotNextSlotInput {
230 slot: proof.slot,
231 slot_iterations: proof.slot_iterations,
232 seed: proof.seed,
233 };
234
235 if let Ok(next_slot_input) = self.pot_state.try_extend(
236 expected_next_slot_input,
237 proof.slot,
238 proof.checkpoints.output(),
239 None,
240 ) {
241 if proof.slot > self.last_slot_sent {
242 self.last_slot_sent = proof.slot;
243
244 let _ = self.slot_sender.send(PotSlotInfo {
247 slot: proof.slot,
248 checkpoints: proof.checkpoints,
249 });
250 }
251
252 if self
253 .to_gossip_sender
254 .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
255 .is_err()
256 {
257 debug!(
258 slot = %proof.slot,
259 next_slot = %next_slot_input.slot,
260 "Gossip is not able to keep-up with slot production (gossip)",
261 );
262 }
263 }
264 }
265
266 fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
267 match self.pot_state.set_known_good_output(
273 best_block_pot_info.slot,
274 best_block_pot_info.pot_output,
275 best_block_pot_info.pot_parameters_change,
276 ) {
277 PotStateSetOutcome::NoChange => {
278 trace!(
279 slot = %best_block_pot_info.slot,
280 "Block import didn't result in proof of time chain changes",
281 );
282 }
283 PotStateSetOutcome::Extension { from, to } => {
284 warn!(
285 from_next_slot = %from.slot,
286 to_next_slot = %to.slot,
287 "Proof of time chain was extended from block import",
288 );
289
290 if self
291 .to_gossip_sender
292 .try_send(ToGossipMessage::NextSlotInput(to))
293 .is_err()
294 {
295 debug!(
296 next_slot = %to.slot,
297 "Gossip is not able to keep-up with slot production (block import)",
298 );
299 }
300 }
301 PotStateSetOutcome::Reorg { from, to } => {
302 warn!(
303 from_next_slot = %from.slot,
304 to_next_slot = %to.slot,
305 "Proof of time chain reorg happened",
306 );
307
308 if self
309 .to_gossip_sender
310 .try_send(ToGossipMessage::NextSlotInput(to))
311 .is_err()
312 {
313 debug!(
314 next_slot = %to.slot,
315 "Gossip is not able to keep-up with slot production (block import)",
316 );
317 }
318 }
319 }
320 }
321}