ab_farmer_components/reading.rs

//! Reading utilities
//!
//! This module contains utilities for extracting data from plots/sectors created earlier by
//! functions in the [`plotting`](crate::plotting) module. This is a relatively expensive operation
//! and is only used for cold storage purposes or when there is a need to prove a solution to
//! consensus.

use crate::sector::{
    RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
    sector_record_chunks_size,
};
use crate::{ReadAt, ReadAtAsync, ReadAtSync};
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
use ab_core_primitives::sectors::{SBucket, SectorId};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_proof_of_space::{Table, TableGenerator};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::mem::ManuallyDrop;
use std::simd::Simd;
use std::str::FromStr;
use std::{fmt, io};
use thiserror::Error;
use tracing::debug;

/// Errors that happen during reading
#[derive(Debug, Error)]
pub enum ReadingError {
    /// Failed to read chunk.
    ///
    /// This is an implementation bug, most likely due to a mismatch between the sector contents
    /// map and other farming parameters.
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        /// Chunk location
        chunk_location: u64,
        /// Low-level error
        error: io::Error,
    },
    /// Missing proof of space proof.
    ///
    /// This is either a hardware issue or, if it happens for everyone all the time, an
    /// implementation bug.
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        /// S-bucket
        s_bucket: SBucket,
    },
    /// Failed to erasure-decode record
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        /// Piece offset
        piece_offset: PieceOffset,
        /// Lower-level error
        error: ErasureCodingError,
    },
    /// Wrong record size after decoding
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        /// Expected size in bytes
        expected: usize,
        /// Actual size in bytes
        actual: usize,
    },
    /// Failed to decode sector contents map
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
    /// I/O error occurred
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

impl ReadingError {
    /// Whether this error is fatal and renders the farm unusable
    pub fn is_fatal(&self) -> bool {
        match self {
            ReadingError::FailedToReadChunk { .. } => false,
            ReadingError::MissingPosProof { .. } => false,
            ReadingError::FailedToErasureDecodeRecord { .. } => false,
            ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
            ReadingError::FailedToDecodeSectorContentsMap(_) => false,
            ReadingError::Io(_) => true,
            ReadingError::ChecksumMismatch => false,
        }
    }
}

/// Defines a mode of reading chunks in [`read_sector_record_chunks`].
///
/// Which option is slower or faster depends on the disk used; there is no one-size-fits-all here,
/// unfortunately.
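///
/// # Example
///
/// A minimal sketch of picking a mode from a configuration string via the [`FromStr`] and
/// [`Display`] impls below (import paths elided, so this is not compiled as a doc-test):
///
/// ```ignore
/// let mode: ReadSectorRecordChunksMode = "WholeSector".parse().expect("valid mode name");
/// assert_eq!(mode.to_string(), "WholeSector");
/// ```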
#[derive(Debug, Copy, Clone)]
pub enum ReadSectorRecordChunksMode {
    /// Read individual chunks ([`RecordChunk::SIZE`] in size) concurrently, which results in lower
    /// total data transfer, but requires the SSD to support high concurrency and low latency
    ConcurrentChunks,
    /// Read the whole sector at once and extract chunks from an in-memory buffer, which uses more
    /// memory, but only requires the linear read speed of the disk to be decent
    WholeSector,
}

impl fmt::Display for ReadSectorRecordChunksMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConcurrentChunks => {
                write!(f, "ConcurrentChunks")
            }
            Self::WholeSector => {
                write!(f, "WholeSector")
            }
        }
    }
}

impl FromStr for ReadSectorRecordChunksMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
            "WholeSector" => Ok(Self::WholeSector),
            s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
        }
    }
}

/// Read sector record chunks; only plotted s-buckets are returned (in decoded form).
///
/// NOTE: This is an async function, but it also does CPU-intensive work internally; while that
/// work is not very long, make sure it is okay to do so in your context.
pub async fn read_sector_record_chunks<PosTable, S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
    sector_contents_map: &SectorContentsMap,
    pos_table: &PosTable,
    sector: &ReadAt<S, A>,
    mode: ReadSectorRecordChunksMode,
) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
        vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
    )
    .expect("Correct size; qed");

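    // For each s-bucket of this record that was actually plotted, pre-compute where the chunk
    // lives (the chunk offset from the sector contents map plus the s-bucket's starting offset)
    // together with whether the chunk was stored in encoded form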
    let read_chunks_inputs = record_chunks
        .par_iter_mut()
        .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
        .zip(
            (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
                .into_par_iter()
                .map(SBucket::from)
                .zip(s_bucket_offsets.par_iter()),
        )
        .map(
            |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
                let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;

                let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);

                Some((
                    maybe_record_chunk,
                    chunk_location,
                    encoded_chunk_used,
                    s_bucket,
                ))
            },
        )
        .collect::<Vec<_>>();

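    // Record chunks are stored right after the encoded sector contents map, so every absolute
    // read offset below starts with the encoded size of the contents map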
    let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
    let sector_bytes = match mode {
        ReadSectorRecordChunksMode::ConcurrentChunks => None,
        ReadSectorRecordChunksMode::WholeSector => {
            Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
        }
    };
    match sector {
        ReadAt::Sync(sector) => {
            let sector_bytes = {
                if let Some(mut sector_bytes) = sector_bytes {
                    sector.read_at(&mut sector_bytes, 0)?;
                    Some(sector_bytes)
                } else {
                    None
                }
            };
            read_chunks_inputs.into_par_iter().flatten().try_for_each(
                |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
                    let mut record_chunk = [0; RecordChunk::SIZE];
                    if let Some(sector_bytes) = &sector_bytes {
                        record_chunk.copy_from_slice(
                            &sector_bytes[sector_contents_map_size as usize
                                + chunk_location as usize * RecordChunk::SIZE..]
                                [..RecordChunk::SIZE],
                        );
                    } else {
                        sector
                            .read_at(
                                &mut record_chunk,
                                sector_contents_map_size
                                    + chunk_location * RecordChunk::SIZE as u64,
                            )
                            .map_err(|error| ReadingError::FailedToReadChunk {
                                chunk_location,
                                error,
                            })?;
                    }

                    // Decode chunk if necessary
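                    // (an encoded chunk is stored XORed with the hash of the corresponding PoS
                    // proof, so XORing with the same hash again recovers the original chunk)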
                    if encoded_chunk_used {
                        let proof = pos_table
                            .find_proof(s_bucket.into())
                            .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                        record_chunk =
                            Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
                    }

                    maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                    Ok::<_, ReadingError>(())
                },
            )?;
        }
        ReadAt::Async(sector) => {
            let sector_bytes = &{
                if let Some(sector_bytes) = sector_bytes {
                    Some(sector.read_at(sector_bytes, 0).await?)
                } else {
                    None
                }
            };
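            // Read all chunks concurrently; the stream below yields only errors, so the first
            // error (if any) aborts the whole operation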
            let processing_chunks = read_chunks_inputs
                .into_iter()
                .flatten()
                .map(
                    |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
                        let mut record_chunk = [0; RecordChunk::SIZE];
                        if let Some(sector_bytes) = &sector_bytes {
                            record_chunk.copy_from_slice(
                                &sector_bytes[sector_contents_map_size as usize
                                    + chunk_location as usize * RecordChunk::SIZE..]
                                    [..RecordChunk::SIZE],
                            );
                        } else {
                            record_chunk.copy_from_slice(
                                &sector
                                    .read_at(
                                        vec![0; RecordChunk::SIZE],
                                        sector_contents_map_size + chunk_location * RecordChunk::SIZE as u64,
                                    )
                                    .await
                                    .map_err(|error| ReadingError::FailedToReadChunk {
                                        chunk_location,
                                        error,
                                    })?
                            );
                        }

                        // Decode chunk if necessary
                        if encoded_chunk_used {
                            let proof = pos_table.find_proof(s_bucket.into())
                                .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                            record_chunk = Simd::to_array(
                                Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
                            );
                        }

                        maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                        Ok::<_, ReadingError>(())
                    },
                )
                .collect::<FuturesUnordered<_>>()
                .filter_map(|result| async move {
                    result.err()
                });

            std::pin::pin!(processing_chunks)
                .next()
                .await
                .map_or(Ok(()), Err)?;
        }
    }

    Ok(record_chunks)
}

/// Given sector record chunks, recover extended record chunks (both source and parity)
pub fn recover_extended_record_chunks(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
    // Restore source record scalars

    let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
    {
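        // Split both the input chunks and the output buffer into source and parity halves: chunks
        // that are present are copied through as-is, while missing ones are marked for recovery by
        // the erasure coding below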
        let (source_sector_record_chunks, parity_sector_record_chunks) =
            sector_record_chunks.split_at(Record::NUM_CHUNKS);
        let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
            recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);

        let source = source_sector_record_chunks
            .iter()
            .zip(source_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        let parity = parity_sector_record_chunks
            .iter()
            .zip(parity_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        erasure_coding.recover(source, parity).map_err(|error| {
            ReadingError::FailedToErasureDecodeRecord {
                piece_offset,
                error,
            }
        })?;
    }

    // The allocation of the vector can be larger than its contents; we need to make sure the
    // allocation is exactly the same as the contents. This also has a fast path when the
    // allocation already matches the contents.
    let record_chunks = recovered_sector_record_chunks
        .into_iter()
        .map(RecordChunk::from)
        .collect::<Box<_>>();
    let mut record_chunks = ManuallyDrop::new(record_chunks);
    // SAFETY: Original memory is not dropped, the size of the data was checked above
    let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };

    Ok(record_chunks)
}

/// Given sector record chunks, recover the source record.
pub fn recover_source_record(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<Record>, ReadingError> {
    // Restore source record scalars
    let mut recovered_record = Record::new_boxed();

    let (source_sector_record_chunks, parity_sector_record_chunks) =
        sector_record_chunks.split_at(Record::NUM_CHUNKS);
    let source = source_sector_record_chunks
        .iter()
        .zip(recovered_record.iter_mut())
        .map(
            |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                Some(input_chunk) => {
                    output_chunk.copy_from_slice(input_chunk.as_slice());
                    RecoveryShardState::Present(input_chunk.as_slice())
                }
                None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
            },
        );
    let parity =
        parity_sector_record_chunks
            .iter()
            .map(|maybe_input_chunk| match maybe_input_chunk {
                Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
                None => RecoveryShardState::MissingIgnore,
            });
    erasure_coding.recover(source, parity).map_err(|error| {
        ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        }
    })?;

    Ok(recovered_record)
}

/// Read metadata (roots and proof) for a record
pub(crate) async fn read_record_metadata<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    sector: &ReadAt<S, A>,
) -> Result<RecordMetadata, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
        + sector_record_chunks_size(pieces_in_sector) as u64;
    // Move to the beginning of the root and proof we care about
    let record_metadata_offset =
        sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);

    let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
    match sector {
        ReadAt::Sync(sector) => {
            sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
        }
        ReadAt::Async(sector) => {
            record_metadata_bytes = sector
                .read_at(record_metadata_bytes, record_metadata_offset)
                .await?;
        }
    }
    let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
        .expect("Length is correct, contents don't have a specific structure to them; qed");

    Ok(record_metadata)
}

/// Read piece from sector.
///
/// NOTE: Even though this function is async, proof of space table generation is expensive and
/// should be done in a dedicated thread where blocking is allowed.
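///
/// Reading a piece involves decoding the sector contents map, reading and decoding this record's
/// chunks, erasure-recovering the source record, reading the record metadata (roots and proof)
/// and finally verifying the piece checksum.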
pub async fn read_piece<PosTable, S, A>(
    piece_offset: PieceOffset,
    sector_id: &SectorId,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Result<Piece, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let pieces_in_sector = sector_metadata.pieces_in_sector;

    let sector_contents_map = {
        let mut sector_contents_map_bytes =
            vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
        match sector {
            ReadAt::Sync(sector) => {
                sector.read_at(&mut sector_contents_map_bytes, 0)?;
            }
            ReadAt::Async(sector) => {
                sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
            }
        }

        SectorContentsMap::from_bytes(&sector_contents_map_bytes, pieces_in_sector)?
    };

    let sector_record_chunks = read_sector_record_chunks(
        piece_offset,
        pieces_in_sector,
        &sector_metadata.s_bucket_offsets(),
        &sector_contents_map,
        &table_generator.generate(&sector_id.derive_evaluation_seed(piece_offset)),
        sector,
        mode,
    )
    .await?;
    // Restore source record scalars
    let record = recover_source_record(&sector_record_chunks, piece_offset, erasure_coding)?;

    let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;

    let mut piece = Piece::default();

    piece.record_mut().copy_from_slice(record.as_slice());

    *piece.root_mut() = record_metadata.root;
    *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
    *piece.proof_mut() = record_metadata.proof;

    // Verify checksum
    let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
    if actual_checksum != record_metadata.piece_checksum {
        debug!(
            ?sector_id,
            %piece_offset,
            %actual_checksum,
            expected_checksum = %record_metadata.piece_checksum,
            "Hash doesn't match, plotted piece is corrupted"
        );

        return Err(ReadingError::ChecksumMismatch);
    }

    Ok(piece.to_shared())
}