1pub mod block_import;
2pub mod gossip;
3pub mod state;
4pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use crate::verifier::PotVerifier;
12use ab_client_api::ChainSyncStatus;
13use ab_core_primitives::block::BlockNumber;
14use ab_core_primitives::block::header::BeaconChainHeader;
15use ab_core_primitives::pot::{PotCheckpoints, PotParametersChange, SlotNumber};
16use derive_more::{Deref, DerefMut};
17use futures::channel::mpsc;
18use futures::{FutureExt, StreamExt, select};
19use rclite::Arc;
20use std::future;
21use tokio::sync::broadcast;
22use tracing::{debug, trace, warn};
23
24const SLOTS_CHANNEL_CAPACITY: usize = 10;
25
26pub fn init_pot_state(
28 best_beacon_chain_header: &BeaconChainHeader<'_>,
29 pot_verifier: PotVerifier,
30 block_authoring_delay: SlotNumber,
31) -> PotState {
32 let block_number = best_beacon_chain_header.prefix.number;
33 let consensus_info = best_beacon_chain_header.consensus_info;
34 let consensus_parameters = best_beacon_chain_header.consensus_parameters();
35
36 let parent_slot = if block_number == BlockNumber::ZERO {
37 SlotNumber::ZERO
38 } else {
39 consensus_info.slot + block_authoring_delay
41 };
42
43 let maybe_next_parameters_change = consensus_parameters
44 .pot_parameters_change
45 .copied()
46 .map(PotParametersChange::from);
47
48 let pot_input = if block_number == BlockNumber::ZERO {
49 PotNextSlotInput {
50 slot: parent_slot + SlotNumber::ONE,
51 slot_iterations: consensus_parameters.fixed_parameters.slot_iterations,
52 seed: pot_verifier.genesis_seed(),
53 }
54 } else {
55 PotNextSlotInput::derive(
56 consensus_parameters.fixed_parameters.slot_iterations,
57 parent_slot,
58 consensus_info.future_proof_of_time,
59 &maybe_next_parameters_change,
60 )
61 };
62
63 PotState::new(pot_input, maybe_next_parameters_change, pot_verifier)
64}
65
66#[derive(Debug, Copy, Clone)]
68pub struct PotSlotInfo {
69 pub slot: SlotNumber,
71 pub checkpoints: PotCheckpoints,
73}
74
75#[derive(Debug, Deref, DerefMut)]
77pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
78
79impl Clone for PotSlotInfoStream {
80 #[inline]
81 fn clone(&self) -> Self {
82 Self(self.0.resubscribe())
83 }
84}
85
86impl PotSlotInfoStream {
87 fn new(receiver: broadcast::Receiver<PotSlotInfo>) -> Self {
88 Self(receiver)
89 }
90}
91
92#[derive(Debug)]
97#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
98pub struct PotSourceWorker<CSS> {
99 chain_sync_status: CSS,
100 timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
101 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
102 from_gossip_receiver: mpsc::Receiver<GossipProof>,
103 best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
104 last_slot_sent: SlotNumber,
105 slot_sender: broadcast::Sender<PotSlotInfo>,
106 pot_state: Arc<PotState>,
107}
108
109impl<CSS> PotSourceWorker<CSS>
110where
111 CSS: ChainSyncStatus,
112{
113 pub fn new(
114 timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
115 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
116 from_gossip_receiver: mpsc::Receiver<GossipProof>,
117 best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
118 chain_sync_status: CSS,
119 pot_state: Arc<PotState>,
120 ) -> (Self, PotSlotInfoStream) {
121 let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
122
123 let source_worker = Self {
124 chain_sync_status,
125 timekeeper_proof_receiver,
126 to_gossip_sender,
127 from_gossip_receiver,
128 best_block_pot_info_receiver,
129 last_slot_sent: SlotNumber::ZERO,
130 slot_sender,
131 pot_state,
132 };
133
134 let pot_slot_info_stream = PotSlotInfoStream::new(slot_receiver);
135
136 (source_worker, pot_slot_info_stream)
137 }
138
139 pub async fn run(mut self) {
141 loop {
142 let timekeeper_proof = async {
143 if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
144 timekeeper_proof_receiver.next().await
145 } else {
146 future::pending().await
147 }
148 };
149
150 select! {
151 maybe_timekeeper_proof = timekeeper_proof.fuse() => {
152 if let Some(timekeeper_proof) = maybe_timekeeper_proof {
153 self.handle_timekeeper_proof(timekeeper_proof);
154 } else {
155 debug!("Timekeeper proof stream ended, exiting");
156 return;
157 }
158 }
159 maybe_gossip_proof = self.from_gossip_receiver.next() => {
160 if let Some(gossip_proof) = maybe_gossip_proof {
161 self.handle_gossip_proof(gossip_proof);
162 } else {
163 debug!("Incoming gossip messages stream ended, exiting");
164 return;
165 }
166 }
167 maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
168 if let Some(best_block_pot_info) = maybe_best_block_pot_info {
169 self.handle_best_block_pot_info(best_block_pot_info);
170 } else {
171 debug!("Import notifications stream ended, exiting");
172 return;
173 }
174 }
175 }
176 }
177 }
178
179 fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
180 let TimekeeperProof {
181 slot,
182 seed,
183 slot_iterations,
184 checkpoints,
185 } = proof;
186
187 if self.chain_sync_status.is_syncing() {
188 trace!(
189 ?slot,
190 %seed,
191 %slot_iterations,
192 output = %checkpoints.output(),
193 "Ignore timekeeper proof due to major syncing",
194 );
195
196 return;
197 }
198
199 debug!(
200 ?slot,
201 %seed,
202 %slot_iterations,
203 output = %checkpoints.output(),
204 "Received timekeeper proof",
205 );
206
207 if self
208 .to_gossip_sender
209 .try_send(ToGossipMessage::Proof(GossipProof {
210 slot,
211 seed,
212 slot_iterations,
213 checkpoints,
214 }))
215 .is_err()
216 {
217 debug!(
218 %slot,
219 "Gossip is not able to keep-up with slot production (timekeeper)",
220 );
221 }
222
223 if slot > self.last_slot_sent {
224 self.last_slot_sent = slot;
225
226 let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
229 }
230 }
231
232 fn handle_gossip_proof(&mut self, proof: GossipProof) {
235 let expected_next_slot_input = PotNextSlotInput {
236 slot: proof.slot,
237 slot_iterations: proof.slot_iterations,
238 seed: proof.seed,
239 };
240
241 if let Ok(next_slot_input) = self.pot_state.try_extend(
242 expected_next_slot_input,
243 proof.slot,
244 proof.checkpoints.output(),
245 None,
246 ) {
247 if proof.slot > self.last_slot_sent {
248 self.last_slot_sent = proof.slot;
249
250 let _ = self.slot_sender.send(PotSlotInfo {
253 slot: proof.slot,
254 checkpoints: proof.checkpoints,
255 });
256 }
257
258 if self
259 .to_gossip_sender
260 .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
261 .is_err()
262 {
263 debug!(
264 slot = %proof.slot,
265 next_slot = %next_slot_input.slot,
266 "Gossip is not able to keep-up with slot production (gossip)",
267 );
268 }
269 }
270 }
271
272 fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
273 match self.pot_state.set_known_good_output(
279 best_block_pot_info.slot,
280 best_block_pot_info.pot_output,
281 best_block_pot_info.pot_parameters_change,
282 ) {
283 PotStateSetOutcome::NoChange => {
284 trace!(
285 slot = %best_block_pot_info.slot,
286 "Block import didn't result in proof of time chain changes",
287 );
288 }
289 PotStateSetOutcome::Extension { from, to } => {
290 warn!(
291 from_next_slot = %from.slot,
292 to_next_slot = %to.slot,
293 "Proof of time chain was extended from block import",
294 );
295
296 if self
297 .to_gossip_sender
298 .try_send(ToGossipMessage::NextSlotInput(to))
299 .is_err()
300 {
301 debug!(
302 next_slot = %to.slot,
303 "Gossip is not able to keep-up with slot production (block import)",
304 );
305 }
306 }
307 PotStateSetOutcome::Reorg { from, to } => {
308 warn!(
309 from_next_slot = %from.slot,
310 to_next_slot = %to.slot,
311 "Proof of time chain reorg happened",
312 );
313
314 if self
315 .to_gossip_sender
316 .try_send(ToGossipMessage::NextSlotInput(to))
317 .is_err()
318 {
319 debug!(
320 next_slot = %to.slot,
321 "Gossip is not able to keep-up with slot production (block import)",
322 );
323 }
324 }
325 }
326 }
327}