ab_transaction_pool/
lib.rs

1#![feature(vec_deque_pop_if)]
2
3use ab_contracts_common::block::{BlockHash, BlockNumber};
4use ab_transaction::TransactionHash;
5use ab_transaction::owned::OwnedTransaction;
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::num::{NonZeroU8, NonZeroU64, NonZeroUsize};
8
/// Upper bounds enforced by [`TransactionPool::add()`] when accepting new transactions.
#[derive(Debug, Copy, Clone)]
pub struct TransactionPoolLimits {
    /// Maximum number of transactions the pool holds at any one time
    pub count: NonZeroUsize,
    /// Maximum combined size of all pooled transactions, measured as the sum of the lengths of
    /// their underlying buffers
    pub size: NonZeroUsize,
}
17
18#[derive(Debug)]
19pub struct TransactionAuthorizedDetails {
20    /// Block number at which transaction was authorized
21    pub block_number: BlockNumber,
22    /// Block hash at which transaction was authorized
23    pub block_hash: BlockHash,
24}
25
26#[derive(Debug)]
27pub enum TransactionState {
28    New,
29    Authorized {
30        at: VecDeque<TransactionAuthorizedDetails>,
31    },
32}
33
34#[derive(Debug)]
35#[non_exhaustive]
36pub struct PoolTransaction {
37    pub tx: OwnedTransaction,
38    pub state: TransactionState,
39    // TODO: Slots, other things?
40}
41
42/// Error for [`TransactionPool::add()`] method
43#[derive(Debug, thiserror::Error)]
44pub enum TransactionAddError {
45    /// Already exists
46    #[error("Already exists")]
47    AlreadyExists,
48    /// The block isn't found, possibly too old
49    #[error("The block isn't found, possibly too old")]
50    BlockNotFound,
51    /// Too many transactions
52    #[error("Too many transactions")]
53    TooManyTransactions,
54    /// Total size too large
55    #[error("Total size too large")]
56    TotalSizeTooLarge,
57}
58
59#[derive(Debug)]
60struct BlockHashDetails {
61    txs: HashSet<TransactionHash>,
62}
63
64// TODO: Some integration or tracking of slots, including optimization where only changes to read
65//  slots impact transaction authorization
66/// Transaction pool implementation.
67///
68/// The goal of the transaction pool is to retain a set of transactions and associated authorization
69/// information, which can be used for block production and propagation through the network.
70#[derive(Debug)]
71pub struct TransactionPool {
72    transactions: HashMap<TransactionHash, PoolTransaction>,
73    total_size: usize,
74    /// Map from block hash at which transactions were created to a set of transaction hashes
75    by_block_hash: HashMap<BlockHash, BlockHashDetails>,
76    // TODO: Optimize with an oldest block + `Vec<BlockHash>` instead
77    by_block_number: HashMap<BlockNumber, BlockHash>,
78    pruning_depth: NonZeroU64,
79    authorization_history_depth: NonZeroU8,
80    limits: TransactionPoolLimits,
81}
82
83impl TransactionPool {
84    /// Create new instance.
85    ///
86    /// `pruning_depth` defines how old (in blocks) should transaction be before it is automatically
87    /// removed from the transaction pool.
88    ///
89    /// `authorization_history_depth` defines a small number of recent blocks for which
90    /// authorization information is retained in each block.
91    ///
92    /// `limits` defines the limits of transaction pool.
93    pub fn new(
94        pruning_depth: NonZeroU64,
95        authorization_history_depth: NonZeroU8,
96        limits: TransactionPoolLimits,
97    ) -> Self {
98        Self {
99            transactions: HashMap::default(),
100            total_size: 0,
101            by_block_hash: HashMap::default(),
102            by_block_number: HashMap::default(),
103            pruning_depth,
104            authorization_history_depth,
105            limits,
106        }
107    }
108
109    /// Add new transaction to the pool
110    pub fn add(
111        &mut self,
112        tx_hash: TransactionHash,
113        tx: OwnedTransaction,
114    ) -> Result<(), TransactionAddError> {
115        if self.contains(&tx_hash) {
116            return Err(TransactionAddError::AlreadyExists);
117        }
118
119        let block_hash = tx.transaction().header.block_hash;
120        let Some(block_txs) = self.by_block_hash.get_mut(&block_hash) else {
121            return Err(TransactionAddError::BlockNotFound);
122        };
123
124        if self.transactions.len() == self.limits.count.get() {
125            return Err(TransactionAddError::TooManyTransactions);
126        }
127
128        let tx_size = tx.buffer().len() as usize;
129        if self.limits.size.get() - self.total_size < tx_size {
130            return Err(TransactionAddError::TotalSizeTooLarge);
131        }
132
133        self.total_size += tx_size;
134        self.transactions.insert(
135            tx_hash,
136            PoolTransaction {
137                tx,
138                state: TransactionState::New,
139            },
140        );
141        block_txs.txs.insert(tx_hash);
142
143        Ok(())
144    }
145
146    /// Mark transaction as authorized as of a specific block.
147    ///
148    /// Returns `false` if transaction is unknown.
149    pub fn mark_authorized(
150        &mut self,
151        tx_hash: &TransactionHash,
152        block_number: BlockNumber,
153        block_hash: BlockHash,
154    ) -> bool {
155        let Some(tx) = self.transactions.get_mut(tx_hash) else {
156            return false;
157        };
158
159        let authorized_details = TransactionAuthorizedDetails {
160            block_number,
161            block_hash,
162        };
163
164        match &mut tx.state {
165            TransactionState::New => {
166                tx.state = TransactionState::Authorized {
167                    at: VecDeque::from([authorized_details]),
168                };
169            }
170            TransactionState::Authorized { at } => {
171                if at.len() == usize::from(self.authorization_history_depth.get()) {
172                    at.pop_back();
173                }
174                at.push_front(authorized_details);
175            }
176        }
177
178        true
179    }
180
181    /// Whether transaction pool contains a transaction
182    pub fn contains(&self, tx_hash: &TransactionHash) -> bool {
183        self.transactions.contains_key(tx_hash)
184    }
185
186    /// Get iterator over all transactions
187    pub fn iter(
188        &self,
189    ) -> impl ExactSizeIterator<Item = (&'_ TransactionHash, &'_ PoolTransaction)> + '_ {
190        self.transactions.iter()
191    }
192
193    /// Remove transactions from the pool
194    pub fn remove<'a, Txs>(&mut self, tx_hashes: Txs)
195    where
196        Txs: Iterator<Item = &'a TransactionHash>,
197    {
198        for tx_hash in tx_hashes {
199            self.remove_single_tx(tx_hash)
200        }
201    }
202
203    fn remove_single_tx(&mut self, tx_hash: &TransactionHash) {
204        if let Some(tx) = self.transactions.remove(tx_hash) {
205            self.total_size -= tx.tx.buffer().len() as usize;
206
207            let block_hash = &tx.tx.transaction().header.block_hash;
208            if let Some(set) = self.by_block_hash.get_mut(block_hash) {
209                set.txs.remove(tx_hash);
210                if set.txs.is_empty() {
211                    self.by_block_hash.remove(block_hash);
212                }
213            }
214        }
215    }
216
217    /// Add the new best block.
218    ///
219    /// If there is already an existing block with the same or higher block number, it will be
220    /// removed alongside all transactions. Blocks older than configured pruning depth will be
221    /// removed automatically as well.
222    ///
223    /// This allows accepting transactions created at specified block hash.
224    pub fn add_best_block(&mut self, block_number: BlockNumber, block_hash: BlockHash) {
225        // Clean up old blocks or blocks that are at the same or higher block number
226        let allowed_blocks = block_number.saturating_sub(self.pruning_depth.get())..block_number;
227        self.by_block_number
228            .retain(|existing_block_number, existing_block_hash| {
229                if allowed_blocks.contains(existing_block_number) {
230                    return true;
231                }
232
233                if let Some(tx_hashes) = self.by_block_hash.remove(existing_block_hash) {
234                    for tx_hash in tx_hashes.txs {
235                        if let Some(tx) = self.transactions.remove(&tx_hash) {
236                            self.total_size -= tx.tx.buffer().len() as usize;
237                        }
238                    }
239                }
240                false
241            });
242
243        for tx in self.transactions.values_mut() {
244            if let TransactionState::Authorized { at } = &mut tx.state {
245                // Clean up verification status for blocks at the same or higher block number
246                while at
247                    .pop_front_if(|details| details.block_number >= block_number)
248                    .is_some()
249                {}
250            }
251        }
252
253        self.by_block_number.insert(block_number, block_hash);
254        self.by_block_hash.insert(
255            block_hash,
256            BlockHashDetails {
257                txs: HashSet::new(),
258            },
259        );
260    }
261}