ab_client_proof_of_time/
source.rs

1pub mod block_import;
2pub mod gossip;
3pub mod state;
4pub mod timekeeper;
5
6use crate::PotNextSlotInput;
7use crate::source::block_import::BestBlockPotInfo;
8use crate::source::gossip::{GossipProof, ToGossipMessage};
9use crate::source::state::{PotState, PotStateSetOutcome};
10use crate::source::timekeeper::TimekeeperProof;
11use crate::verifier::PotVerifier;
12use ab_client_api::ChainSyncStatus;
13use ab_core_primitives::block::BlockNumber;
14use ab_core_primitives::block::header::BeaconChainHeader;
15use ab_core_primitives::pot::{PotCheckpoints, PotParametersChange, SlotNumber};
16use derive_more::{Deref, DerefMut};
17use futures::channel::mpsc;
18use futures::{FutureExt, StreamExt, select};
19use rclite::Arc;
20use std::future;
21use tokio::sync::broadcast;
22use tracing::{debug, trace, warn};
23
/// Capacity of the broadcast channel through which [`PotSlotInfo`] notifications are delivered to
/// [`PotSlotInfoStream`] subscribers
const SLOTS_CHANNEL_CAPACITY: usize = 10;
25
26/// Initialize [`PotState`] using the best available beacon chain header
27pub fn init_pot_state(
28    best_beacon_chain_header: &BeaconChainHeader<'_>,
29    pot_verifier: PotVerifier,
30    block_authoring_delay: SlotNumber,
31) -> PotState {
32    let block_number = best_beacon_chain_header.prefix.number;
33    let consensus_info = best_beacon_chain_header.consensus_info;
34    let consensus_parameters = best_beacon_chain_header.consensus_parameters();
35
36    let parent_slot = if block_number == BlockNumber::ZERO {
37        SlotNumber::ZERO
38    } else {
39        // The best one seen
40        consensus_info.slot + block_authoring_delay
41    };
42
43    let maybe_next_parameters_change = consensus_parameters
44        .pot_parameters_change
45        .copied()
46        .map(PotParametersChange::from);
47
48    let pot_input = if block_number == BlockNumber::ZERO {
49        PotNextSlotInput {
50            slot: parent_slot + SlotNumber::ONE,
51            slot_iterations: consensus_parameters.fixed_parameters.slot_iterations,
52            seed: pot_verifier.genesis_seed(),
53        }
54    } else {
55        PotNextSlotInput::derive(
56            consensus_parameters.fixed_parameters.slot_iterations,
57            parent_slot,
58            consensus_info.future_proof_of_time,
59            &maybe_next_parameters_change,
60        )
61    };
62
63    PotState::new(pot_input, maybe_next_parameters_change, pot_verifier)
64}
65
/// Proof of time slot information.
///
/// This is the item type delivered through [`PotSlotInfoStream`].
#[derive(Debug, Copy, Clone)]
pub struct PotSlotInfo {
    /// Slot number
    pub slot: SlotNumber,
    /// Proof of time checkpoints
    pub checkpoints: PotCheckpoints,
}
74
/// Stream with proof of time slots.
///
/// Thin wrapper around a [`broadcast::Receiver`] of [`PotSlotInfo`]; the receiver's own methods
/// are reachable through the derived `Deref`/`DerefMut` implementations.
#[derive(Debug, Deref, DerefMut)]
pub struct PotSlotInfoStream(broadcast::Receiver<PotSlotInfo>);
78
79impl Clone for PotSlotInfoStream {
80    #[inline]
81    fn clone(&self) -> Self {
82        Self(self.0.resubscribe())
83    }
84}
85
86impl PotSlotInfoStream {
87    fn new(receiver: broadcast::Receiver<PotSlotInfo>) -> Self {
88        Self(receiver)
89    }
90}
91
/// Worker producing proofs of time.
///
/// Depending on configuration may produce proofs of time locally, send/receive via gossip and keep
/// up to date with blockchain reorgs.
#[derive(Debug)]
#[must_use = "Proof of time source doesn't do anything unless run() method is called"]
pub struct PotSourceWorker<CSS> {
    /// Queried before handling a timekeeper proof; proofs are ignored while it reports syncing
    chain_sync_status: CSS,
    /// Incoming timekeeper proofs; when `None`, the timekeeper branch of `run()` never resolves
    timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
    /// Outgoing messages for gossip (proofs and next slot inputs), sent with `try_send()`
    to_gossip_sender: mpsc::Sender<ToGossipMessage>,
    /// Incoming proofs received via gossip
    from_gossip_receiver: mpsc::Receiver<GossipProof>,
    /// Incoming proof of time information about the best block
    best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
    /// Highest slot already sent through `slot_sender`; only strictly larger slots are sent
    last_slot_sent: SlotNumber,
    /// Broadcast side of the channel backing [`PotSlotInfoStream`]
    slot_sender: broadcast::Sender<PotSlotInfo>,
    /// Shared proof of time state, extended by gossip proofs and updated from best block info
    pot_state: Arc<PotState>,
}
108
109impl<CSS> PotSourceWorker<CSS>
110where
111    CSS: ChainSyncStatus,
112{
113    pub fn new(
114        timekeeper_proof_receiver: Option<mpsc::Receiver<TimekeeperProof>>,
115        to_gossip_sender: mpsc::Sender<ToGossipMessage>,
116        from_gossip_receiver: mpsc::Receiver<GossipProof>,
117        best_block_pot_info_receiver: mpsc::Receiver<BestBlockPotInfo>,
118        chain_sync_status: CSS,
119        pot_state: Arc<PotState>,
120    ) -> (Self, PotSlotInfoStream) {
121        let (slot_sender, slot_receiver) = broadcast::channel(SLOTS_CHANNEL_CAPACITY);
122
123        let source_worker = Self {
124            chain_sync_status,
125            timekeeper_proof_receiver,
126            to_gossip_sender,
127            from_gossip_receiver,
128            best_block_pot_info_receiver,
129            last_slot_sent: SlotNumber::ZERO,
130            slot_sender,
131            pot_state,
132        };
133
134        let pot_slot_info_stream = PotSlotInfoStream::new(slot_receiver);
135
136        (source_worker, pot_slot_info_stream)
137    }
138
139    /// Run proof of time source
140    pub async fn run(mut self) {
141        loop {
142            let timekeeper_proof = async {
143                if let Some(timekeeper_proof_receiver) = &mut self.timekeeper_proof_receiver {
144                    timekeeper_proof_receiver.next().await
145                } else {
146                    future::pending().await
147                }
148            };
149
150            select! {
151                maybe_timekeeper_proof = timekeeper_proof.fuse() => {
152                    if let Some(timekeeper_proof) = maybe_timekeeper_proof {
153                        self.handle_timekeeper_proof(timekeeper_proof);
154                    } else {
155                        debug!("Timekeeper proof stream ended, exiting");
156                        return;
157                    }
158                }
159                maybe_gossip_proof = self.from_gossip_receiver.next() => {
160                    if let Some(gossip_proof) = maybe_gossip_proof {
161                        self.handle_gossip_proof(gossip_proof);
162                    } else {
163                        debug!("Incoming gossip messages stream ended, exiting");
164                        return;
165                    }
166                }
167                maybe_best_block_pot_info = self.best_block_pot_info_receiver.next() => {
168                    if let Some(best_block_pot_info) = maybe_best_block_pot_info {
169                        self.handle_best_block_pot_info(best_block_pot_info);
170                    } else {
171                        debug!("Import notifications stream ended, exiting");
172                        return;
173                    }
174                }
175            }
176        }
177    }
178
179    fn handle_timekeeper_proof(&mut self, proof: TimekeeperProof) {
180        let TimekeeperProof {
181            slot,
182            seed,
183            slot_iterations,
184            checkpoints,
185        } = proof;
186
187        if self.chain_sync_status.is_syncing() {
188            trace!(
189                ?slot,
190                %seed,
191                %slot_iterations,
192                output = %checkpoints.output(),
193                "Ignore timekeeper proof due to major syncing",
194            );
195
196            return;
197        }
198
199        debug!(
200            ?slot,
201            %seed,
202            %slot_iterations,
203            output = %checkpoints.output(),
204            "Received timekeeper proof",
205        );
206
207        if self
208            .to_gossip_sender
209            .try_send(ToGossipMessage::Proof(GossipProof {
210                slot,
211                seed,
212                slot_iterations,
213                checkpoints,
214            }))
215            .is_err()
216        {
217            debug!(
218                %slot,
219                "Gossip is not able to keep-up with slot production (timekeeper)",
220            );
221        }
222
223        if slot > self.last_slot_sent {
224            self.last_slot_sent = slot;
225
226            // We don't care if block production is too slow or block production is not enabled on this
227            // node at all
228            let _ = self.slot_sender.send(PotSlotInfo { slot, checkpoints });
229        }
230    }
231
232    // TODO: Follow both verified and unverified checkpoints to start secondary timekeeper ASAP in
233    //  case verification succeeds
234    fn handle_gossip_proof(&mut self, proof: GossipProof) {
235        let expected_next_slot_input = PotNextSlotInput {
236            slot: proof.slot,
237            slot_iterations: proof.slot_iterations,
238            seed: proof.seed,
239        };
240
241        if let Ok(next_slot_input) = self.pot_state.try_extend(
242            expected_next_slot_input,
243            proof.slot,
244            proof.checkpoints.output(),
245            None,
246        ) {
247            if proof.slot > self.last_slot_sent {
248                self.last_slot_sent = proof.slot;
249
250                // We don't care if block production is too slow or block production is not enabled on
251                // this node at all
252                let _ = self.slot_sender.send(PotSlotInfo {
253                    slot: proof.slot,
254                    checkpoints: proof.checkpoints,
255                });
256            }
257
258            if self
259                .to_gossip_sender
260                .try_send(ToGossipMessage::NextSlotInput(next_slot_input))
261                .is_err()
262            {
263                debug!(
264                    slot = %proof.slot,
265                    next_slot = %next_slot_input.slot,
266                    "Gossip is not able to keep-up with slot production (gossip)",
267                );
268            }
269        }
270    }
271
272    fn handle_best_block_pot_info(&mut self, best_block_pot_info: BestBlockPotInfo) {
273        // This will do one of 3 things depending on circumstances:
274        // * if block import is ahead of timekeeper and gossip, it will update next slot input
275        // * if block import is on a different PoT chain, it will update next slot input to the
276        //   correct fork (reorg)
277        // * if block import is on the same PoT chain this will essentially do nothing
278        match self.pot_state.set_known_good_output(
279            best_block_pot_info.slot,
280            best_block_pot_info.pot_output,
281            best_block_pot_info.pot_parameters_change,
282        ) {
283            PotStateSetOutcome::NoChange => {
284                trace!(
285                    slot = %best_block_pot_info.slot,
286                    "Block import didn't result in proof of time chain changes",
287                );
288            }
289            PotStateSetOutcome::Extension { from, to } => {
290                warn!(
291                    from_next_slot = %from.slot,
292                    to_next_slot = %to.slot,
293                    "Proof of time chain was extended from block import",
294                );
295
296                if self
297                    .to_gossip_sender
298                    .try_send(ToGossipMessage::NextSlotInput(to))
299                    .is_err()
300                {
301                    debug!(
302                        next_slot = %to.slot,
303                        "Gossip is not able to keep-up with slot production (block import)",
304                    );
305                }
306            }
307            PotStateSetOutcome::Reorg { from, to } => {
308                warn!(
309                    from_next_slot = %from.slot,
310                    to_next_slot = %to.slot,
311                    "Proof of time chain reorg happened",
312                );
313
314                if self
315                    .to_gossip_sender
316                    .try_send(ToGossipMessage::NextSlotInput(to))
317                    .is_err()
318                {
319                    debug!(
320                        next_slot = %to.slot,
321                        "Gossip is not able to keep-up with slot production (block import)",
322                    );
323                }
324            }
325        }
326    }
327}