// Tests live in a sibling `tests` module file, compiled only for test builds.
// NOTE(review): excluded under Miri — presumably because the tests do real
// disk I/O that Miri cannot emulate; confirm in tests.rs.
#[cfg(not(miri))]
#[cfg(test)]
mod tests;
7
8use crate::farm::{FarmError, MaybePieceStoredResult, PlotCache};
9use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
10use crate::utils::AsyncJoinOnDrop;
11use ab_core_primitives::hashes::Blake3Hash;
12use ab_core_primitives::pieces::{Piece, PieceIndex};
13use ab_farmer_components::file_ext::FileExt;
14use ab_farmer_components::sector::SectorMetadataChecksummed;
15use ab_networking::libp2p::kad::RecordKey;
16use ab_networking::utils::multihash::ToMultihash;
17use async_lock::RwLock as AsyncRwLock;
18use async_trait::async_trait;
19use bytes::BytesMut;
20use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::{Arc, Weak};
23use std::{io, mem};
24use thiserror::Error;
25use tokio::task;
26use tracing::{debug, info, trace, warn};
27
/// Errors that can occur when interacting with the disk plot cache.
#[derive(Debug, Error)]
pub enum DiskPlotCacheError {
    /// Underlying file I/O failed
    #[error("Plot cache I/O error: {0}")]
    Io(#[from] io::Error),
    /// A blocking task spawned for disk I/O failed to join
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] tokio::task::JoinError),
    /// Stored element's BLAKE3 checksum did not match its contents
    /// (corrupted or overwritten element)
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
41
/// In-memory index of which pieces are stored in the plot cache and where.
#[derive(Debug)]
struct CachedPieces {
    /// Map from piece record key to the element offset (in units of
    /// [`DiskPlotCache::element_size`]) at which the piece is stored
    map: HashMap<RecordKey, u32>,
    /// Next element offset available for writing; `None` when the cache has no
    /// usable free space left
    next_offset: Option<u32>,
}
48
/// Piece cache that reuses the not-yet-plotted tail of the plot file for
/// storing pieces; its elements are progressively invalidated as plotting
/// overwrites that region with sector data.
#[derive(Debug, Clone)]
pub struct DiskPlotCache {
    /// Plot file; weak reference so the cache does not keep the file alive
    file: Weak<DirectIoFileWrapper>,
    /// Metadata of already plotted sectors; its length determines how much of
    /// the file prefix is plotted (and therefore no longer usable as cache)
    sectors_metadata: Weak<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Shared piece index, shared between all clones of this handle
    cached_pieces: Arc<RwLock<CachedPieces>>,
    /// Total number of sectors the plot will eventually contain
    target_sector_count: u16,
    /// Size of a single sector in bytes
    sector_size: u64,
}
58
59#[async_trait]
60impl PlotCache for DiskPlotCache {
61 async fn is_piece_maybe_stored(
62 &self,
63 key: &RecordKey,
64 ) -> Result<MaybePieceStoredResult, FarmError> {
65 Ok(self.is_piece_maybe_stored(key))
66 }
67
68 async fn try_store_piece(
71 &self,
72 piece_index: PieceIndex,
73 piece: &Piece,
74 ) -> Result<bool, FarmError> {
75 Ok(self.try_store_piece(piece_index, piece).await?)
76 }
77
78 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError> {
79 Ok(self.read_piece(key).await)
80 }
81}
82
impl DiskPlotCache {
    /// Create a plot cache view over `file`, scanning the not-yet-plotted tail
    /// of the plot for previously cached pieces.
    ///
    /// Only [`Weak`] references to the file and sector metadata are retained,
    /// so the cache handle does not keep them alive on its own.
    pub(crate) fn new(
        file: &Arc<DirectIoFileWrapper>,
        sectors_metadata: &Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
        target_sector_count: u16,
        sector_size: u64,
    ) -> Self {
        info!("Checking plot cache contents, this can take a while");
        let cached_pieces = {
            let sectors_metadata = sectors_metadata.read_blocking();
            // Scratch buffer for one on-disk element
            // (piece index ‖ piece ‖ checksum)
            let mut element = vec![0; Self::element_size() as usize];
            // `RecordKey` trips clippy's mutable-key-type lint; keys are never
            // mutated while in the map here
            #[allow(clippy::mutable_key_type)]
            let mut map = HashMap::new();
            let mut next_offset = None;

            // Full plot size vs. the prefix already occupied by plotted
            // sectors; the space in between is usable as piece cache
            let file_size = sector_size * u64::from(target_sector_count);
            let plotted_size = sector_size * sectors_metadata.len() as u64;

            // Bounds of the cache region in element (not byte) offsets
            let from_offset = (plotted_size / Self::element_size() as u64) as u32;
            let to_offset = (file_size / Self::element_size() as u64) as u32;
            // Elements are written from the end of the file downwards (see
            // `try_store_piece`, which decrements `next_offset`), so scan in
            // the same reverse order: occupied elements come first, and the
            // first empty/corrupt element is the next free slot.
            for offset in (from_offset..to_offset).rev() {
                match Self::read_piece_internal(file, offset, &mut element) {
                    Ok(maybe_piece_index) => match maybe_piece_index {
                        Some(piece_index) => {
                            map.insert(RecordKey::from(piece_index.to_multihash()), offset);
                        }
                        None => {
                            // All-zero element: free slot, stop scanning
                            next_offset.replace(offset);
                            break;
                        }
                    },
                    Err(DiskPlotCacheError::ChecksumMismatch) => {
                        // Corrupted/partially written element: treat it as the
                        // next free slot and stop
                        next_offset.replace(offset);
                        break;
                    }
                    Err(error) => {
                        // I/O error: stop scanning; `next_offset` stays `None`,
                        // so no further writes will be attempted
                        warn!(%error, %offset, "Failed to read plot cache element");
                        break;
                    }
                }
            }

            CachedPieces { map, next_offset }
        };

        info!("Finished checking plot cache contents");

        Self {
            file: Arc::downgrade(file),
            sectors_metadata: Arc::downgrade(sectors_metadata),
            cached_pieces: Arc::new(RwLock::new(cached_pieces)),
            target_sector_count,
            sector_size,
        }
    }

    /// Size of a single on-disk cache element:
    /// piece index ‖ piece ‖ BLAKE3 checksum of the preceding two.
    pub(crate) const fn element_size() -> u32 {
        (PieceIndex::SIZE + Piece::SIZE + Blake3Hash::SIZE) as u32
    }

    /// Check whether a piece may be stored in this cache.
    ///
    /// "Maybe" because an element can be overwritten by plotting at any time;
    /// a stale map entry discovered here is evicted eagerly.
    pub(crate) fn is_piece_maybe_stored(&self, key: &RecordKey) -> MaybePieceStoredResult {
        let offset = {
            let cached_pieces = self.cached_pieces.read();

            let Some(offset) = cached_pieces.map.get(key).copied() else {
                // Not stored; report `Vacant` if there is still free space
                // where it could be stored
                return if cached_pieces.next_offset.is_some() {
                    MaybePieceStoredResult::Vacant
                } else {
                    MaybePieceStoredResult::No
                };
            };

            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Metadata is gone (farm shutting down)
            return MaybePieceStoredResult::No;
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let plotted_bytes = self.sector_size * sectors_metadata.read_blocking().len() as u64;

        // If plotting has progressed past this element it was overwritten with
        // sector contents; evict the stale map entry
        if element_offset < plotted_bytes {
            self.cached_pieces.write().map.remove(key);
            MaybePieceStoredResult::No
        } else {
            MaybePieceStoredResult::Yes
        }
    }

    /// Try to store a piece in the cache.
    ///
    /// Returns `Ok(false)` when there is no usable free space left (or the
    /// farm is shutting down), `Ok(true)` once the piece is written and
    /// published in the map.
    pub(crate) async fn try_store_piece(
        &self,
        piece_index: PieceIndex,
        piece: &Piece,
    ) -> Result<bool, DiskPlotCacheError> {
        let offset = {
            // Cheap read-lock check first to avoid write-lock contention when
            // the cache is already full
            if self.cached_pieces.read().next_offset.is_none() {
                return Ok(false);
            };

            // Re-check under the write lock: another writer may have consumed
            // the last slot between the two lock acquisitions
            let mut cached_pieces = self.cached_pieces.write();
            let Some(next_offset) = cached_pieces.next_offset else {
                return Ok(false);
            };

            // Claim the slot; slots are consumed from high offsets downwards,
            // `checked_sub` yields `None` once offset 0 has been used
            let offset = next_offset;
            cached_pieces.next_offset = offset.checked_sub(1);
            offset
        };

        let Some(sectors_metadata) = self.sectors_metadata.upgrade() else {
            // Metadata is gone (farm shutting down)
            return Ok(false);
        };

        let element_offset = u64::from(offset) * u64::from(Self::element_size());
        let sectors_metadata = sectors_metadata.read().await;
        let plotted_sectors_count = sectors_metadata.len() as u16;
        let plotted_bytes = self.sector_size * u64::from(plotted_sectors_count);

        // The claimed slot already lies inside the plotted region: the cache
        // has effectively run out of space
        if element_offset < plotted_bytes {
            // Release the metadata read guard before taking the pieces write
            // lock, keeping the lock scopes disjoint
            drop(sectors_metadata);
            let mut cached_pieces = self.cached_pieces.write();
            cached_pieces.next_offset.take();
            // Once the plot is complete, every cached element has been
            // overwritten, so the whole map can be dropped
            if plotted_sectors_count == self.target_sector_count {
                mem::take(&mut cached_pieces.map);
            }
            return Ok(false);
        }

        let Some(file) = self.file.upgrade() else {
            // File is gone (farm shutting down)
            return Ok(false);
        };

        trace!(
            %offset,
            ?piece_index,
            %plotted_sectors_count,
            "Found available piece cache free space offset, writing piece",
        );

        // Serialize the element (piece index ‖ piece ‖ BLAKE3 checksum of
        // both) and write it on a blocking thread, since this is synchronous
        // disk I/O
        let write_fut = tokio::task::spawn_blocking({
            let piece_index_bytes = piece_index.to_bytes();
            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
            bytes.extend_from_slice(&piece_index_bytes);
            bytes.extend_from_slice(piece.as_ref());
            bytes.extend_from_slice(
                {
                    let mut hasher = blake3::Hasher::new();
                    hasher.update(&piece_index_bytes);
                    hasher.update(piece.as_ref());
                    hasher.finalize()
                }
                .as_bytes(),
            );

            move || file.write_all_at(&bytes, element_offset)
        });

        // NOTE(review): the sector metadata read guard is intentionally held
        // across this await (only dropped below) — presumably to keep plotting
        // from advancing over the element while it is being written; confirm
        // against the plotting code before changing
        AsyncJoinOnDrop::new(write_fut, false).await??;

        drop(sectors_metadata);
        // Publish the mapping only after the write succeeded
        self.cached_pieces
            .write()
            .map
            .insert(RecordKey::from(piece_index.to_multihash()), offset);

        Ok(true)
    }

    /// Read a piece from the cache.
    ///
    /// Returns `None` if the piece is absent, its element was overwritten by
    /// plotting (stale entries are evicted on the way out), or the farm is
    /// shutting down.
    pub(crate) async fn read_piece(&self, key: &RecordKey) -> Option<Piece> {
        let offset = self.cached_pieces.read().map.get(key).copied()?;

        let file = self.file.upgrade()?;

        let read_fn = move || {
            let mut element = BytesMut::zeroed(Self::element_size() as usize);
            if let Ok(Some(_piece_index)) = Self::read_piece_internal(&file, offset, &mut element) {
                let element = element.freeze();
                // Zero-copy slice of the piece bytes out of the element buffer
                let piece =
                    Piece::try_from(element.slice_ref(&element[PieceIndex::SIZE..][..Piece::SIZE]))
                        .expect("Correct length; qed");
                Some(piece)
            } else {
                None
            }
        };
        // NOTE(review): on Windows the read runs inline via `block_in_place`
        // instead of `spawn_blocking` — presumably a platform-specific
        // constraint of the direct I/O wrapper; confirm before changing
        let maybe_piece = if cfg!(windows) {
            task::block_in_place(read_fn)
        } else {
            let read_fut = task::spawn_blocking(read_fn);

            AsyncJoinOnDrop::new(read_fut, false)
                .await
                .unwrap_or_default()
        };

        // Read failed or checksum mismatched: the element was most likely
        // overwritten by plotting, so evict stale entries
        if maybe_piece.is_none()
            && let Some(sectors_metadata) = self.sectors_metadata.upgrade()
        {
            let plotted_sectors_count = sectors_metadata.read().await.len() as u16;

            let mut cached_pieces = self.cached_pieces.write();
            if plotted_sectors_count == self.target_sector_count {
                // Plot is complete: every cached element is gone
                mem::take(&mut cached_pieces.map);
            } else {
                cached_pieces.map.remove(key);
            }
        }

        maybe_piece
    }

    /// Read the element at `offset` into `element` and verify its checksum.
    ///
    /// Returns `Ok(None)` for an all-zero (never written) element,
    /// [`DiskPlotCacheError::ChecksumMismatch`] for a corrupted/overwritten
    /// one, and the stored piece index on success.
    fn read_piece_internal(
        file: &DirectIoFileWrapper,
        offset: u32,
        element: &mut [u8],
    ) -> Result<Option<PieceIndex>, DiskPlotCacheError> {
        file.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

        // Element layout: piece index ‖ piece ‖ BLAKE3 checksum
        let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
        let (piece_bytes, expected_checksum) = remaining_bytes.split_at(Piece::SIZE);

        let actual_checksum = {
            let mut hasher = blake3::Hasher::new();
            hasher.update(piece_index_bytes);
            hasher.update(piece_bytes);
            *hasher.finalize().as_bytes()
        };
        if actual_checksum != expected_checksum {
            // An all-zero element means "never written" — an expected state at
            // the free end of the cache region, not corruption
            if element.iter().all(|&byte| byte == 0) {
                return Ok(None);
            }

            debug!(
                actual_checksum = %hex::encode(actual_checksum),
                expected_checksum = %hex::encode(expected_checksum),
                "Hash doesn't match, corrupted or overridden piece in cache"
            );

            return Err(DiskPlotCacheError::ChecksumMismatch);
        }

        let piece_index = PieceIndex::from_bytes(
            piece_index_bytes
                .try_into()
                .expect("Statically known to have correct size; qed"),
        );
        Ok(Some(piece_index))
    }
}