// Exports `BestBlockPotInfo`, which the worker in this file receives over a channel.
pub mod block_import;
// Exports `GossipProof` (incoming proofs) and `ToGossipMessage` (outgoing messages).
pub mod gossip;
// Exports `PotState` and `PotStateSetOutcome`, the proof-of-time state shared via `Arc`.
pub mod state;
// Exports `TimekeeperProof`, the proofs the worker receives from a timekeeper (when present).
pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use ab_client_api::ChainSyncStatus;
12use ab_core_primitives::pot::{PotCheckpoints, SlotNumber};
13use derive_more::{Deref, DerefMut};
14use futures::channel::mpsc;
15use futures::{FutureExt, StreamExt, select};
16use std::future;
17use std::sync::Arc;
18use tokio::sync::broadcast;
19use tracing::{debug, trace, warn};
20
/// Capacity passed to `broadcast::channel` for the channel that carries [`PotSlotInfo`] from
/// [`PotSourceWorker`] to [`PotSlotInfoStream`] subscribers.
///
/// Per `tokio::sync::broadcast` semantics, a subscriber that falls further behind than the
/// channel retains will skip the oldest slots rather than block the sender.
const SLOTS_CHANNEL_CAPACITY: usize = 10;
22
/// A slot together with its proof-of-time checkpoints, as broadcast by [`PotSourceWorker`].
#[derive(Debug, Copy, Clone)]
pub struct PotSlotInfo {
    /// Number of the slot this information belongs to.
    pub slot: SlotNumber,
    /// Checkpoints taken from the proof (timekeeper or gossip) that announced `slot`.
    pub checkpoints: PotCheckpoints,
}
31
/// Receiving side of the slot broadcast channel created by [`PotSourceWorker::new`].
///
/// This is a newtype around `broadcast::Receiver<PotSlotInfo>`; the `Deref`/`DerefMut` derives
/// expose the receiver's own methods directly on the wrapper.
#[derive(Debug, Deref, DerefMut)]
pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
35
36impl Clone for PotSlotInfoStream {
37 #[inline]
38 fn clone(&self) -> Self {
39 Self(self.0.resubscribe())
40 }
41}
42
/// Worker that reacts to timekeeper proofs, gossip proofs and best-block proof-of-time updates.
///
/// New slots are broadcast as [`PotSlotInfo`]; proofs and next-slot inputs are forwarded to
/// gossip on a best-effort basis. The worker does nothing until [`PotSourceWorker::run`] is
/// awaited.
#[derive(Debug)]
#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
pub struct PotSourceWorker<CSS> {
    /// Consulted before handling a timekeeper proof; proofs are ignored while it reports syncing.
    chain_sync_status: CSS,
    /// Proofs from a timekeeper; `None` means this input never yields anything.
    timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
    /// Outgoing messages (proofs and next slot inputs) for gossip, sent with `try_send`.
    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
    /// Proofs received from gossip.
    from_gossip_receiver: mpsc::Receiver<GossipProof>,
    /// Proof-of-time information about the best block (logged as "import notifications").
    best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
    /// Highest slot already broadcast through `slot_sender`; only greater slots are sent.
    last_slot_sent: SlotNumber,
    /// Broadcast sender whose receivers are handed out as [`PotSlotInfoStream`].
    slot_sender: broadcast::Sender<PotSlotInfo>,
    /// Shared proof-of-time state, extended by gossip proofs and updated from best blocks.
    pot_state: Arc<PotState>,
}
59
60impl<CSS> PotSourceWorker<CSS>
61where
62 CSS: ChainSyncStatus,
63{
64 pub fn new(
65 timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
66 to_gossip_sender: mpsc::Sender<ToGossipMessage>,
67 from_gossip_receiver: mpsc::Receiver<GossipProof>,
68 best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
69 chain_sync_status: CSS,
70 pot_state: Arc<PotState>,
71 ) -> (Self, PotSlotInfoStream) {
72 let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
73
74 let source_worker = Self {
75 chain_sync_status,
76 timekeeper_proof_receiver,
77 to_gossip_sender,
78 from_gossip_receiver,
79 best_block_pot_info_receiver,
80 last_slot_sent: SlotNumber::ZERO,
81 slot_sender,
82 pot_state,
83 };
84
85 let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
86
87 (source_worker, pot_slot_info_stream)
88 }
89
90 pub async fn run(mut self) {
92 loop {
93 let timekeeper_proof = async {
94 if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
95 timekeeper_proof_receiver.next().await
96 } else {
97 future::pending().await
98 }
99 };
100
101 select! {
102 maybe_timekeeper_proof = timekeeper_proof.fuse() => {
103 if let Some(timekeeper_proof) = maybe_timekeeper_proof {
104 self.handle_timekeeper_proof(timekeeper_proof);
105 } else {
106 debug!("Timekeeper proof stream ended, exiting");
107 return;
108 }
109 }
110 maybe_gossip_proof = self.from_gossip_receiver.next() => {
111 if let Some(gossip_proof) = maybe_gossip_proof {
112 self.handle_gossip_proof(gossip_proof);
113 } else {
114 debug!("Incoming gossip messages stream ended, exiting");
115 return;
116 }
117 }
118 maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
119 if let Some(best_block_pot_info) = maybe_best_block_pot_info {
120 self.handle_best_block_pot_info(best_block_pot_info);
121 } else {
122 debug!("Import notifications stream ended, exiting");
123 return;
124 }
125 }
126 }
127 }
128 }
129
130 fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
131 let TimekeeperProof {
132 slot,
133 seed,
134 slot_iterations,
135 checkpoints,
136 } = proof;
137
138 if self.chain_sync_status.is_syncing() {
139 trace!(
140 ?slot,
141 %seed,
142 %slot_iterations,
143 output = %checkpoints.output(),
144 "Ignore timekeeper proof due to major syncing",
145 );
146
147 return;
148 }
149
150 debug!(
151 ?slot,
152 %seed,
153 %slot_iterations,
154 output = %checkpoints.output(),
155 "Received timekeeper proof",
156 );
157
158 if self
159 .to_gossip_sender
160 .try_send(ToGossipMessage::Proof(GossipProof {
161 slot,
162 seed,
163 slot_iterations,
164 checkpoints,
165 }))
166 .is_err()
167 {
168 debug!(
169 %slot,
170 "Gossip is not able to keep-up with slot production (timekeeper)",
171 );
172 }
173
174 if slot > self.last_slot_sent {
175 self.last_slot_sent = slot;
176
177 let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
180 }
181 }
182
183 fn handle_gossip_proof(&mut self, proof: GossipProof) {
186 let expected_next_slot_input = PotNextSlotInput {
187 slot: proof.slot,
188 slot_iterations: proof.slot_iterations,
189 seed: proof.seed,
190 };
191
192 if let Ok(next_slot_input) = self.pot_state.try_extend(
193 expected_next_slot_input,
194 proof.slot,
195 proof.checkpoints.output(),
196 None,
197 ) {
198 if proof.slot > self.last_slot_sent {
199 self.last_slot_sent = proof.slot;
200
201 let _ = self.slot_sender.send(PotSlotInfo {
204 slot: proof.slot,
205 checkpoints: proof.checkpoints,
206 });
207 }
208
209 if self
210 .to_gossip_sender
211 .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
212 .is_err()
213 {
214 debug!(
215 slot = %proof.slot,
216 next_slot = %next_slot_input.slot,
217 "Gossip is not able to keep-up with slot production (gossip)",
218 );
219 }
220 }
221 }
222
223 fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
224 match self.pot_state.set_known_good_output(
230 best_block_pot_info.slot,
231 best_block_pot_info.pot_output,
232 best_block_pot_info.pot_parameters_change,
233 ) {
234 PotStateSetOutcome::NoChange => {
235 trace!(
236 slot = %best_block_pot_info.slot,
237 "Block import didn't result in proof of time chain changes",
238 );
239 }
240 PotStateSetOutcome::Extension { from, to } => {
241 warn!(
242 from_next_slot = %from.slot,
243 to_next_slot = %to.slot,
244 "Proof of time chain was extended from block import",
245 );
246
247 if self
248 .to_gossip_sender
249 .try_send(ToGossipMessage::NextSlotInput(to))
250 .is_err()
251 {
252 debug!(
253 next_slot = %to.slot,
254 "Gossip is not able to keep-up with slot production (block import)",
255 );
256 }
257 }
258 PotStateSetOutcome::Reorg { from, to } => {
259 warn!(
260 from_next_slot = %from.slot,
261 to_next_slot = %to.slot,
262 "Proof of time chain reorg happened",
263 );
264
265 if self
266 .to_gossip_sender
267 .try_send(ToGossipMessage::NextSlotInput(to))
268 .is_err()
269 {
270 debug!(
271 next_slot = %to.slot,
272 "Gossip is not able to keep-up with slot production (block import)",
273 );
274 }
275 }
276 }
277 }
278}