// ab_client_archiving/segment_headers_store.rs

use ab_core_primitives::block::BlockNumber;
use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
use parity_scale_codec::Encode;
use parking_lot::RwLock;
use std::hint::black_box;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use tracing::debug;
9
/// Errors that can occur while adding segment headers to the store.
#[derive(Debug, thiserror::Error)]
pub enum SegmentHeaderStoreError {
    /// A segment header's index was not exactly one greater than the last
    /// stored segment index, so the contiguous sequence would be broken.
    #[error(
        "Segment index {segment_index} must strictly follow last segment index \
        {last_segment_index}, can't store segment header"
    )]
    MustFollowLastSegmentIndex {
        /// Segment index that was rejected.
        segment_index: SegmentIndex,
        /// Highest segment index currently stored.
        last_segment_index: SegmentIndex,
    },
    /// The very first segment header added to an empty store must have
    /// segment index zero.
    #[error("First segment index must be zero, found {segment_index}")]
    FirstSegmentIndexZero {
        /// Non-zero segment index that was rejected.
        segment_index: SegmentIndex,
    },
}
31
/// Shared state behind [`SegmentHeadersStore`]; held in an `Arc` so that
/// clones of the store observe the same data.
#[derive(Debug)]
struct SegmentHeadersStoreInner {
    // Sequential index of the next persistence key to allocate (see
    // `SegmentHeadersStore::key`).
    next_key_index: AtomicU16,
    // All stored segment headers, ordered by segment index starting at zero;
    // the segment index doubles as the position in this vector.
    cache: RwLock<Vec<SegmentHeader>>,
}
38
39#[derive(Debug)]
53pub struct SegmentHeadersStore {
54 inner: Arc<SegmentHeadersStoreInner>,
55 confirmation_depth_k: BlockNumber,
56}
57
58impl Clone for SegmentHeadersStore {
59 fn clone(&self) -> Self {
60 Self {
61 inner: Arc::clone(&self.inner),
62 confirmation_depth_k: self.confirmation_depth_k,
63 }
64 }
65}
66
67impl SegmentHeadersStore {
68 const KEY_PREFIX: &'static [u8] = b"segment-headers";
69 const INITIAL_CACHE_CAPACITY: usize = 1_000;
70
71 pub fn new(confirmation_depth_k: BlockNumber) -> Result<Self, SegmentHeaderStoreError> {
73 let cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
74
75 debug!("Started loading segment headers into cache");
76 let next_key_index = 0;
80 debug!("Finished loading segment headers into cache");
92
93 Ok(Self {
94 inner: Arc::new(SegmentHeadersStoreInner {
95 next_key_index: AtomicU16::new(next_key_index),
97 cache: RwLock::new(cache),
98 }),
99 confirmation_depth_k,
100 })
101 }
102
103 pub fn last_segment_header(&self) -> Option<SegmentHeader> {
105 self.inner.cache.read().last().cloned()
106 }
107
108 pub fn max_segment_index(&self) -> Option<SegmentIndex> {
110 let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
111 Some(SegmentIndex::from(segment_index))
112 }
113
114 pub fn add_segment_headers(
118 &self,
119 segment_headers: &[SegmentHeader],
120 ) -> Result<(), SegmentHeaderStoreError> {
121 let mut maybe_last_segment_index = self.max_segment_index();
122 let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
123 for segment_header in segment_headers {
126 let segment_index = segment_header.segment_index();
127 match maybe_last_segment_index {
128 Some(last_segment_index) => {
129 if segment_index <= last_segment_index {
130 continue;
132 }
133
134 if segment_index != last_segment_index + SegmentIndex::ONE {
135 return Err(SegmentHeaderStoreError::MustFollowLastSegmentIndex {
136 segment_index,
137 last_segment_index,
138 });
139 }
140
141 segment_headers_to_store.push(segment_header);
142 maybe_last_segment_index.replace(segment_index);
143 }
144 None => {
145 if segment_index != SegmentIndex::ZERO {
146 return Err(SegmentHeaderStoreError::FirstSegmentIndexZero {
147 segment_index,
148 });
149 }
150
151 segment_headers_to_store.push(segment_header);
152 maybe_last_segment_index.replace(segment_index);
153 }
154 }
155 }
156
157 if segment_headers_to_store.is_empty() {
158 return Ok(());
159 }
160
161 {
165 let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
166 let key = Self::key(key_index);
167 let value = segment_headers_to_store.encode();
168 let insert_data = vec![(key.as_slice(), value.as_slice())];
169
170 black_box(insert_data);
171 }
173 self.inner.cache.write().extend(segment_headers_to_store);
174
175 Ok(())
176 }
177
178 pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
180 self.inner
181 .cache
182 .read()
183 .get(u64::from(segment_index) as usize)
184 .copied()
185 }
186
187 fn key(key_index: u16) -> Vec<u8> {
188 (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
189 }
190
191 pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
193 let Some(last_segment_index) = self.max_segment_index() else {
194 return Vec::new();
196 };
197
198 if block_number == BlockNumber::ONE {
200 return vec![
203 self.get_segment_header(SegmentIndex::ZERO)
204 .expect("Segment headers are stored in monotonically increasing order; qed"),
205 ];
206 }
207
208 if last_segment_index == SegmentIndex::ZERO {
209 return Vec::new();
211 }
212
213 let mut current_segment_index = last_segment_index;
214 loop {
215 let current_segment_header = self
218 .get_segment_header(current_segment_index)
219 .expect("Segment headers are stored in monotonically increasing order; qed");
220
221 let target_block_number = current_segment_header.last_archived_block.number()
223 + BlockNumber::ONE
224 + self.confirmation_depth_k;
225 if target_block_number == block_number {
226 let mut headers_for_block = vec![current_segment_header];
227
228 let last_archived_block_number = current_segment_header.last_archived_block.number;
230 let mut segment_index = current_segment_index - SegmentIndex::ONE;
231
232 while let Some(segment_header) = self.get_segment_header(segment_index) {
233 if segment_header.last_archived_block.number == last_archived_block_number {
234 headers_for_block.insert(0, segment_header);
235 segment_index -= SegmentIndex::ONE;
236 } else {
237 break;
238 }
239 }
240
241 return headers_for_block;
242 }
243
244 if target_block_number > block_number {
246 if current_segment_index > SegmentIndex::ONE {
248 current_segment_index -= SegmentIndex::ONE
249 } else {
250 break;
251 }
252 } else {
253 return Vec::new();
255 }
256 }
257
258 Vec::new()
260 }
261}