ab_client_proof_of_time/source/
timekeeper.rs

1use crate::source::state::PotState;
2use crate::verifier::PotVerifier;
3use ab_core_primitives::pot::{PotCheckpoints, PotSeed, SlotDuration, SlotNumber};
4use ab_proof_of_time::PotError;
5use futures::SinkExt;
6use futures::channel::mpsc;
7use futures::executor::block_on;
8use rclite::Arc;
9use std::num::NonZeroU32;
10use std::thread::sleep;
11use std::time::Instant;
12use tracing::{debug, trace};
13
14/// Poof generated by timekeeper
15#[derive(Debug, Copy, Clone)]
16pub struct TimekeeperProof {
17    /// Slot number
18    pub slot: SlotNumber,
19    /// Proof of time seed
20    pub seed: PotSeed,
21    /// Iterations per slot
22    pub slot_iterations: NonZeroU32,
23    /// Proof of time checkpoints
24    pub checkpoints: PotCheckpoints,
25}
26
27/// Timekeeper source
28#[derive(Debug)]
29#[must_use = "Doesn't do anything unless run() method is called"]
30pub struct Timekeeper {
31    state: Arc<PotState>,
32    pot_verifier: PotVerifier,
33    proof_sender: mpsc::Sender<TimekeeperProof>,
34    slot_duration: SlotDuration,
35}
36
37impl Timekeeper {
38    /// Create a new timekeeper source
39    pub fn new(
40        state: Arc<PotState>,
41        pot_verifier: PotVerifier,
42        slot_duration: SlotDuration,
43    ) -> (Self, mpsc::Receiver<TimekeeperProof>) {
44        let (proof_sender, proof_receiver) = mpsc::channel(1);
45
46        (
47            Self {
48                state,
49                pot_verifier,
50                proof_sender,
51                slot_duration,
52            },
53            proof_receiver,
54        )
55    }
56
57    /// Run timekeeper until receiver returned from constructor is dropped.
58    ///
59    /// Must be running on a dedicated high-frequency CPU core.
60    pub fn run(self) -> Result<(), PotError> {
61        let Self {
62            state,
63            pot_verifier,
64            mut proof_sender,
65            slot_duration,
66        } = self;
67
68        Self::try_debug_run(&state, &pot_verifier, &mut proof_sender, slot_duration)?;
69
70        let mut next_slot_input = state.next_slot_input();
71
72        loop {
73            trace!(
74                "Proving for slot {} with {} iterations",
75                next_slot_input.slot, next_slot_input.slot_iterations
76            );
77            let checkpoints =
78                ab_proof_of_time::prove(next_slot_input.seed, next_slot_input.slot_iterations)?;
79
80            let proof = TimekeeperProof {
81                seed: next_slot_input.seed,
82                slot_iterations: next_slot_input.slot_iterations,
83                slot: next_slot_input.slot,
84                checkpoints,
85            };
86
87            pot_verifier.inject_verified_checkpoints(
88                next_slot_input.seed,
89                next_slot_input.slot_iterations,
90                checkpoints,
91            );
92
93            next_slot_input = state
94                .try_extend(
95                    next_slot_input,
96                    next_slot_input.slot,
97                    checkpoints.output(),
98                    None,
99                )
100                .unwrap_or_else(|next_slot_input| next_slot_input);
101
102            if let Err(error) = proof_sender.try_send(proof)
103                && let Err(error) = block_on(proof_sender.send(error.into_inner()))
104            {
105                debug!(%error, "Couldn't send proof, the channel is closed");
106                return Ok(());
107            }
108        }
109    }
110
111    /// A special version of [`Self::run()`] designed for debug purposes.
112    ///
113    /// It is used when the specified number of iterations is way too low for slot duration, in
114    /// which case artificial delay is applied to maintain slot duration at approximately specified
115    /// rate. This allows debugging things without burning a full CPU core at all times.
116    #[inline(never)]
117    fn try_debug_run(
118        state: &Arc<PotState>,
119        pot_verifier: &PotVerifier,
120        proof_sender: &mut mpsc::Sender<TimekeeperProof>,
121        slot_duration: SlotDuration,
122    ) -> Result<(), PotError> {
123        let mut next_slot_input = state.next_slot_input();
124
125        let delay = {
126            let start = Instant::now();
127            ab_proof_of_time::prove(next_slot_input.seed, next_slot_input.slot_iterations)?;
128            let duration = start.elapsed();
129
130            if duration.is_zero() || duration > slot_duration.as_duration() / 2 {
131                // Either can't identify or relatively fast
132                return Ok(());
133            }
134
135            slot_duration.as_duration() - duration
136        };
137
138        loop {
139            trace!(
140                "Proving for slot {} with {} iterations",
141                next_slot_input.slot, next_slot_input.slot_iterations
142            );
143            let checkpoints =
144                ab_proof_of_time::prove(next_slot_input.seed, next_slot_input.slot_iterations)?;
145
146            let proof = TimekeeperProof {
147                seed: next_slot_input.seed,
148                slot_iterations: next_slot_input.slot_iterations,
149                slot: next_slot_input.slot,
150                checkpoints,
151            };
152
153            pot_verifier.inject_verified_checkpoints(
154                next_slot_input.seed,
155                next_slot_input.slot_iterations,
156                checkpoints,
157            );
158
159            next_slot_input = state
160                .try_extend(
161                    next_slot_input,
162                    next_slot_input.slot,
163                    checkpoints.output(),
164                    None,
165                )
166                .unwrap_or_else(|next_slot_input| next_slot_input);
167
168            if let Err(error) = proof_sender.try_send(proof)
169                && let Err(error) = block_on(proof_sender.send(error.into_inner()))
170            {
171                debug!(%error, "Couldn't send proof, the channel is closed");
172                return Ok(());
173            }
174
175            // Artificial delay
176            sleep(delay);
177        }
178    }
179}