Skip to main content

ab_farmer/
disk_piece_cache.rs

1//! Disk piece cache implementation
2
3mod metrics;
4// TODO: Not supported under Miri: https://github.com/rust-lang/miri/issues/4464
5#[cfg(not(miri))]
6#[cfg(test)]
7mod tests;
8
9use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
10use crate::farm;
11use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
12use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
13use crate::utils::AsyncJoinOnDrop;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_farmer_components::file_ext::FileExt;
17use async_trait::async_trait;
18use bytes::BytesMut;
19use futures::channel::mpsc;
20use futures::{SinkExt, Stream, StreamExt, stream};
21use parking_lot::Mutex;
22use prometheus_client::registry::Registry;
23use std::num::NonZeroU32;
24use std::path::Path;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU8, Ordering};
27use std::task::Poll;
28use std::{fs, io, iter};
29use thiserror::Error;
30use tokio::runtime::Handle;
31use tokio::task;
32use tracing::{Span, debug, info, warn};
33
/// Once more than this many consecutive cache elements turned out to be empty or failed to be
/// read, the rest of the contents is no longer read. Tolerating a few such elements allows not
/// missing most of the pieces after one or two corrupted ones.
const CONTENTS_READ_SKIP_LIMIT: usize = 3;
/// How many pieces to read from disk at the same time (using tokio thread pool), also the number
/// of file handles kept open for reading
const PIECES_READING_CONCURRENCY: usize = 32;
39
40/// Disk piece cache open error
41#[derive(Debug, Error)]
42pub enum DiskPieceCacheError {
43    /// I/O error occurred
44    #[error("Disk piece cache I/O error: {0}")]
45    Io(#[from] io::Error),
46    /// Can't preallocate cache file, probably not enough space on disk
47    #[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
48    CantPreallocateCacheFile(io::Error),
49    /// Offset outsize of range
50    #[error("Offset outsize of range: provided {provided}, max {max}")]
51    OffsetOutsideOfRange {
52        /// Provided offset
53        provided: u32,
54        /// Max offset
55        max: u32,
56    },
57    /// Checksum mismatch
58    #[error("Checksum mismatch")]
59    ChecksumMismatch,
60}
61
62#[derive(Debug)]
63struct FilePool {
64    files: Box<[DirectIoFileWrapper; PIECES_READING_CONCURRENCY]>,
65    cursor: AtomicU8,
66}
67
68impl FilePool {
69    fn open(path: &Path) -> io::Result<Self> {
70        let files = iter::repeat_with(|| DirectIoFileWrapper::open(path))
71            .take(PIECES_READING_CONCURRENCY)
72            .collect::<Result<Box<_>, _>>()?
73            .try_into()
74            .expect("Statically correct length; qed");
75        Ok(Self {
76            files,
77            cursor: AtomicU8::new(0),
78        })
79    }
80
81    fn read(&self) -> &DirectIoFileWrapper {
82        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
83        &self.files[position % PIECES_READING_CONCURRENCY]
84    }
85
86    fn write(&self) -> &DirectIoFileWrapper {
87        // Always the same file or else overlapping writes will be corrupted due to
88        // read/modify/write internals, which are in turn caused by alignment requirements
89        &self.files[0]
90    }
91}
92
/// Shared state of [`DiskPieceCache`], cloning the cache only clones the `Arc` pointing to it
#[derive(Debug)]
struct Inner {
    /// Identifier of this cache, ephemeral unless provided explicitly on open
    id: PieceCacheId,
    /// Handles to the backing cache file
    files: FilePool,
    /// Capacity in elements, valid offsets are `0..max_num_elements`
    max_num_elements: u32,
    /// Metrics, only present if a registry was provided on open
    metrics: Option<DiskPieceCacheMetrics>,
}

/// Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot
/// faster.
///
/// Implementation is backed by a file on disk. Each element stores a piece index, the piece and a
/// Blake3 checksum of both. Clones share the same underlying state.
#[derive(Debug, Clone)]
pub struct DiskPieceCache {
    inner: Arc<Inner>,
}
109
110#[async_trait]
111impl farm::PieceCache for DiskPieceCache {
112    fn id(&self) -> &PieceCacheId {
113        &self.inner.id
114    }
115
116    #[inline]
117    fn max_num_elements(&self) -> u32 {
118        self.inner.max_num_elements
119    }
120
121    async fn contents(
122        &self,
123    ) -> Result<
124        Box<
125            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
126                + Unpin
127                + Send
128                + '_,
129        >,
130        FarmError,
131    > {
132        let this = self.clone();
133        let (mut sender, receiver) = mpsc::channel(100_000);
134        let span = Span::current();
135        let read_contents = task::spawn_blocking(move || {
136            let _guard = span.enter();
137
138            let contents = this.contents();
139            for (piece_cache_offset, maybe_piece) in contents {
140                if let Err(error) =
141                    Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece))))
142                {
143                    debug!(%error, "Aborting contents iteration due to receiver dropping");
144                    break;
145                }
146            }
147        });
148        let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false)));
149        // Change order such that in closure below `receiver` is dropped before `read_contents`
150        let mut receiver = receiver;
151
152        Ok(Box::new(stream::poll_fn(move |ctx| {
153            let poll_result = receiver.poll_next_unpin(ctx);
154
155            if matches!(poll_result, Poll::Ready(None)) {
156                read_contents.lock().take();
157            }
158
159            poll_result
160        })))
161    }
162
163    async fn write_piece(
164        &self,
165        offset: PieceCacheOffset,
166        piece_index: PieceIndex,
167        piece: &Piece,
168    ) -> Result<(), FarmError> {
169        let piece = piece.clone();
170        let piece_cache = self.clone();
171        Ok(AsyncJoinOnDrop::new(
172            task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)),
173            false,
174        )
175        .await??)
176    }
177
178    async fn read_piece_index(
179        &self,
180        offset: PieceCacheOffset,
181    ) -> Result<Option<PieceIndex>, FarmError> {
182        let piece_cache = self.clone();
183        let span = Span::current();
184        Ok(AsyncJoinOnDrop::new(
185            task::spawn_blocking(move || {
186                let _guard = span.enter();
187
188                piece_cache.read_piece_index(offset)
189            }),
190            false,
191        )
192        .await??)
193    }
194
195    async fn read_piece(
196        &self,
197        offset: PieceCacheOffset,
198    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
199        let span = Span::current();
200
201        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
202        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
203        //  Someone at some point should figure it out and fix, but it will probably be not me
204        //  (Nazar).
205        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
206        //  This TODO exists in multiple files
207        if cfg!(windows) {
208            Ok(task::block_in_place(|| {
209                let _guard = span.enter();
210
211                self.read_piece(offset)
212            })?)
213        } else {
214            let piece_cache = self.clone();
215            Ok(AsyncJoinOnDrop::new(
216                task::spawn_blocking(move || {
217                    let _guard = span.enter();
218
219                    piece_cache.read_piece(offset)
220                }),
221                false,
222            )
223            .await??)
224        }
225    }
226
227    async fn read_pieces(
228        &self,
229        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
230    ) -> Result<
231        Box<
232            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
233                + Send
234                + Unpin
235                + '_,
236        >,
237        FarmError,
238    > {
239        let iter = offsets.map(move |offset| async move {
240            Ok((offset, farm::PieceCache::read_piece(self, offset).await?))
241        });
242        Ok(Box::new(
243            // Constrain concurrency to avoid excessive memory usage, while still getting
244            // performance of concurrent reads
245            stream::iter(iter).buffer_unordered(PIECES_READING_CONCURRENCY),
246        ))
247    }
248}
249
250impl DiskPieceCache {
251    pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";
252
253    /// Open cache, capacity is measured in elements of [`DiskPieceCache::element_size()`] size
254    pub fn open(
255        directory: &Path,
256        capacity: NonZeroU32,
257        id: Option<PieceCacheId>,
258        registry: Option<&mut Registry>,
259    ) -> Result<Self, DiskPieceCacheError> {
260        let capacity = capacity.get();
261        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
262
263        let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
264        // Align plot file size for disk sector size
265        let expected_size = expected_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
266        {
267            let file = files.write();
268            if file.size()? != expected_size {
269                // Allocating the whole file (`set_len` below can create a sparse file, which will
270                // cause writes to fail later)
271                file.preallocate(expected_size)
272                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
273                // Truncating file (if necessary)
274                file.set_len(expected_size)?;
275            }
276        }
277
278        // ID for cache is ephemeral unless provided explicitly
279        let id = id.unwrap_or_else(PieceCacheId::new);
280        let metrics = registry.map(|registry| DiskPieceCacheMetrics::new(registry, &id, capacity));
281
282        Ok(Self {
283            inner: Arc::new(Inner {
284                id,
285                files,
286                max_num_elements: capacity,
287                metrics,
288            }),
289        })
290    }
291
292    /// Size of a single piece cache element
293    pub const fn element_size() -> u32 {
294        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
295    }
296
297    /// Contents of this piece cache
298    ///
299    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
300    /// doesn't happen for the same piece being accessed!
301    pub(crate) fn contents(
302        &self,
303    ) -> impl ExactSizeIterator<Item = (PieceCacheOffset, Option<PieceIndex>)> + '_ {
304        let mut element = vec![0; Self::element_size() as usize];
305        let count_total = self.inner.metrics.as_ref().is_some_and(|metrics| {
306            metrics.contents.inc();
307            metrics.capacity_used.get() == 0
308        });
309        let mut current_skip = 0;
310
311        // TODO: Parallelize or read in larger batches
312        (0..self.inner.max_num_elements).map(move |offset| {
313            if current_skip > CONTENTS_READ_SKIP_LIMIT {
314                return (PieceCacheOffset(offset), None);
315            }
316
317            match self.read_piece_internal(offset, &mut element) {
318                Ok(maybe_piece_index) => {
319                    if maybe_piece_index.is_none() {
320                        current_skip += 1;
321                    } else {
322                        if count_total && let Some(metrics) = &self.inner.metrics {
323                            metrics.capacity_used.inc();
324                        }
325                        current_skip = 0;
326                    }
327
328                    (PieceCacheOffset(offset), maybe_piece_index)
329                }
330                Err(error) => {
331                    warn!(%error, %offset, "Failed to read cache element");
332
333                    current_skip += 1;
334
335                    (PieceCacheOffset(offset), None)
336                }
337            }
338        })
339    }
340
341    /// Store piece in cache at specified offset, replacing existing piece if there is one.
342    ///
343    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
344    /// doesn't happen for the same piece being accessed!
345    pub(crate) fn write_piece(
346        &self,
347        offset: PieceCacheOffset,
348        piece_index: PieceIndex,
349        piece: &Piece,
350    ) -> Result<(), DiskPieceCacheError> {
351        let PieceCacheOffset(offset) = offset;
352        if offset >= self.inner.max_num_elements {
353            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
354                provided: offset,
355                max: self.inner.max_num_elements - 1,
356            });
357        }
358
359        if let Some(metrics) = &self.inner.metrics {
360            metrics.write_piece.inc();
361            let capacity_used = i64::from(offset + 1);
362            if metrics.capacity_used.get() != capacity_used {
363                metrics.capacity_used.set(capacity_used);
364            }
365        }
366        let element_offset = u64::from(offset) * u64::from(Self::element_size());
367
368        let piece_index_bytes = piece_index.to_bytes();
369        // File writes are read/write/modify internally, so combine all data here for more efficient
370        // write
371        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE);
372        bytes.extend_from_slice(&piece_index_bytes);
373        bytes.extend_from_slice(piece.as_ref());
374        bytes.extend_from_slice(
375            {
376                let mut hasher = blake3::Hasher::new();
377                hasher.update(&piece_index_bytes);
378                hasher.update(piece.as_ref());
379                hasher.finalize()
380            }
381            .as_bytes(),
382        );
383        self.inner
384            .files
385            .write()
386            .write_all_at(&bytes, element_offset)?;
387
388        Ok(())
389    }
390
391    /// Read piece index from cache at specified offset.
392    ///
393    /// Returns `None` if offset is out of range.
394    ///
395    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
396    /// doesn't happen for the same piece being accessed!
397    pub(crate) fn read_piece_index(
398        &self,
399        offset: PieceCacheOffset,
400    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
401        let PieceCacheOffset(offset) = offset;
402        if offset >= self.inner.max_num_elements {
403            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
404            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
405                provided: offset,
406                max: self.inner.max_num_elements - 1,
407            });
408        }
409
410        if let Some(metrics) = &self.inner.metrics {
411            metrics.read_piece_index.inc();
412        }
413        self.read_piece_internal(offset, &mut vec![0; Self::element_size() as usize])
414    }
415
416    /// Read piece from cache at specified offset.
417    ///
418    /// Returns `None` if offset is out of range.
419    ///
420    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
421    /// doesn't happen for the same piece being accessed!
422    pub(crate) fn read_piece(
423        &self,
424        offset: PieceCacheOffset,
425    ) -> Result<Option<(PieceIndex, Piece)>, DiskPieceCacheError> {
426        let PieceCacheOffset(offset) = offset;
427        if offset >= self.inner.max_num_elements {
428            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
429            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
430                provided: offset,
431                max: self.inner.max_num_elements - 1,
432            });
433        }
434
435        if let Some(metrics) = &self.inner.metrics {
436            metrics.read_piece.inc();
437        }
438        let mut element = BytesMut::zeroed(Self::element_size() as usize);
439        if let Some(piece_index) = self.read_piece_internal(offset, &mut element)? {
440            let element = element.freeze();
441            let piece =
442                Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
443                    .expect("Correct length; qed");
444            Ok(Some((piece_index, piece)))
445        } else {
446            Ok(None)
447        }
448    }
449
450    fn read_piece_internal(
451        &self,
452        offset: u32,
453        element: &mut [u8],
454    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
455        self.inner
456            .files
457            .read()
458            .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
459
460        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
461        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);
462
463        // Verify checksum
464        let actual_checksum = {
465            let mut hasher = blake3::Hasher::new();
466            hasher.update(piece_index_bytes);
467            hasher.update(piece_bytes);
468            *hasher.finalize().as_bytes()
469        };
470        if actual_checksum != expected_checksum {
471            if element.iter().all(|&byte| byte == 0) {
472                return Ok(None);
473            }
474
475            debug!(
476                actual_checksum = %hex::encode(actual_checksum),
477                expected_checksum = %hex::encode(expected_checksum),
478                "Hash doesn't match, corrupted piece in cache"
479            );
480
481            return Err(DiskPieceCacheError::ChecksumMismatch);
482        }
483
484        let piece_index = PieceIndex::from_bytes(
485            piece_index_bytes
486                .try_into()
487                .expect("Statically known to have correct size; qed"),
488        );
489        Ok(Some(piece_index))
490    }
491
492    pub(crate) fn wipe(directory: &Path) -> io::Result<()> {
493        let piece_cache = directory.join(Self::FILE_NAME);
494        if !piece_cache.exists() {
495            return Ok(());
496        }
497        info!("Deleting piece cache file at {}", piece_cache.display());
498        fs::remove_file(piece_cache)
499    }
500}