ab_client_proof_of_time/
source.rs

1pub mod block_import;
2pub mod gossip;
3pub mod state;
4pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use crate::verifier::PotVerifier;
12use ab_client_api::ChainSyncStatus;
13use ab_core_primitives::block::BlockNumber;
14use ab_core_primitives::block::header::BeaconChainHeader;
15use ab_core_primitives::pot::{PotCheckpoints, PotParametersChange, SlotNumber};
16use derive_more::{Deref, DerefMut};
17use futures::channel::mpsc;
18use futures::{FutureExt, StreamExt, select};
19use rclite::Arc;
20use std::future;
21use tokio::sync::broadcast;
22use tracing::{debug, trace, warn};
23
24const SLOTS_CHANNEL_CAPACITY: usize = 10;
25
26/// Initialize [`PotState`] using the best available beacon chain header
27pub fn init_pot_state(
28    best_beacon_chain_header: &BeaconChainHeader<'_>,
29    pot_verifier: PotVerifier,
30    block_authoring_delay: SlotNumber,
31) -> PotState {
32    let block_number = best_beacon_chain_header.prefix.number;
33    let consensus_info = best_beacon_chain_header.consensus_info;
34    let consensus_parameters = best_beacon_chain_header.consensus_parameters();
35
36    let parent_slot = if block_number == BlockNumber::ZERO {
37        SlotNumber::ZERO
38    } else {
39        // The best one seen
40        consensus_info.slot + block_authoring_delay
41    };
42
43    let maybe_next_parameters_change = consensus_parameters
44        .pot_parameters_change
45        .copied()
46        .map(PotParametersChange::from);
47
48    let pot_input = if block_number == BlockNumber::ZERO {
49        PotNextSlotInput {
50            slot: parent_slot + SlotNumber::ONE,
51            slot_iterations: consensus_parameters.fixed_parameters.slot_iterations,
52            seed: pot_verifier.genesis_seed(),
53        }
54    } else {
55        PotNextSlotInput::derive(
56            consensus_parameters.fixed_parameters.slot_iterations,
57            parent_slot,
58            consensus_info.future_proof_of_time,
59            &maybe_next_parameters_change,
60        )
61    };
62
63    PotState::new(pot_input, maybe_next_parameters_change, pot_verifier)
64}
65
66/// Proof of time slot information
67#[derive(Debug, Copy, Clone)]
68pub struct PotSlotInfo {
69    /// Slot number
70    pub slot: SlotNumber,
71    /// Proof of time checkpoints
72    pub checkpoints: PotCheckpoints,
73}
74
75/// Stream with proof of time slots
76#[derive(Debug, Deref, DerefMut)]
77pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
78
79impl Clone for PotSlotInfoStream {
80    #[inline]
81    fn clone(&self) -> Self {
82        Self(self.0.resubscribe())
83    }
84}
85
86/// Worker producing proofs of time.
87///
88/// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep
89/// up to day with blockchain reorgs.
90#[derive(Debug)]
91#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
92pub struct PotSourceWorker<CSS> {
93    chain_sync_status: CSS,
94    timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
95    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
96    from_gossip_receiver: mpsc::Receiver<GossipProof>,
97    best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
98    last_slot_sent: SlotNumber,
99    slot_sender: broadcast::Sender<PotSlotInfo>,
100    pot_state: Arc<PotState>,
101}
102
103impl<CSS> PotSourceWorker<CSS>
104where
105    CSS: ChainSyncStatus,
106{
107    pub fn new(
108        timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
109        to_gossip_sender: mpsc::Sender<ToGossipMessage>,
110        from_gossip_receiver: mpsc::Receiver<GossipProof>,
111        best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
112        chain_sync_status: CSS,
113        pot_state: Arc<PotState>,
114    ) -> (Self, PotSlotInfoStream) {
115        let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
116
117        let source_worker = Self {
118            chain_sync_status,
119            timekeeper_proof_receiver,
120            to_gossip_sender,
121            from_gossip_receiver,
122            best_block_pot_info_receiver,
123            last_slot_sent: SlotNumber::ZERO,
124            slot_sender,
125            pot_state,
126        };
127
128        let pot_slot_info_stream = PotSlotInfoStream(slot_receiver);
129
130        (source_worker, pot_slot_info_stream)
131    }
132
133    /// Run proof of time source
134    pub async fn run(mut self) {
135        loop {
136            let timekeeper_proof = async {
137                if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
138                    timekeeper_proof_receiver.next().await
139                } else {
140                    future::pending().await
141                }
142            };
143
144            select! {
145                maybe_timekeeper_proof = timekeeper_proof.fuse() => {
146                    if let Some(timekeeper_proof) = maybe_timekeeper_proof {
147                        self.handle_timekeeper_proof(timekeeper_proof);
148                    } else {
149                        debug!("Timekeeper proof stream ended, exiting");
150                        return;
151                    }
152                }
153                maybe_gossip_proof = self.from_gossip_receiver.next() => {
154                    if let Some(gossip_proof) = maybe_gossip_proof {
155                        self.handle_gossip_proof(gossip_proof);
156                    } else {
157                        debug!("Incoming gossip messages stream ended, exiting");
158                        return;
159                    }
160                }
161                maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
162                    if let Some(best_block_pot_info) = maybe_best_block_pot_info {
163                        self.handle_best_block_pot_info(best_block_pot_info);
164                    } else {
165                        debug!("Import notifications stream ended, exiting");
166                        return;
167                    }
168                }
169            }
170        }
171    }
172
173    fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
174        let TimekeeperProof {
175            slot,
176            seed,
177            slot_iterations,
178            checkpoints,
179        } = proof;
180
181        if self.chain_sync_status.is_syncing() {
182            trace!(
183                ?slot,
184                %seed,
185                %slot_iterations,
186                output = %checkpoints.output(),
187                "Ignore timekeeper proof due to major syncing",
188            );
189
190            return;
191        }
192
193        debug!(
194            ?slot,
195            %seed,
196            %slot_iterations,
197            output = %checkpoints.output(),
198            "Received timekeeper proof",
199        );
200
201        if self
202            .to_gossip_sender
203            .try_send(ToGossipMessage::Proof(GossipProof {
204                slot,
205                seed,
206                slot_iterations,
207                checkpoints,
208            }))
209            .is_err()
210        {
211            debug!(
212                %slot,
213                "Gossip is not able to keep-up with slot production (timekeeper)",
214            );
215        }
216
217        if slot > self.last_slot_sent {
218            self.last_slot_sent = slot;
219
220            // We don't care if block production is too slow or block production is not enabled on this
221            // node at all
222            let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
223        }
224    }
225
226    // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in
227    //  case verification succeeds
228    fn handle_gossip_proof(&mut self, proof: GossipProof) {
229        let expected_next_slot_input = PotNextSlotInput {
230            slot: proof.slot,
231            slot_iterations: proof.slot_iterations,
232            seed: proof.seed,
233        };
234
235        if let Ok(next_slot_input) = self.pot_state.try_extend(
236            expected_next_slot_input,
237            proof.slot,
238            proof.checkpoints.output(),
239            None,
240        ) {
241            if proof.slot > self.last_slot_sent {
242                self.last_slot_sent = proof.slot;
243
244                // We don't care if block production is too slow or block production is not enabled on
245                // this node at all
246                let _ = self.slot_sender.send(PotSlotInfo {
247                    slot: proof.slot,
248                    checkpoints: proof.checkpoints,
249                });
250            }
251
252            if self
253                .to_gossip_sender
254                .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
255                .is_err()
256            {
257                debug!(
258                    slot = %proof.slot,
259                    next_slot = %next_slot_input.slot,
260                    "Gossip is not able to keep-up with slot production (gossip)",
261                );
262            }
263        }
264    }
265
266    fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
267        // This will do one of 3 things depending on circumstances:
268        // * if block import is ahead of timekeeper and gossip, it will update next slot input
269        // * if block import is on a different PoT chain, it will update next slot input to the
270        //   correct fork (reorg)
271        // * if block import is on the same PoT chain this will essentially do nothing
272        match self.pot_state.set_known_good_output(
273            best_block_pot_info.slot,
274            best_block_pot_info.pot_output,
275            best_block_pot_info.pot_parameters_change,
276        ) {
277            PotStateSetOutcome::NoChange => {
278                trace!(
279                    slot = %best_block_pot_info.slot,
280                    "Block import didn't result in proof of time chain changes",
281                );
282            }
283            PotStateSetOutcome::Extension { from, to } => {
284                warn!(
285                    from_next_slot = %from.slot,
286                    to_next_slot = %to.slot,
287                    "Proof of time chain was extended from block import",
288                );
289
290                if self
291                    .to_gossip_sender
292                    .try_send(ToGossipMessage::NextSlotInput(to))
293                    .is_err()
294                {
295                    debug!(
296                        next_slot = %to.slot,
297                        "Gossip is not able to keep-up with slot production (block import)",
298                    );
299                }
300            }
301            PotStateSetOutcome::Reorg { from, to } => {
302                warn!(
303                    from_next_slot = %from.slot,
304                    to_next_slot = %to.slot,
305                    "Proof of time chain reorg happened",
306                );
307
308                if self
309                    .to_gossip_sender
310                    .try_send(ToGossipMessage::NextSlotInput(to))
311                    .is_err()
312                {
313                    debug!(
314                        next_slot = %to.slot,
315                        "Gossip is not able to keep-up with slot production (block import)",
316                    );
317                }
318            }
319        }
320    }
321}