ab_client_proof_of_time/
source.rs

1pub mod block_import;
2pub mod gossip;
3pub mod state;
4pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use ab_client_api::ChainSyncStatus;
12use ab_core_primitives::pot::{PotCheckpoints, SlotNumber};
13use derive_more::{Deref, DerefMut};
14use futures::channel::mpsc;
15use futures::{FutureExt, StreamExt, select};
16use std::future;
17use std::sync::Arc;
18use tokio::sync::broadcast;
19use tracing::{debug, trace, warn};
20
21const SLOTS_CHANNEL_CAPACITY: usize = 10;
22
23/// Proof of time slot information
24#[derive(Debug, Copy, Clone)]
25pub struct PotSlotInfo {
26    /// Slot number
27    pub slot: SlotNumber,
28    /// Proof of time checkpoints
29    pub checkpoints: PotCheckpoints,
30}
31
32/// Stream with proof of time slots
33#[derive(Debug, Deref, DerefMut)]
34pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
35
36impl Clone for PotSlotInfoStream {
37    #[inline]
38    fn clone(&self) -> Self {
39        Self(self.0.resubscribe())
40    }
41}
42
43/// Worker producing proofs of time.
44///
45/// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep
46/// up to day with blockchain reorgs.
47#[derive(Debug)]
48#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
49pub struct PotSourceWorker<CSS> {
50    chain_sync_status: CSS,
51    timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
52    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
53    from_gossip_receiver: mpsc::Receiver<GossipProof>,
54    best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
55    last_slot_sent: SlotNumber,
56    slot_sender: broadcast::Sender<PotSlotInfo>,
57    pot_state: Arc<PotState>,
58}
59
60impl<CSS> PotSourceWorker<CSS>
61where
62    CSS: ChainSyncStatus,
63{
64    pub fn new(
65        timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
66        to_gossip_sender: mpsc::Sender<ToGossipMessage>,
67        from_gossip_receiver: mpsc::Receiver<GossipProof>,
68        best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
69        chain_sync_status: CSS,
70        pot_state: Arc<PotState>,
71    ) -> (Self, PotSlotInfoStream) {
72        let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
73
74        let source_worker = Self {
75            chain_sync_status,
76            timekeeper_proof_receiver,
77            to_gossip_sender,
78            from_gossip_receiver,
79            best_block_pot_info_receiver,
80            last_slot_sent: SlotNumber::ZERO,
81            slot_sender,
82            pot_state,
83        };
84
85        let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
86
87        (source_worker, pot_slot_info_stream)
88    }
89
90    /// Run proof of time source
91    pub async fn run(mut self) {
92        loop {
93            let timekeeper_proof = async {
94                if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
95                    timekeeper_proof_receiver.next().await
96                } else {
97                    future::pending().await
98                }
99            };
100
101            select! {
102                maybe_timekeeper_proof = timekeeper_proof.fuse() => {
103                    if let Some(timekeeper_proof) = maybe_timekeeper_proof {
104                        self.handle_timekeeper_proof(timekeeper_proof);
105                    } else {
106                        debug!("Timekeeper proof stream ended, exiting");
107                        return;
108                    }
109                }
110                maybe_gossip_proof = self.from_gossip_receiver.next() => {
111                    if let Some(gossip_proof) = maybe_gossip_proof {
112                        self.handle_gossip_proof(gossip_proof);
113                    } else {
114                        debug!("Incoming gossip messages stream ended, exiting");
115                        return;
116                    }
117                }
118                maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
119                    if let Some(best_block_pot_info) = maybe_best_block_pot_info {
120                        self.handle_best_block_pot_info(best_block_pot_info);
121                    } else {
122                        debug!("Import notifications stream ended, exiting");
123                        return;
124                    }
125                }
126            }
127        }
128    }
129
130    fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
131        let TimekeeperProof {
132            slot,
133            seed,
134            slot_iterations,
135            checkpoints,
136        } = proof;
137
138        if self.chain_sync_status.is_syncing() {
139            trace!(
140                ?slot,
141                %seed,
142                %slot_iterations,
143                output = %checkpoints.output(),
144                "Ignore timekeeper proof due to major syncing",
145            );
146
147            return;
148        }
149
150        debug!(
151            ?slot,
152            %seed,
153            %slot_iterations,
154            output = %checkpoints.output(),
155            "Received timekeeper proof",
156        );
157
158        if self
159            .to_gossip_sender
160            .try_send(ToGossipMessage::Proof(GossipProof {
161                slot,
162                seed,
163                slot_iterations,
164                checkpoints,
165            }))
166            .is_err()
167        {
168            debug!(
169                %slot,
170                "Gossip is not able to keep-up with slot production (timekeeper)",
171            );
172        }
173
174        if slot > self.last_slot_sent {
175            self.last_slot_sent = slot;
176
177            // We don't care if block production is too slow or block production is not enabled on this
178            // node at all
179            let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
180        }
181    }
182
183    // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in
184    //  case verification succeeds
185    fn handle_gossip_proof(&mut self, proof: GossipProof) {
186        let expected_next_slot_input = PotNextSlotInput {
187            slot: proof.slot,
188            slot_iterations: proof.slot_iterations,
189            seed: proof.seed,
190        };
191
192        if let Ok(next_slot_input) = self.pot_state.try_extend(
193            expected_next_slot_input,
194            proof.slot,
195            proof.checkpoints.output(),
196            None,
197        ) {
198            if proof.slot > self.last_slot_sent {
199                self.last_slot_sent = proof.slot;
200
201                // We don't care if block production is too slow or block production is not enabled on
202                // this node at all
203                let _ = self.slot_sender.send(PotSlotInfo {
204                    slot: proof.slot,
205                    checkpoints: proof.checkpoints,
206                });
207            }
208
209            if self
210                .to_gossip_sender
211                .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
212                .is_err()
213            {
214                debug!(
215                    slot = %proof.slot,
216                    next_slot = %next_slot_input.slot,
217                    "Gossip is not able to keep-up with slot production (gossip)",
218                );
219            }
220        }
221    }
222
223    fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
224        // This will do one of 3 things depending on circumstances:
225        // * if block import is ahead of timekeeper and gossip, it will update next slot input
226        // * if block import is on a different PoT chain, it will update next slot input to the
227        //   correct fork (reorg)
228        // * if block import is on the same PoT chain this will essentially do nothing
229        match self.pot_state.set_known_good_output(
230            best_block_pot_info.slot,
231            best_block_pot_info.pot_output,
232            best_block_pot_info.pot_parameters_change,
233        ) {
234            PotStateSetOutcome::NoChange => {
235                trace!(
236                    slot = %best_block_pot_info.slot,
237                    "Block import didn't result in proof of time chain changes",
238                );
239            }
240            PotStateSetOutcome::Extension { from, to } => {
241                warn!(
242                    from_next_slot = %from.slot,
243                    to_next_slot = %to.slot,
244                    "Proof of time chain was extended from block import",
245                );
246
247                if self
248                    .to_gossip_sender
249                    .try_send(ToGossipMessage::NextSlotInput(to))
250                    .is_err()
251                {
252                    debug!(
253                        next_slot = %to.slot,
254                        "Gossip is not able to keep-up with slot production (block import)",
255                    );
256                }
257            }
258            PotStateSetOutcome::Reorg { from, to } => {
259                warn!(
260                    from_next_slot = %from.slot,
261                    to_next_slot = %to.slot,
262                    "Proof of time chain reorg happened",
263                );
264
265                if self
266                    .to_gossip_sender
267                    .try_send(ToGossipMessage::NextSlotInput(to))
268                    .is_err()
269                {
270                    debug!(
271                        next_slot = %to.slot,
272                        "Gossip is not able to keep-up with slot production (block import)",
273                    );
274                }
275            }
276        }
277    }
278}