ab_farmer_components/
reading.rs

//! Reading utilities
//!
//! This module contains utilities for extracting data from plots/sectors created earlier by
//! functions in the [`plotting`](crate::plotting) module. This is a relatively expensive operation
//! and is only used for cold storage purposes or when there is a need to prove a solution to
//! consensus.
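//!
//! # Example
//!
//! An illustrative sketch of reading a plotted piece back (not a runnable doctest: it assumes a
//! plot opened elsewhere, a chosen proof-of-space `PosTable` type, and hypothetical
//! `piece_offset`, `sector_id`, `sector_metadata`, `sector`, `erasure_coding` and
//! `table_generator` values):
//!
//! ```ignore
//! use ab_farmer_components::reading::{ReadSectorRecordChunksMode, read_piece};
//!
//! let piece = read_piece::<PosTable, _, _>(
//!     piece_offset,
//!     &sector_id,
//!     &sector_metadata,
//!     &sector,
//!     &erasure_coding,
//!     ReadSectorRecordChunksMode::WholeSector,
//!     &mut table_generator,
//! )
//! .await?;
//! ```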

use crate::sector::{
    RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
    sector_record_chunks_size,
};
use crate::{ReadAt, ReadAtAsync, ReadAtSync};
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
use ab_core_primitives::sectors::{SBucket, SectorId};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_proof_of_space::{Table, TableGenerator};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::mem::ManuallyDrop;
use std::simd::Simd;
use std::str::FromStr;
use std::{fmt, io};
use thiserror::Error;
use tracing::debug;

/// Errors that happen during reading
#[derive(Debug, Error)]
pub enum ReadingError {
    /// Failed to read chunk.
    ///
    /// This is an implementation bug, most likely due to a mismatch between the sector contents
    /// map and other farming parameters.
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        /// Chunk location
        chunk_location: u64,
        /// Low-level error
        error: io::Error,
    },
    /// Missing proof of space proof.
    ///
    /// This is either a hardware issue or, if it happens for everyone all the time, an
    /// implementation bug.
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        /// S-bucket
        s_bucket: SBucket,
    },
    /// Failed to erasure-decode record
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        /// Piece offset
        piece_offset: PieceOffset,
        /// Lower-level error
        error: ErasureCodingError,
    },
    /// Wrong record size after decoding
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        /// Expected size in bytes
        expected: usize,
        /// Actual size in bytes
        actual: usize,
    },
    /// Failed to decode sector contents map
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
    /// I/O error occurred
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

impl ReadingError {
    /// Whether this error is fatal and renders the farm unusable
    pub fn is_fatal(&self) -> bool {
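        // Only lower-level I/O errors are treated as fatal here; the remaining errors affect a
        // single read/record, so the farm can keep operating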
        match self {
            ReadingError::FailedToReadChunk { .. } => false,
            ReadingError::MissingPosProof { .. } => false,
            ReadingError::FailedToErasureDecodeRecord { .. } => false,
            ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
            ReadingError::FailedToDecodeSectorContentsMap(_) => false,
            ReadingError::Io(_) => true,
            ReadingError::ChecksumMismatch => false,
        }
    }
}

/// Defines a mode of reading chunks in [`read_sector_record_chunks`].
///
/// Which option is slower or faster depends on the disk used; there is no one-size-fits-all here,
/// unfortunately.
#[derive(Debug, Copy, Clone)]
pub enum ReadSectorRecordChunksMode {
    /// Read individual chunks ([`RecordChunk::SIZE`] in size) concurrently, which results in lower
    /// total data transfer, but requires the SSD to support high concurrency and low latency
    ConcurrentChunks,
    /// Read the whole sector at once and extract chunks from the in-memory buffer, which uses more
    /// memory, but only requires the disk to have a decent linear read speed
    WholeSector,
}

impl fmt::Display for ReadSectorRecordChunksMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConcurrentChunks => {
                write!(f, "ConcurrentChunks")
            }
            Self::WholeSector => {
                write!(f, "WholeSector")
            }
        }
    }
}

impl FromStr for ReadSectorRecordChunksMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
            "WholeSector" => Ok(Self::WholeSector),
            s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
        }
    }
}

// TODO: Workaround for https://github.com/rust-lang/rust/issues/144690 that gets triggered on
//  `s_bucket_offsets` argument below
const _: () = {
    assert!(65536 == Record::NUM_S_BUCKETS);
};
/// Read sector record chunks; only plotted s-buckets are returned (in decoded form).
///
/// NOTE: This is an async function, but it also does a CPU-intensive operation internally; while
/// it is not very long, make sure it is okay to do so in your context.
pub async fn read_sector_record_chunks<PosTable, S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    // TODO: Workaround for https://github.com/rust-lang/rust/issues/144690
    // s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
    s_bucket_offsets: &[u32; 65536],
    sector_contents_map: &SectorContentsMap,
    pos_table: &PosTable,
    sector: &ReadAt<S, A>,
    mode: ReadSectorRecordChunksMode,
) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
        vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
    )
    .expect("Correct size; qed");

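    // For every s-bucket, work out where (if anywhere) this record's chunk is stored in the
    // sector; s-buckets without a plotted chunk for this record yield `None` and are skipped
    // during reading below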
    let read_chunks_inputs = record_chunks
        .par_iter_mut()
        .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
        .zip(
            (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
                .into_par_iter()
                .map(SBucket::from)
                .zip(s_bucket_offsets.par_iter()),
        )
        .map(
            |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
                let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;

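                // `chunk_location` is the chunk's index within the sector's record chunks region
                // (which starts right after the sector contents map), in units of
                // `RecordChunk::SIZE`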
                let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);

                Some((
                    maybe_record_chunk,
                    chunk_location,
                    encoded_chunk_used,
                    s_bucket,
                ))
            },
        )
        .collect::<Vec<_>>();

    let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
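    // In `WholeSector` mode the entire sector is read into memory once and chunks are then copied
    // out of that buffer; in `ConcurrentChunks` mode no buffer is allocated and every chunk is
    // read from the disk individually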
    let sector_bytes = match mode {
        ReadSectorRecordChunksMode::ConcurrentChunks => None,
        ReadSectorRecordChunksMode::WholeSector => {
            Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
        }
    };
    match sector {
        ReadAt::Sync(sector) => {
            let sector_bytes = {
                if let Some(mut sector_bytes) = sector_bytes {
                    sector.read_at(&mut sector_bytes, 0)?;
                    Some(sector_bytes)
                } else {
                    None
                }
            };
            read_chunks_inputs.into_par_iter().flatten().try_for_each(
                |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
                    let mut record_chunk = [0; RecordChunk::SIZE];
                    if let Some(sector_bytes) = &sector_bytes {
                        record_chunk.copy_from_slice(
                            &sector_bytes[sector_contents_map_size as usize
                                + chunk_location as usize * RecordChunk::SIZE..]
                                [..RecordChunk::SIZE],
                        );
                    } else {
                        sector
                            .read_at(
                                &mut record_chunk,
                                sector_contents_map_size
                                    + chunk_location * RecordChunk::SIZE as u64,
                            )
                            .map_err(|error| ReadingError::FailedToReadChunk {
                                chunk_location,
                                error,
                            })?;
                    }

                    // Decode chunk if necessary
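                    // (the chunk was XORed with the PoS proof hash during plotting and XOR is
                    // self-inverse, so applying the hash again restores the original chunk)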
                    if encoded_chunk_used {
                        let proof = pos_table
                            .find_proof(s_bucket.into())
                            .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                        record_chunk =
                            Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
                    }

                    maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                    Ok::<_, ReadingError>(())
                },
            )?;
        }
        ReadAt::Async(sector) => {
            let sector_bytes = &{
                if let Some(sector_bytes) = sector_bytes {
                    Some(sector.read_at(sector_bytes, 0).await?)
                } else {
                    None
                }
            };
            let processing_chunks = read_chunks_inputs
                .into_iter()
                .flatten()
                .map(
                    |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
                        let mut record_chunk = [0; RecordChunk::SIZE];
                        if let Some(sector_bytes) = &sector_bytes {
                            record_chunk.copy_from_slice(
                                &sector_bytes[sector_contents_map_size as usize
                                    + chunk_location as usize * RecordChunk::SIZE..]
                                    [..RecordChunk::SIZE],
                            );
                        } else {
                            record_chunk.copy_from_slice(
                                &sector
                                    .read_at(
                                        vec![0; RecordChunk::SIZE],
                                        sector_contents_map_size + chunk_location * RecordChunk::SIZE as u64,
                                    )
                                    .await
                                    .map_err(|error| ReadingError::FailedToReadChunk {
                                        chunk_location,
                                        error,
                                    })?
                            );
                        }

                        // Decode chunk if necessary
                        if encoded_chunk_used {
                            let proof = pos_table.find_proof(s_bucket.into())
                                .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                            record_chunk = Simd::to_array(
                                Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
                            );
                        }

                        maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                        Ok::<_, ReadingError>(())
                    },
                )
                .collect::<FuturesUnordered<_>>()
                .filter_map(|result| async move {
                    result.err()
                });

            std::pin::pin!(processing_chunks)
                .next()
                .await
                .map_or(Ok(()), Err)?;
        }
    }

    Ok(record_chunks)
}

/// Given sector record chunks, recover extended record chunks (both source and parity)
pub fn recover_extended_record_chunks(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
    // Restore source record scalars
    let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
    {
        let (source_sector_record_chunks, parity_sector_record_chunks) =
            sector_record_chunks.split_at(Record::NUM_CHUNKS);
        let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
            recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);

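        // Chunks that were read from the plot are both copied into the output and passed to the
        // erasure coder as `Present` shards; missing chunks are passed as `MissingRecover`,
        // pointing at the output slot for the erasure coder to fill in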
        let source = source_sector_record_chunks
            .iter()
            .zip(source_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        let parity = parity_sector_record_chunks
            .iter()
            .zip(parity_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        erasure_coding.recover(source, parity).map_err(|error| {
            ReadingError::FailedToErasureDecodeRecord {
                piece_offset,
                error,
            }
        })?;
    }

    // The allocation in the vector can be larger than its contents; we need to make sure the
    // allocation matches the contents exactly (this should also hit a fast path when the
    // allocation already matches)
    let record_chunks = recovered_sector_record_chunks
        .into_iter()
        .map(RecordChunk::from)
        .collect::<Box<_>>();
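    // Convert `Box<[RecordChunk]>` into `Box<[RecordChunk; Record::NUM_S_BUCKETS]>` without
    // copying or reallocating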
    let mut record_chunks = ManuallyDrop::new(record_chunks);
    // SAFETY: Original memory is not dropped, size of the data checked above
    let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };

    Ok(record_chunks)
}

/// Given sector record chunks, recover the source record.
pub fn recover_source_record(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<Record>, ReadingError> {
    // Restore source record scalars
    let mut recovered_record = Record::new_boxed();

    let (source_sector_record_chunks, parity_sector_record_chunks) =
        sector_record_chunks.split_at(Record::NUM_CHUNKS);
    let source = source_sector_record_chunks
        .iter()
        .zip(recovered_record.iter_mut())
        .map(
            |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                Some(input_chunk) => {
                    output_chunk.copy_from_slice(input_chunk.as_slice());
                    RecoveryShardState::Present(input_chunk.as_slice())
                }
                None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
            },
        );
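    // Parity chunks are only used as recovery inputs here: present ones are passed to the erasure
    // coder, missing ones are ignored rather than reconstructed, since only the source record is
    // needed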
    let parity = parity_sector_record_chunks
        .iter()
        .map(|maybe_input_chunk| match maybe_input_chunk {
            Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
            None => RecoveryShardState::MissingIgnore,
        });
    erasure_coding.recover(source, parity).map_err(|error| {
        ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        }
    })?;

    Ok(recovered_record)
}

/// Read metadata (roots and proof) for the record
pub(crate) async fn read_record_metadata<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    sector: &ReadAt<S, A>,
) -> Result<RecordMetadata, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
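    // Record metadata entries are stored after the sector contents map and all record chunks, as
    // one fixed-size entry per piece in the sector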
    let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
        + sector_record_chunks_size(pieces_in_sector) as u64;
    // Move to the beginning of the root and proof we care about
    let record_metadata_offset =
        sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);

    let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
    match sector {
        ReadAt::Sync(sector) => {
            sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
        }
        ReadAt::Async(sector) => {
            record_metadata_bytes = sector
                .read_at(record_metadata_bytes, record_metadata_offset)
                .await?;
        }
    }
    let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
        .expect("Length is correct, contents don't have a specific structure to them; qed");

    Ok(record_metadata)
}

/// Read piece from sector.
///
/// NOTE: Even though this function is async, proof of space table generation is expensive and
/// should be done in a dedicated thread where blocking is allowed.
pub async fn read_piece<PosTable, S, A>(
    piece_offset: PieceOffset,
    sector_id: &SectorId,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Result<Piece, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
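    // Reading a piece back consists of: decoding the sector contents map, reading and decoding
    // the plotted record chunks, erasure-recovering the source record, reading the record
    // metadata (roots and proof), reassembling the piece and verifying its checksum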
    let pieces_in_sector = sector_metadata.pieces_in_sector;

    let sector_contents_map = {
        let mut sector_contents_map_bytes =
            vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
        match sector {
            ReadAt::Sync(sector) => {
                sector.read_at(&mut sector_contents_map_bytes, 0)?;
            }
            ReadAt::Async(sector) => {
                sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
            }
        }

        SectorContentsMap::from_bytes(&sector_contents_map_bytes, pieces_in_sector)?
    };

    let sector_record_chunks = read_sector_record_chunks(
        piece_offset,
        pieces_in_sector,
        &sector_metadata.s_bucket_offsets(),
        &sector_contents_map,
        &table_generator.generate(&sector_id.derive_evaluation_seed(piece_offset)),
        sector,
        mode,
    )
    .await?;
    // Restore source record scalars
    let record = recover_source_record(&sector_record_chunks, piece_offset, erasure_coding)?;

    let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;

    let mut piece = Piece::default();

    piece.record_mut().copy_from_slice(record.as_slice());

    *piece.root_mut() = record_metadata.root;
    *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
    *piece.proof_mut() = record_metadata.proof;

    // Verify checksum
    let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
    if actual_checksum != record_metadata.piece_checksum {
        debug!(
            ?sector_id,
            %piece_offset,
            %actual_checksum,
            expected_checksum = %record_metadata.piece_checksum,
            "Hash doesn't match, plotted piece is corrupted"
        );

        return Err(ReadingError::ChecksumMismatch);
    }

    Ok(piece.to_shared())
}