1mod metrics;
4#[cfg(not(miri))]
6#[cfg(test)]
7mod tests;
8
9use crate::disk_piece_cache::metrics::DiskPieceCacheMetrics;
10use crate::farm;
11use crate::farm::{FarmError, PieceCacheId, PieceCacheOffset};
12use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
13use crate::utils::AsyncJoinOnDrop;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_farmer_components::file_ext::FileExt;
17use async_trait::async_trait;
18use bytes::BytesMut;
19use futures::channel::mpsc;
20use futures::{SinkExt, Stream, StreamExt, stream};
21use parking_lot::Mutex;
22use prometheus_client::registry::Registry;
23use std::num::NonZeroU32;
24use std::path::Path;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicU8, Ordering};
27use std::task::Poll;
28use std::{fs, io};
29use thiserror::Error;
30use tokio::runtime::Handle;
31use tokio::task;
32use tracing::{Span, debug, info, warn};
33
/// How many consecutive empty or unreadable elements are tolerated while scanning cache contents.
///
/// Once more than this many elements in a row turn out to be empty or unreadable, every remaining
/// element is reported as empty without being read from disk.
const CONTENTS_READ_SKIP_LIMIT: usize = 3;
/// Maximum number of pieces read concurrently; also the number of handles opened for the cache
/// file.
const PIECES_READING_CONCURRENCY: usize = 32;
39
40#[derive(Debug, Error)]
42pub enum DiskPieceCacheError {
43 #[error("Disk piece cache I/O error: {0}")]
45 Io(#[from] io::Error),
46 #[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
48 CantPreallocateCacheFile(io::Error),
49 #[error("Offset outsize of range: provided {provided}, max {max}")]
51 OffsetOutsideOfRange {
52 provided: u32,
54 max: u32,
56 },
57 #[error("Checksum mismatch")]
59 ChecksumMismatch,
60}
61
62#[derive(Debug)]
63struct FilePool {
64 files: Box<[DirectIoFileWrapper; PIECES_READING_CONCURRENCY]>,
65 cursor: AtomicU8,
66}
67
68impl FilePool {
69 fn open(path: &Path) -> io::Result<Self> {
70 let files = (0..PIECES_READING_CONCURRENCY)
71 .map(|_| DirectIoFileWrapper::open(path))
72 .collect::<Result<Box<_>, _>>()?
73 .try_into()
74 .expect("Statically correct length; qed");
75 Ok(Self {
76 files,
77 cursor: AtomicU8::new(0),
78 })
79 }
80
81 fn read(&self) -> &DirectIoFileWrapper {
82 let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
83 &self.files[position % PIECES_READING_CONCURRENCY]
84 }
85
86 fn write(&self) -> &DirectIoFileWrapper {
87 &self.files[0]
90 }
91}
92
93#[derive(Debug)]
94struct Inner {
95 id: PieceCacheId,
96 files: FilePool,
97 max_num_elements: u32,
98 metrics: Option<DiskPieceCacheMetrics>,
99}
100
101#[derive(Debug, Clone)]
106pub struct DiskPieceCache {
107 inner: Arc<Inner>,
108}
109
110#[async_trait]
111impl farm::PieceCache for DiskPieceCache {
112 fn id(&self) -> &PieceCacheId {
113 &self.inner.id
114 }
115
116 #[inline]
117 fn max_num_elements(&self) -> u32 {
118 self.inner.max_num_elements
119 }
120
121 async fn contents(
122 &self,
123 ) -> Result<
124 Box<
125 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
126 + Unpin
127 + Send
128 + '_,
129 >,
130 FarmError,
131 > {
132 let this = self.clone();
133 let (mut sender, receiver) = mpsc::channel(100_000);
134 let span = Span::current();
135 let read_contents = task::spawn_blocking(move || {
136 let _guard = span.enter();
137
138 let contents = this.contents();
139 for (piece_cache_offset, maybe_piece) in contents {
140 if let Err(error) =
141 Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece))))
142 {
143 debug!(%error, "Aborting contents iteration due to receiver dropping");
144 break;
145 }
146 }
147 });
148 let read_contents = Mutex::new(Some(AsyncJoinOnDrop::new(read_contents, false)));
149 let mut receiver = receiver;
151
152 Ok(Box::new(stream::poll_fn(move |ctx| {
153 let poll_result = receiver.poll_next_unpin(ctx);
154
155 if matches!(poll_result, Poll::Ready(None)) {
156 read_contents.lock().take();
157 }
158
159 poll_result
160 })))
161 }
162
163 async fn write_piece(
164 &self,
165 offset: PieceCacheOffset,
166 piece_index: PieceIndex,
167 piece: &Piece,
168 ) -> Result<(), FarmError> {
169 let piece = piece.clone();
170 let piece_cache = self.clone();
171 Ok(AsyncJoinOnDrop::new(
172 task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)),
173 false,
174 )
175 .await??)
176 }
177
178 async fn read_piece_index(
179 &self,
180 offset: PieceCacheOffset,
181 ) -> Result<Option<PieceIndex>, FarmError> {
182 let piece_cache = self.clone();
183 let span = Span::current();
184 Ok(AsyncJoinOnDrop::new(
185 task::spawn_blocking(move || {
186 let _guard = span.enter();
187
188 piece_cache.read_piece_index(offset)
189 }),
190 false,
191 )
192 .await??)
193 }
194
195 async fn read_piece(
196 &self,
197 offset: PieceCacheOffset,
198 ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
199 let span = Span::current();
200
201 if cfg!(windows) {
208 Ok(task::block_in_place(|| {
209 let _guard = span.enter();
210
211 self.read_piece(offset)
212 })?)
213 } else {
214 let piece_cache = self.clone();
215 Ok(AsyncJoinOnDrop::new(
216 task::spawn_blocking(move || {
217 let _guard = span.enter();
218
219 piece_cache.read_piece(offset)
220 }),
221 false,
222 )
223 .await??)
224 }
225 }
226
227 async fn read_pieces(
228 &self,
229 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
230 ) -> Result<
231 Box<
232 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
233 + Send
234 + Unpin
235 + '_,
236 >,
237 FarmError,
238 > {
239 let iter = offsets.map(move |offset| async move {
240 Ok((offset, farm::PieceCache::read_piece(self, offset).await?))
241 });
242 Ok(Box::new(
243 stream::iter(iter).buffer_unordered(PIECES_READING_CONCURRENCY),
246 ))
247 }
248}
249
250impl DiskPieceCache {
251 pub(crate) const FILE_NAME: &'static str = "piece_cache.bin";
252
253 pub fn open(
255 directory: &Path,
256 capacity: NonZeroU32,
257 id: Option<PieceCacheId>,
258 registry: Option<&mut Registry>,
259 ) -> Result<Self, DiskPieceCacheError> {
260 let capacity = capacity.get();
261 let files = FilePool::open(&directory.join(Self::FILE_NAME))?;
262
263 let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
264 let expected_size = expected_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
266 {
267 let file = files.write();
268 if file.size()? != expected_size {
269 file.preallocate(expected_size)
272 .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
273 file.set_len(expected_size)?;
275 }
276 }
277
278 let id = id.unwrap_or_else(PieceCacheId::new);
280 let metrics = registry.map(|registry| DiskPieceCacheMetrics::new(registry, &id, capacity));
281
282 Ok(Self {
283 inner: Arc::new(Inner {
284 id,
285 files,
286 max_num_elements: capacity,
287 metrics,
288 }),
289 })
290 }
291
292 pub const fn element_size() -> u32 {
294 (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
295 }
296
297 pub(crate) fn contents(
302 &self,
303 ) -> impl ExactSizeIterator<Item = (PieceCacheOffset, Option<PieceIndex>)> + '_ {
304 let mut element = vec![0; Self::element_size() as usize];
305 let count_total = self
306 .inner
307 .metrics
308 .as_ref()
309 .map(|metrics| {
310 metrics.contents.inc();
311 metrics.capacity_used.get() == 0
312 })
313 .unwrap_or_default();
314 let mut current_skip = 0;
315
316 (0..self.inner.max_num_elements).map(move |offset| {
318 if current_skip > CONTENTS_READ_SKIP_LIMIT {
319 return (PieceCacheOffset(offset), None);
320 }
321
322 match self.read_piece_internal(offset, &mut element) {
323 Ok(maybe_piece_index) => {
324 if maybe_piece_index.is_none() {
325 current_skip += 1;
326 } else {
327 if count_total && let Some(metrics) = &self.inner.metrics {
328 metrics.capacity_used.inc();
329 }
330 current_skip = 0;
331 }
332
333 (PieceCacheOffset(offset), maybe_piece_index)
334 }
335 Err(error) => {
336 warn!(%error, %offset, "Failed to read cache element");
337
338 current_skip += 1;
339
340 (PieceCacheOffset(offset), None)
341 }
342 }
343 })
344 }
345
346 pub(crate) fn write_piece(
351 &self,
352 offset: PieceCacheOffset,
353 piece_index: PieceIndex,
354 piece: &Piece,
355 ) -> Result<(), DiskPieceCacheError> {
356 let PieceCacheOffset(offset) = offset;
357 if offset >= self.inner.max_num_elements {
358 return Err(DiskPieceCacheError::OffsetOutsideOfRange {
359 provided: offset,
360 max: self.inner.max_num_elements - 1,
361 });
362 }
363
364 if let Some(metrics) = &self.inner.metrics {
365 metrics.write_piece.inc();
366 let capacity_used = i64::from(offset + 1);
367 if metrics.capacity_used.get() != capacity_used {
368 metrics.capacity_used.set(capacity_used);
369 }
370 }
371 let element_offset = u64::from(offset) * u64::from(Self::element_size());
372
373 let piece_index_bytes = piece_index.to_bytes();
374 let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
377 bytes.extend_from_slice(&piece_index_bytes);
378 bytes.extend_from_slice(piece.as_ref());
379 bytes.extend_from_slice(
380 {
381 let mut hasher = blake3::Hasher::new();
382 hasher.update(&piece_index_bytes);
383 hasher.update(piece.as_ref());
384 hasher.finalize()
385 }
386 .as_bytes(),
387 );
388 self.inner
389 .files
390 .write()
391 .write_all_at(&bytes, element_offset)?;
392
393 Ok(())
394 }
395
396 pub(crate) fn read_piece_index(
403 &self,
404 offset: PieceCacheOffset,
405 ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
406 let PieceCacheOffset(offset) = offset;
407 if offset >= self.inner.max_num_elements {
408 warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
409 return Err(DiskPieceCacheError::OffsetOutsideOfRange {
410 provided: offset,
411 max: self.inner.max_num_elements - 1,
412 });
413 }
414
415 if let Some(metrics) = &self.inner.metrics {
416 metrics.read_piece_index.inc();
417 }
418 self.read_piece_internal(offset, &mut vec![0; Self::element_size() as usize])
419 }
420
421 pub(crate) fn read_piece(
428 &self,
429 offset: PieceCacheOffset,
430 ) -> Result<Option<(PieceIndex, Piece)>, DiskPieceCacheError> {
431 let PieceCacheOffset(offset) = offset;
432 if offset >= self.inner.max_num_elements {
433 warn!(%offset, "Trying to read piece out of range, this must be an implementation bug");
434 return Err(DiskPieceCacheError::OffsetOutsideOfRange {
435 provided: offset,
436 max: self.inner.max_num_elements - 1,
437 });
438 }
439
440 if let Some(metrics) = &self.inner.metrics {
441 metrics.read_piece.inc();
442 }
443 let mut element = BytesMut::zeroed(Self::element_size() as usize);
444 if let Some(piece_index) = self.read_piece_internal(offset, &mut element)? {
445 let element = element.freeze();
446 let piece =
447 Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
448 .expect("Correct length; qed");
449 Ok(Some((piece_index, piece)))
450 } else {
451 Ok(None)
452 }
453 }
454
455 fn read_piece_internal(
456 &self,
457 offset: u32,
458 element: &mut [u8],
459 ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
460 self.inner
461 .files
462 .read()
463 .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;
464
465 let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
466 let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);
467
468 let actual_checksum = {
470 let mut hasher = blake3::Hasher::new();
471 hasher.update(piece_index_bytes);
472 hasher.update(piece_bytes);
473 *hasher.finalize().as_bytes()
474 };
475 if actual_checksum != expected_checksum {
476 if element.iter().all(|&byte| byte == 0) {
477 return Ok(None);
478 }
479
480 debug!(
481 actual_checksum = %hex::encode(actual_checksum),
482 expected_checksum = %hex::encode(expected_checksum),
483 "Hash doesn't match, corrupted piece in cache"
484 );
485
486 return Err(DiskPieceCacheError::ChecksumMismatch);
487 }
488
489 let piece_index = PieceIndex::from_bytes(
490 piece_index_bytes
491 .try_into()
492 .expect("Statically known to have correct size; qed"),
493 );
494 Ok(Some(piece_index))
495 }
496
497 pub(crate) fn wipe(directory: &Path) -> io::Result<()> {
498 let piece_cache = directory.join(Self::FILE_NAME);
499 if !piece_cache.exists() {
500 return Ok(());
501 }
502 info!("Deleting piece cache file at {}", piece_cache.display());
503 fs::remove_file(piece_cache)
504 }
505}