// ab_client_archiving/segment_headers_store.rs

1use ab_core_primitives::block::BlockNumber;
2use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
3use parity_scale_codec::Encode;
4use parking_lot::RwLock;
5use std::hint::black_box;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicU16, Ordering};
8use tracing::debug;
9
10/// Error for [`SegmentHeadersStore`]
11#[derive(Debug, thiserror::Error)]
12pub enum SegmentHeaderStoreError {
13    /// Segment index must strictly follow last segment index, can't store segment header
14    #[error(
15        "Segment index {segment_index} must strictly follow last segment index \
16        {last_segment_index}, can't store segment header"
17    )]
18    MustFollowLastSegmentIndex {
19        /// Segment index that was attempted to be inserted
20        segment_index: SegmentIndex,
21        /// Last segment index
22        last_segment_index: SegmentIndex,
23    },
24    /// First segment index must be zero
25    #[error("First segment index must be zero, found {segment_index}")]
26    FirstSegmentIndexZero {
27        /// Segment index that was attempted to be inserted
28        segment_index: SegmentIndex,
29    },
30}
31
32#[derive(Debug)]
33struct SegmentHeadersStoreInner {
34    next_key_index: AtomicU16,
35    /// In-memory cache of segment headers
36    cache: RwLock<Vec<SegmentHeader>>,
37}
38
39// TODO: Disk persistence
40/// Persistent storage of segment headers.
41///
42/// It maintains all known segment headers. During sync from DSN it is possible that this data
43/// structure contains segment headers that from the point of view of the tip of the current chain
44/// are "in the future". This is expected and must be accounted for in the archiver and other
45/// places.
46///
47/// Segment headers are stored in batches (which is more efficient to store and retrieve). Each next
48/// batch contains distinct segment headers with monotonically increasing segment indices. During
49/// instantiation all previously stored batches will be read and in-memory representation of the
50/// whole contents will be created such that queries to this data structure are quick and not
51/// involving any disk I/O.
52#[derive(Debug)]
53pub struct SegmentHeadersStore {
54    inner: Arc<SegmentHeadersStoreInner>,
55    confirmation_depth_k: BlockNumber,
56}
57
58impl Clone for SegmentHeadersStore {
59    fn clone(&self) -> Self {
60        Self {
61            inner: Arc::clone(&self.inner),
62            confirmation_depth_k: self.confirmation_depth_k,
63        }
64    }
65}
66
67impl SegmentHeadersStore {
68    const KEY_PREFIX: &'static [u8] = b"segment-headers";
69    const INITIAL_CACHE_CAPACITY: usize = 1_000;
70
71    /// Create new instance
72    pub fn new(confirmation_depth_k: BlockNumber) -> Result<Self, SegmentHeaderStoreError> {
73        let cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
74
75        debug!("Started loading segment headers into cache");
76        // Segment headers are stored in batches (which is more efficient to store and retrieve), this is why code deals
77        // with key indices here rather that segment indices. Essentially this iterates over keys from 0 until missing
78        // entry is hit, which becomes the next key index where additional segment headers will be stored.
79        let next_key_index = 0;
80        // while let Some(segment_headers) =
81        //     aux_store
82        //         .get_aux(&Self::key(next_key_index))?
83        //         .map(|segment_header| {
84        //             Vec::<SegmentHeader>::decode(&mut segment_header.as_slice())
85        //                 .expect("Always correct segment header unless DB is corrupted; qed")
86        //         })
87        // {
88        //     cache.extend(segment_headers);
89        //     next_key_index += 1;
90        // }
91        debug!("Finished loading segment headers into cache");
92
93        Ok(Self {
94            inner: Arc::new(SegmentHeadersStoreInner {
95                // aux_store,
96                next_key_index: AtomicU16::new(next_key_index),
97                cache: RwLock::new(cache),
98            }),
99            confirmation_depth_k,
100        })
101    }
102
103    /// Returns last observed segment header
104    pub fn last_segment_header(&self) -> Option<SegmentHeader> {
105        self.inner.cache.read().last().cloned()
106    }
107
108    /// Returns last observed segment index
109    pub fn max_segment_index(&self) -> Option<SegmentIndex> {
110        let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
111        Some(SegmentIndex::from(segment_index))
112    }
113
114    /// Add segment headers.
115    ///
116    /// Multiple can be inserted for efficiency purposes.
117    pub fn add_segment_headers(
118        &self,
119        segment_headers: &[SegmentHeader],
120    ) -> Result<(), SegmentHeaderStoreError> {
121        let mut maybe_last_segment_index = self.max_segment_index();
122        let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
123        // Check all input segment headers to see which ones are not stored yet and verifying that segment indices are
124        // monotonically increasing
125        for segment_header in segment_headers {
126            let segment_index = segment_header.segment_index();
127            match maybe_last_segment_index {
128                Some(last_segment_index) => {
129                    if segment_index <= last_segment_index {
130                        // Skip already stored segment headers
131                        continue;
132                    }
133
134                    if segment_index != last_segment_index + SegmentIndex::ONE {
135                        return Err(SegmentHeaderStoreError::MustFollowLastSegmentIndex {
136                            segment_index,
137                            last_segment_index,
138                        });
139                    }
140
141                    segment_headers_to_store.push(segment_header);
142                    maybe_last_segment_index.replace(segment_index);
143                }
144                None => {
145                    if segment_index != SegmentIndex::ZERO {
146                        return Err(SegmentHeaderStoreError::FirstSegmentIndexZero {
147                            segment_index,
148                        });
149                    }
150
151                    segment_headers_to_store.push(segment_header);
152                    maybe_last_segment_index.replace(segment_index);
153                }
154            }
155        }
156
157        if segment_headers_to_store.is_empty() {
158            return Ok(());
159        }
160
161        // Insert all new segment headers into vacant key index for efficiency purposes
162        // TODO: Do compaction when we have too many keys: combine multiple segment headers into a
163        //  single entry for faster retrievals and more compact storage
164        {
165            let key_index = self.inner.next_key_index.fetch_add(1, Ordering::SeqCst);
166            let key = Self::key(key_index);
167            let value = segment_headers_to_store.encode();
168            let insert_data = vec![(key.as_slice(), value.as_slice())];
169
170            black_box(insert_data);
171            // self.inner.aux_store.insert_aux(&insert_data, &[])?;
172        }
173        self.inner.cache.write().extend(segment_headers_to_store);
174
175        Ok(())
176    }
177
178    /// Get a single segment header
179    pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
180        self.inner
181            .cache
182            .read()
183            .get(u64::from(segment_index) as usize)
184            .copied()
185    }
186
187    fn key(key_index: u16) -> Vec<u8> {
188        (Self::KEY_PREFIX, key_index.to_le_bytes()).encode()
189    }
190
191    /// Get segment headers that are expected to be included at specified block number.
192    pub fn segment_headers_for_block(&self, block_number: BlockNumber) -> Vec<SegmentHeader> {
193        let Some(last_segment_index) = self.max_segment_index() else {
194            // Not initialized
195            return Vec::new();
196        };
197
198        // Special case for the initial segment (for genesis block).
199        if block_number == BlockNumber::ONE {
200            // If there is a segment index present, and we store monotonically increasing segment
201            // headers, then the first header exists.
202            return vec![
203                self.get_segment_header(SegmentIndex::ZERO)
204                    .expect("Segment headers are stored in monotonically increasing order; qed"),
205            ];
206        }
207
208        if last_segment_index == SegmentIndex::ZERO {
209            // Genesis segment already included in block #1
210            return Vec::new();
211        }
212
213        let mut current_segment_index = last_segment_index;
214        loop {
215            // If the current segment index present, and we store monotonically increasing segment
216            // headers, then the current segment header exists as well.
217            let current_segment_header = self
218                .get_segment_header(current_segment_index)
219                .expect("Segment headers are stored in monotonically increasing order; qed");
220
221            // The block immediately after the archived segment adding the confirmation depth
222            let target_block_number = current_segment_header.last_archived_block.number()
223                + BlockNumber::ONE
224                + self.confirmation_depth_k;
225            if target_block_number == block_number {
226                let mut headers_for_block = vec![current_segment_header];
227
228                // Check block spanning multiple segments
229                let last_archived_block_number = current_segment_header.last_archived_block.number;
230                let mut segment_index = current_segment_index - SegmentIndex::ONE;
231
232                while let Some(segment_header) = self.get_segment_header(segment_index) {
233                    if segment_header.last_archived_block.number == last_archived_block_number {
234                        headers_for_block.insert(0, segment_header);
235                        segment_index -= SegmentIndex::ONE;
236                    } else {
237                        break;
238                    }
239                }
240
241                return headers_for_block;
242            }
243
244            // iterate segments further
245            if target_block_number > block_number {
246                // no need to check the initial segment
247                if current_segment_index > SegmentIndex::ONE {
248                    current_segment_index -= SegmentIndex::ONE
249                } else {
250                    break;
251                }
252            } else {
253                // No segment headers required
254                return Vec::new();
255            }
256        }
257
258        // No segment headers required
259        Vec::new()
260    }
261}