ab_transaction_pool/
lib.rs

1use ab_core_primitives::block::{BlockNumber, BlockRoot};
2use ab_core_primitives::transaction::TransactionHash;
3use ab_core_primitives::transaction::owned::OwnedTransaction;
4use std::collections::{HashMap, HashSet, VecDeque};
5use std::num::{NonZeroU8, NonZeroU64, NonZeroUsize};
6
/// Transaction pool limits.
///
/// Both limits are enforced by [`TransactionPool::add()`]: a transaction is rejected when
/// accepting it would exceed either of them.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct TransactionPoolLimits {
    /// Maximum number of transactions stored in the pool at the same time
    pub count: NonZeroUsize,
    /// Maximum total size of all transactions stored in the pool, measured as the sum of the
    /// lengths of transaction buffers
    pub size: NonZeroUsize,
}
15
16#[derive(Debug)]
17pub struct TransactionAuthorizedDetails {
18    /// Block number at which transaction was authorized
19    pub block_number: BlockNumber,
20    /// Block root at which transaction was authorized
21    pub block_root: BlockRoot,
22}
23
24#[derive(Debug)]
25pub enum TransactionState {
26    New,
27    Authorized {
28        at: VecDeque<TransactionAuthorizedDetails>,
29    },
30}
31
32#[derive(Debug)]
33#[non_exhaustive]
34pub struct PoolTransaction {
35    pub tx: OwnedTransaction,
36    pub state: TransactionState,
37    // TODO: Slots, other things?
38}
39
40/// Error for [`TransactionPool::add()`] method
41#[derive(Debug, thiserror::Error)]
42pub enum TransactionAddError {
43    /// Already exists
44    #[error("Already exists")]
45    AlreadyExists,
46    /// The block isn't found, possibly too old
47    #[error("The block isn't found, possibly too old")]
48    BlockNotFound,
49    /// Too many transactions
50    #[error("Too many transactions")]
51    TooManyTransactions,
52    /// Total size too large
53    #[error("Total size too large")]
54    TotalSizeTooLarge,
55}
56
57#[derive(Debug)]
58struct BlockRootDetails {
59    txs: HashSet<TransactionHash>,
60}
61
62// TODO: Some integration or tracking of slots, including optimization where only changes to read
63//  slots impact transaction authorization
64/// Transaction pool implementation.
65///
66/// The goal of the transaction pool is to retain a set of transactions and associated authorization
67/// information, which can be used for block production and propagation through the network.
68#[derive(Debug)]
69pub struct TransactionPool {
70    transactions: HashMap<TransactionHash, PoolTransaction>,
71    total_size: usize,
72    /// Map from block root at which transactions were created to a set of transaction hashes
73    by_block_root: HashMap<BlockRoot, BlockRootDetails>,
74    // TODO: Optimize with an oldest block + `Vec<BlockHash>` instead
75    by_block_number: HashMap<BlockNumber, BlockRoot>,
76    pruning_depth: NonZeroU64,
77    authorization_history_depth: NonZeroU8,
78    limits: TransactionPoolLimits,
79}
80
81impl TransactionPool {
82    /// Create a new instance.
83    ///
84    /// `pruning_depth` defines how old (in blocks) should transaction be before it is automatically
85    /// removed from the transaction pool.
86    ///
87    /// `authorization_history_depth` defines a small number of recent blocks for which
88    /// authorization information is retained in each block.
89    ///
90    /// `limits` defines the limits of transaction pool.
91    pub fn new(
92        pruning_depth: NonZeroU64,
93        authorization_history_depth: NonZeroU8,
94        limits: TransactionPoolLimits,
95    ) -> Self {
96        Self {
97            transactions: HashMap::default(),
98            total_size: 0,
99            by_block_root: HashMap::default(),
100            by_block_number: HashMap::default(),
101            pruning_depth,
102            authorization_history_depth,
103            limits,
104        }
105    }
106
107    /// Add new transaction to the pool
108    pub fn add(
109        &mut self,
110        tx_hash: TransactionHash,
111        tx: OwnedTransaction,
112    ) -> Result<(), TransactionAddError> {
113        if self.contains(&tx_hash) {
114            return Err(TransactionAddError::AlreadyExists);
115        }
116
117        let block_root = tx.transaction().header.block_root;
118        let Some(block_txs) = self.by_block_root.get_mut(&block_root) else {
119            return Err(TransactionAddError::BlockNotFound);
120        };
121
122        if self.transactions.len() == self.limits.count.get() {
123            return Err(TransactionAddError::TooManyTransactions);
124        }
125
126        let tx_size = tx.buffer().len() as usize;
127        if self.limits.size.get() - self.total_size < tx_size {
128            return Err(TransactionAddError::TotalSizeTooLarge);
129        }
130
131        self.total_size += tx_size;
132        self.transactions.insert(
133            tx_hash,
134            PoolTransaction {
135                tx,
136                state: TransactionState::New,
137            },
138        );
139        block_txs.txs.insert(tx_hash);
140
141        Ok(())
142    }
143
144    /// Mark transaction as authorized as of a specific block.
145    ///
146    /// Returns `false` if transaction is unknown.
147    pub fn mark_authorized(
148        &mut self,
149        tx_hash: &TransactionHash,
150        block_number: BlockNumber,
151        block_root: BlockRoot,
152    ) -> bool {
153        let Some(tx) = self.transactions.get_mut(tx_hash) else {
154            return false;
155        };
156
157        let authorized_details = TransactionAuthorizedDetails {
158            block_number,
159            block_root,
160        };
161
162        match &mut tx.state {
163            TransactionState::New => {
164                tx.state = TransactionState::Authorized {
165                    at: VecDeque::from([authorized_details]),
166                };
167            }
168            TransactionState::Authorized { at } => {
169                if at.len() == usize::from(self.authorization_history_depth.get()) {
170                    at.pop_back();
171                }
172                at.push_front(authorized_details);
173            }
174        }
175
176        true
177    }
178
179    /// Whether transaction pool contains a transaction
180    pub fn contains(&self, tx_hash: &TransactionHash) -> bool {
181        self.transactions.contains_key(tx_hash)
182    }
183
184    /// Get iterator over all transactions
185    pub fn iter(
186        &self,
187    ) -> impl ExactSizeIterator<Item = (&'_ TransactionHash, &'_ PoolTransaction)> + '_ {
188        self.transactions.iter()
189    }
190
191    /// Remove transactions from the pool
192    pub fn remove<'a, Txs>(&mut self, tx_hashes: Txs)
193    where
194        Txs: Iterator<Item = &'a TransactionHash>,
195    {
196        for tx_hash in tx_hashes {
197            self.remove_single_tx(tx_hash)
198        }
199    }
200
201    fn remove_single_tx(&mut self, tx_hash: &TransactionHash) {
202        if let Some(tx) = self.transactions.remove(tx_hash) {
203            self.total_size -= tx.tx.buffer().len() as usize;
204
205            let block_root = &tx.tx.transaction().header.block_root;
206            if let Some(set) = self.by_block_root.get_mut(block_root) {
207                set.txs.remove(tx_hash);
208                if set.txs.is_empty() {
209                    self.by_block_root.remove(block_root);
210                }
211            }
212        }
213    }
214
215    /// Add the new best block.
216    ///
217    /// If there is already an existing block with the same or higher block number, it will be
218    /// removed alongside all transactions. Blocks older than configured pruning depth will be
219    /// removed automatically as well.
220    ///
221    /// This allows accepting transactions created at specified block root.
222    pub fn add_best_block(&mut self, block_number: BlockNumber, block_root: BlockRoot) {
223        // Clean up old blocks or blocks that are at the same or higher block number
224        let allowed_blocks =
225            block_number.saturating_sub(BlockNumber::new(self.pruning_depth.get()))..block_number;
226        self.by_block_number
227            .retain(|existing_block_number, existing_block_root| {
228                if allowed_blocks.contains(existing_block_number) {
229                    return true;
230                }
231
232                if let Some(tx_hashes) = self.by_block_root.remove(existing_block_root) {
233                    for tx_hash in tx_hashes.txs {
234                        if let Some(tx) = self.transactions.remove(&tx_hash) {
235                            self.total_size -= tx.tx.buffer().len() as usize;
236                        }
237                    }
238                }
239                false
240            });
241
242        for tx in self.transactions.values_mut() {
243            if let TransactionState::Authorized { at } = &mut tx.state {
244                // Clean up verification status for blocks at the same or higher block number
245                while at
246                    .pop_front_if(|details| details.block_number >= block_number)
247                    .is_some()
248                {}
249            }
250        }
251
252        self.by_block_number.insert(block_number, block_root);
253        self.by_block_root.insert(
254            block_root,
255            BlockRootDetails {
256                txs: HashSet::new(),
257            },
258        );
259    }
260}