ab_farmer/
disk_piece_cache.rs

//! Disk piece cache implementation

mod metrics;
// TODO: Not supported under Miri: https://github.com/rust-lang/miri/issues/4464
#[cfg(not(miri))]
#[cfg(test)]
mod tests;

use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
use crate::farm;
use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
use crate::utils::AsyncJoinOnDrop;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex};
use ab_farmer_components::file_ext::FileExt;
use async_trait::async_trait;
use bytes::BytesMut;
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt, stream};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::num::NonZeroU32;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use std::task::Poll;
use std::{fs, io};
use thiserror::Error;
use tokio::runtime::Handle;
use tokio::task;
use tracing::{Span, debug, info, warn};

/// How many consecutive missing or corrupted pieces to tolerate before we stop checking the rest
/// of the contents; this ensures that one or two corrupted pieces don't cause the remaining
/// pieces to be missed
const CONTENTS_READ_SKIP_LIMIT: usize = 3;
/// How many pieces to read from disk at the same time (using the tokio thread pool)
const PIECES_READING_CONCURRENCY: usize = 32;

/// Disk piece cache open error
#[derive(Debug, Error)]
pub enum DiskPieceCacheError {
    /// I/O error occurred
    #[error("Disk piece cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// Can't preallocate cache file, probably not enough space on disk
    #[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
    CantPreallocateCacheFile(io::Error),
    /// Offset outside of range
    #[error("Offset outside of range: provided {provided}, max {max}")]
    OffsetOutsideOfRange {
        /// Provided offset
        provided: u32,
        /// Max offset
        max: u32,
    },
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

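/// A pool of file handles over the same cache file: reads rotate across the handles so they can
/// proceed concurrently, while all writes go through a single handle (see `write()` below)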
#[derive(Debug)]
struct FilePool {
    files: Box<[DirectIoFileWrapper; PIECES_READING_CONCURRENCY]>,
    cursor: AtomicU8,
}

impl FilePool {
    fn open(path: &Path) -> io::Result<Self> {
        let files = (0..PIECES_READING_CONCURRENCY)
            .map(|_| DirectIoFileWrapper::open(path))
            .collect::<Result<Box<_>, _>>()?
            .try_into()
            .expect("Statically correct length; qed");
        Ok(Self {
            files,
            cursor: AtomicU8::new(0),
        })
    }

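    /// Pick a handle for the next read in round-robin fashion; the `u8` cursor wraps at 256, a
    /// multiple of `PIECES_READING_CONCURRENCY`, so the rotation stays even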
    fn read(&self) -> &DirectIoFileWrapper {
        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
        &self.files[position % PIECES_READING_CONCURRENCY]
    }

    fn write(&self) -> &DirectIoFileWrapper {
        // Always use the same file, or else overlapping writes will be corrupted due to the
        // internal read/modify/write cycle, which is in turn caused by alignment requirements
        &self.files[0]
    }
}

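/// Shared state behind [`DiskPieceCache`]: the cache ID, the pool of file handles, the capacity
/// in elements and optional metrics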
#[derive(Debug)]
struct Inner {
    id: PieceCacheId,
    files: FilePool,
    max_num_elements: u32,
    metrics: Option<DiskPieceCacheMetrics>,
}

/// Dedicated piece cache stored on one disk; it is used both to accelerate DSN queries and to
/// plot faster.
///
/// Implementation is backed by a file on disk.
#[derive(Debug, Clone)]
pub struct DiskPieceCache {
    inner: Arc<Inner>,
}

#[async_trait]
impl farm::PieceCache for DiskPieceCache {
    fn id(&self) -> &PieceCacheId {
        &self.inner.id
    }

    #[inline]
    fn max_num_elements(&self) -> u32 {
        self.inner.max_num_elements
    }

    async fn contents(
        &self,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
                + Unpin
                + Send
                + '_,
        >,
        FarmError,
    > {
        let this = self.clone();
        let (mut sender, receiver) = mpsc::channel(100_000);
        let span = Span::current();
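        // Iterate the cache contents on a blocking thread and forward the results over a bounded
        // channel, so that the synchronous disk reads don't block the async executor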
        let read_contents = task::spawn_blocking(move || {
            let _guard = span.enter();

            let contents = this.contents();
            for (piece_cache_offset, maybe_piece) in contents {
                if let Err(error) =
                    Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece))))
                {
                    debug!(%error, "Aborting contents iteration due to receiver dropping");
                    break;
                }
            }
        });
        let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false)));
        // Change the order such that in the closure below `receiver` is dropped before
        // `read_contents`
        let mut receiver = receiver;

        Ok(Box::new(stream::poll_fn(move |ctx| {
            let poll_result = receiver.poll_next_unpin(ctx);

            if matches!(poll_result, Poll::Ready(None)) {
                read_contents.lock().take();
            }

            poll_result
        })))
    }

    async fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), FarmError> {
        let piece = piece.clone();
        let piece_cache = self.clone();
        Ok(AsyncJoinOnDrop::new(
            task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)),
            false,
        )
        .await??)
    }

    async fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, FarmError> {
        let piece_cache = self.clone();
        let span = Span::current();
        Ok(AsyncJoinOnDrop::new(
            task::spawn_blocking(move || {
                let _guard = span.enter();

                piece_cache.read_piece_index(offset)
            }),
            false,
        )
        .await??)
    }

    async fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
        let span = Span::current();

        // TODO: On Windows spawning blocking task that allows concurrent reads causes huge memory
        //  usage. No idea why it happens, but not spawning anything at all helps for some reason.
        //  Someone at some point should figure it out and fix, but it will probably be not me
        //  (Nazar).
        //  See https://github.com/autonomys/subspace/issues/2813 and linked forum post for details.
        //  This TODO exists in multiple files
        if cfg!(windows) {
            Ok(task::block_in_place(|| {
                let _guard = span.enter();

                self.read_piece(offset)
            })?)
        } else {
            let piece_cache = self.clone();
            Ok(AsyncJoinOnDrop::new(
                task::spawn_blocking(move || {
                    let _guard = span.enter();

                    piece_cache.read_piece(offset)
                }),
                false,
            )
            .await??)
        }
    }

    async fn read_pieces(
        &self,
        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
    ) -> Result<
        Box<
            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
                + Send
                + Unpin
                + '_,
        >,
        FarmError,
    > {
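        // Turn each offset into a future that reads the corresponding piece; the futures are
        // then driven with bounded concurrency below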
        let iter = offsets.map(move |offset| async move {
            Ok((offset, farm::PieceCache::read_piece(self, offset).await?))
        });
        Ok(Box::new(
            // Constrain concurrency to avoid excessive memory usage, while still getting the
            // performance benefit of concurrent reads
            stream::iter(iter).buffer_unordered(PIECES_READING_CONCURRENCY),
        ))
    }
}

impl DiskPieceCache {
    pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";

    /// Open the cache; capacity is measured in elements of [`DiskPieceCache::element_size()`] size
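    ///
    /// A minimal usage sketch (the directory path and capacity below are made up for
    /// illustration):
    ///
    /// ```ignore
    /// let cache = DiskPieceCache::open(
    ///     Path::new("/path/to/farm"),
    ///     NonZeroU32::new(1024).expect("Not zero; qed"),
    ///     None,
    ///     None,
    /// )?;
    /// ```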
    pub fn open(
        directory: &Path,
        capacity: NonZeroU32,
        id: Option<PieceCacheId>,
        registry: Option<&mut Registry>,
    ) -> Result<Self, DiskPieceCacheError> {
        let capacity = capacity.get();
        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

        let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
        // Align the cache file size to the disk sector size
        let expected_size = expected_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
        {
            let file = files.write();
            if file.size()? != expected_size {
                // Allocating the whole file (`set_len` below can create a sparse file, which will
                // cause writes to fail later)
                file.preallocate(expected_size)
                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
                // Truncating file (if necessary)
                file.set_len(expected_size)?;
            }
        }

        // ID for cache is ephemeral unless provided explicitly
        let id = id.unwrap_or_else(PieceCacheId::new);
        let metrics = registry.map(|registry| DiskPieceCacheMetrics::new(registry, &id, capacity));

        Ok(Self {
            inner: Arc::new(Inner {
                id,
                files,
                max_num_elements: capacity,
                metrics,
            }),
        })
    }

    /// Size of a single piece cache element
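    ///
    /// An element is laid out as the piece index, followed by the piece itself and a BLAKE3
    /// checksum of both (see `write_piece()` and `read_piece_internal()`)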
    pub const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

    /// Contents of this piece cache
    ///
    /// NOTE: it is possible to do concurrent reads and writes; higher-level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    pub(crate) fn contents(
        &self,
    ) -> impl ExactSizeIterator<Item = (PieceCacheOffset, Option<PieceIndex>)> + '_ {
        let mut element = vec![0; Self::element_size() as usize];
        let count_total = self
            .inner
            .metrics
            .as_ref()
            .map(|metrics| {
                metrics.contents.inc();
                metrics.capacity_used.get() == 0
            })
            .unwrap_or_default();
        let mut current_skip = 0;

        // TODO: Parallelize or read in larger batches
        (0..self.inner.max_num_elements).map(move |offset| {
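            // After `CONTENTS_READ_SKIP_LIMIT` consecutive empty or unreadable elements, assume
            // the rest of the cache was never written and report the remaining offsets as empty
            // without touching the disk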
            if current_skip > CONTENTS_READ_SKIP_LIMIT {
                return (PieceCacheOffset(offset), None);
            }

            match self.read_piece_internal(offset, &mut element) {
                Ok(maybe_piece_index) => {
                    if maybe_piece_index.is_none() {
                        current_skip += 1;
                    } else {
                        if count_total && let Some(metrics) = &self.inner.metrics {
                            metrics.capacity_used.inc();
                        }
                        current_skip = 0;
                    }

                    (PieceCacheOffset(offset), maybe_piece_index)
                }
                Err(error) => {
                    warn!(%error, %offset, "Failed to read cache element");

                    current_skip += 1;

                    (PieceCacheOffset(offset), None)
                }
            }
        })
    }

    /// Store piece in cache at specified offset, replacing existing piece if there is one.
    ///
    /// NOTE: it is possible to do concurrent reads and writes; higher-level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    pub(crate) fn write_piece(
        &self,
        offset: PieceCacheOffset,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<(), DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.write_piece.inc();
            let capacity_used = i64::from(offset + 1);
            if metrics.capacity_used.get() != capacity_used {
                metrics.capacity_used.set(capacity_used);
            }
        }
        let element_offset = u64::from(offset) * u64::from(Self::element_size());

        let piece_index_bytes = piece_index.to_bytes();
        // File writes are read/modify/write internally, so combine all the data here for a more
        // efficient write
        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
        bytes.extend_from_slice(&piece_index_bytes);
        bytes.extend_from_slice(piece.as_ref());
        bytes.extend_from_slice(
            {
                let mut hasher = blake3::Hasher::new();
                hasher.update(&piece_index_bytes);
                hasher.update(piece.as_ref());
                hasher.finalize()
            }
            .as_bytes(),
        );
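        // A single positional write of index || piece || checksum at the element's offset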
        self.inner
            .files
            .write()
            .write_all_at(&bytes, element_offset)?;

        Ok(())
    }

    /// Read piece index from cache at specified offset.
    ///
    /// Returns an error if the offset is out of range and `None` if nothing is stored at the
    /// specified offset.
    ///
    /// NOTE: it is possible to do concurrent reads and writes; higher-level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    pub(crate) fn read_piece_index(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.read_piece_index.inc();
        }
        self.read_piece_internal(offset, &mut vec![0; Self::element_size() as usize])
    }

    /// Read piece from cache at specified offset.
    ///
    /// Returns an error if the offset is out of range and `None` if nothing is stored at the
    /// specified offset.
    ///
    /// NOTE: it is possible to do concurrent reads and writes; higher-level logic must ensure this
    /// doesn't happen for the same piece being accessed!
    pub(crate) fn read_piece(
        &self,
        offset: PieceCacheOffset,
    ) -> Result<Option<(PieceIndex, Piece)>, DiskPieceCacheError> {
        let PieceCacheOffset(offset) = offset;
        if offset >= self.inner.max_num_elements {
            warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
            return Err(DiskPieceCacheError::OffsetOutsideOfRange {
                provided: offset,
                max: self.inner.max_num_elements - 1,
            });
        }

        if let Some(metrics) = &self.inner.metrics {
            metrics.read_piece.inc();
        }
        let mut element = BytesMut::zeroed(Self::element_size() as usize);
        if let Some(piece_index) = self.read_piece_internal(offset, &mut element)? {
            let element = element.freeze();
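            // `slice_ref` creates a `Bytes` view into the same allocation, so the piece is
            // extracted without copying the element buffer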
            let piece =
                Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                    .expect("Correct length; qed");
            Ok(Some((piece_index, piece)))
        } else {
            Ok(None)
        }
    }

    fn read_piece_internal(
        &self,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
        self.inner
            .files
            .read()
            .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        // Verify checksum
        let actual_checksum = {
            let mut hasher = blake3::Hasher::new();
            hasher.update(piece_index_bytes);
            hasher.update(piece_bytes);
            *hasher.finalize().as_bytes()
        };
        if actual_checksum != expected_checksum {
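            // An all-zero element is an empty (never written) slot rather than corruption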
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted piece in cache"
            );

            return Err(DiskPieceCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }

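    /// Remove the piece cache file from `directory` (no-op if it doesn't exist)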
    pub(crate) fn wipe(directory: &Path) -> io::Result<()> {
        let piece_cache = directory.join(Self::FILE_NAME);
        if !piece_cache.exists() {
            return Ok(());
        }
        info!("Deleting piece cache file at {}", piece_cache.display());
        fs::remove_file(piece_cache)
    }
}