// ab_farmer/single_disk_farm/plot_cache.rs

1//! Plot cache for single disk farm
2
3// TODO: Not supported under Miri: https://github.com/rust-lang/miri/issues/4464
4#[cfg(not(miri))]
5#[cfg(test)]
6mod tests;
7
8use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
9use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
10use crate::utils::AsyncJoinOnDrop;
11use ab_core_primitives::hashes::Blake3Hash;
12use ab_core_primitives::pieces::{Piece, PieceIndex};
13use ab_farmer_components::file_ext::FileExt;
14use ab_farmer_components::sector::SectorMetadataChecksummed;
15use ab_networking::libp2p::kad::RecordKey;
16use ab_networking::utils::multihash::ToMultihash;
17use async_lock::RwLock as AsyncRwLock;
18use async_trait::async_trait;
19use bytes::BytesMut;
20use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::{Arc, Weak};
23use std::{io, mem};
24use thiserror::Error;
25use tokio::task;
26use tracing::{debug, info, trace, warn};
27
/// Disk plot cache open error
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// I/O error occurred while reading or writing a cache element
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn task for blocking thread
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Checksum of a cache element did not match its contents, meaning the element is corrupted or
    /// was partially overwritten (treated as free space during the initial scan in `new`)
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
41
#[derive(Debug)]
struct CachedPieces {
    /// Map of piece index into offset
    ///
    /// Keys are record keys derived from the piece index multihash; values are element offsets
    /// (in units of `DiskPlotCache::element_size()`) within the plot file.
    map: HashMap<RecordKey, u32>,
    // Next free element offset where a piece can be written; `None` means there is no free space
    // left in the cache
    next_offset: Option<u32>,
}
48
/// Additional piece cache that exploit part of the plot that does not contain sectors yet
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    // Weak reference so the cache does not keep the plot file alive after farm shutdown
    file: Weak<DirectIoFileWrapper>,
    // Weak reference to metadata of plotted sectors, used to detect when cached pieces have been
    // overwritten by newly plotted sectors
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Shared map of cached pieces and the next free offset; `Arc` makes `Clone` cheap
    cached_pieces: Arc<RwLock<CachedPieces>>,
    // Total number of sectors the plot will eventually contain
    target_sector_count: u16,
    // Size of a single sector in bytes
    sector_size: u64,
}
58
59#[async_trait]
60impl PlotCache for DiskPlotCache {
61    async fn is_piece_maybe_stored(
62        &self,
63        key: &RecordKey,
64    ) -> Result<MaybePieceStoredResult, FarmError> {
65        Ok(self.is_piece_maybe_stored(key))
66    }
67
68    /// Store piece in cache if there is free space, and return `Ok(true)`.
69    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
70    async fn try_store_piece(
71        &self,
72        piece_index: PieceIndex,
73        piece: &Piece,
74    ) -> Result<bool, FarmError> {
75        Ok(self.try_store_piece(piece_index, piece).await?)
76    }
77
78    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
79        Ok(self.read_piece(key).await)
80    }
81}
82
impl DiskPlotCache {
    /// Create a new plot cache over the not-yet-plotted tail of the plot file.
    ///
    /// Performs a blocking scan of all free element offsets (from the end of the file backwards)
    /// to rebuild the in-memory map of pieces that were cached in a previous run.
    pub(crate) fn new(
        file: &Arc<DirectIoFileWrapper>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: u16,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            // Reusable buffer holding one cache element at a time during the scan
            let mut element = vec![0; Self::element_size() as usize];
            // Clippy complains about `RecordKey`, but it is not changing here, so it is fine
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Step over all free potential offsets for pieces that could have been cached
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
            // TODO: Parallelize or read in larger batches
            // Pieces are written from the highest free offset downwards (see `try_store_piece`),
            // so scan in the same order; the first vacant or corrupted element marks the boundary
            // of previously cached data and becomes the next free offset
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            // All-zero element: nothing was ever cached here, offset is free
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        // Corrupted or partially overwritten element, treat it as free space
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        // Unexpected I/O error: stop scanning without reserving free space
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            // Downgrade to weak references so the cache does not keep the file or metadata alive
            // after farm shutdown
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }

    /// Size of a single plot cache element
    ///
    /// An element consists of the piece index, followed by the piece itself, followed by a BLAKE3
    /// checksum of both (see `try_store_piece` / `read_piece_internal` for the layout).
    pub(crate) const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

    /// Check if piece is potentially stored in this cache (not guaranteed to be because it might be
    /// overridden with sector any time)
    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
        let offset = {
            let cached_pieces = self.cached_pieces.read();

            let Some(offset) = cached_pieces.map.get(key).copied() else {
                // Piece is not cached; report `Vacant` if there is still free space to store it
                return if cached_pieces.next_offset.is_some() {
                    MaybePieceStoredResult::Vacant
                } else {
                    MaybePieceStoredResult::No
                };
            };

            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Metadata has been dropped, farm or process is shutting down
            return MaybePieceStoredResult::No;
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        // Blocking read is fine because writes in farmer are very rare and very brief
        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

        // Make sure offset is after anything that is already plotted
        if element_offset < plotted_bytes {
            // Remove entry since it was overwritten with a sector already
            self.cached_pieces.write().map.remove(key);
            MaybePieceStoredResult::No
        } else {
            MaybePieceStoredResult::Yes
        }
    }

    /// Store piece in cache if there is free space, and return `Ok(true)`.
    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            // First, do a quick concurrent check for free space with a read lock, dropping it
            // immediately.
            if self.cached_pieces.read().next_offset.is_none() {
                return Ok(false);
            };

            // Then, if there was free space, acquire a write lock, and check for intervening
            // writes.
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            // Reserve this offset under the write lock; offsets are consumed from highest to
            // lowest, `checked_sub` yields `None` (cache full) at offset 0.
            // NOTE(review): if the file write below fails, the reserved offset is not returned to
            // the pool — the error propagates and the slot is skipped.
            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Metadata has been dropped, farm or process is shutting down
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as u16;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        // Make sure offset is after anything that is already plotted
        if element_offset < plotted_bytes {
            // Just to be safe, avoid any overlap of read and write locks
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            // No space to store more pieces anymore
            cached_pieces.next_offset.take();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        let Some(file) = self.file.upgrade() else {
            // File has been dropped, farm or process is shutting down
            return Ok(false);
        };

        trace!(
            %offset,
            ?piece_index,
            %plotted_sectors_count,
            "Found available piece cache free space offset, writing piece",
        );

        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            // File writes are read/write/modify internally, so combine all data here for more
            // efficient write
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            // Checksum covers the piece index and the piece, matching the verification in
            // `read_piece_internal`
            bytes.extend_from_slice(
                {
                    let mut hasher = blake3::Hasher::new();
                    hasher.update(&piece_index_bytes);
                    hasher.update(piece.as_ref());
                    hasher.finalize()
                }
                .as_bytes(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        // `??`: outer `?` is the join error, inner `?` is the I/O error from the write itself
        AsyncJoinOnDrop::new(write_fut, false).await??;

        // Just to be safe, avoid any overlap of read and write locks
        drop(sectors_metadata);
        // Store newly written piece in the map
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }

    /// Read piece from cache.
    ///
    /// Returns `None` if not cached.
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                // `slice_ref` keeps the piece as a zero-copy view into the element buffer
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
        //  Someone at some point should figure it out and fix, but it will probably be not me
        //  (Nazar).
        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
        //  This TODO exists in multiple files
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            // A join error (e.g. cancelled task) is reported as a missing piece rather than
            // propagated
            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        // A read failure most likely means the element was overwritten by a plotted sector, so
        // clean up the stale map entry (or the whole map once plotting is complete)
        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            let plotted_sectors_count = sectors_metadata.read().await.len() as u16;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Free allocated memory once fully plotted
                mem::take(&mut cached_pieces.map);
            } else {
                // Remove entry just in case it was overridden with a sector already
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }

    /// Read and verify a single cache element at `offset` into `element`.
    ///
    /// Returns:
    /// * `Ok(Some(piece_index))` — valid element, checksum verified;
    /// * `Ok(None)` — element is all zeroes, i.e. the slot was never written;
    /// * `Err(ChecksumMismatch)` — element is corrupted or was partially overwritten;
    /// * `Err(Io)` — reading from the file failed.
    ///
    /// `element` must be exactly `Self::element_size()` bytes long.
    fn read_piece_internal(
        file: &DirectIoFileWrapper,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        // Element layout: piece index | piece | BLAKE3 checksum of the first two parts
        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        // Verify checksum
        let actual_checksum = {
            let mut hasher = blake3::Hasher::new();
            hasher.update(piece_index_bytes);
            hasher.update(piece_bytes);
            *hasher.finalize().as_bytes()
        };
        if actual_checksum != expected_checksum {
            // An all-zero element is a vacant slot, not corruption
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted or overridden piece in cache"
            );

            return Err(DiskPlotCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }
}