ab_farmer_components/
reading.rs

//! Reading utilities
//!
//! This module contains utilities for extracting data from plots/sectors created earlier by
//! functions in the [`plotting`](crate::plotting) module. This is a relatively expensive operation
//! and is only used for cold storage purposes or when there is a need to prove a solution to
//! consensus.

use crate::sector::{
    RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
    sector_record_chunks_size,
};
use crate::{ReadAt, ReadAtAsync, ReadAtSync};
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
use ab_core_primitives::sectors::{SBucket, SectorId};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_proof_of_space::{PosProofs, Table, TableGenerator};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::io;
use std::simd::Simd;
use thiserror::Error;
use tracing::debug;

/// Errors that happen during reading
#[derive(Debug, Error)]
pub enum ReadingError {
    /// Failed to read chunk.
    ///
    /// This is an implementation bug, most likely due to a mismatch between the sector contents
    /// map and other farming parameters.
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        /// Chunk location
        chunk_location: u64,
        /// Low-level error
        error: io::Error,
    },
    /// Missing proof of space proof.
    ///
    /// This is either a hardware issue or, if it happens for everyone all the time, an
    /// implementation bug.
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        /// S-bucket
        s_bucket: SBucket,
    },
    /// Failed to erasure-decode record
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        /// Piece offset
        piece_offset: PieceOffset,
        /// Lower-level error
        error: ErasureCodingError,
    },
    /// Wrong record size after decoding
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        /// Expected size in bytes
        expected: usize,
        /// Actual size in bytes
        actual: usize,
    },
    /// Failed to decode sector contents map
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
    /// I/O error occurred
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
    /// Checksum mismatch
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

impl ReadingError {
    /// Whether this error is fatal and renders the farm unusable
    pub fn is_fatal(&self) -> bool {
        match self {
            ReadingError::FailedToReadChunk { .. } => false,
            ReadingError::MissingPosProof { .. } => false,
            ReadingError::FailedToErasureDecodeRecord { .. } => false,
            ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
            ReadingError::FailedToDecodeSectorContentsMap(_) => false,
            ReadingError::Io(_) => true,
            ReadingError::ChecksumMismatch => false,
        }
    }
}

// TODO: Workaround for https://github.com/rust-lang/rust/issues/144690 that gets triggered on
//  `s_bucket_offsets` argument below
const _: () = {
    assert!(65536 == Record::NUM_S_BUCKETS);
};
/// Read sector record chunks; only plotted s-buckets are returned (in decoded form).
///
/// NOTE: This is an async function, but it also does CPU-intensive work internally; while that
/// work is not very long, make sure it is okay to do so in your context.
pub async fn read_sector_record_chunks<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    // TODO: Workaround for https://github.com/rust-lang/rust/issues/144690
    // s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
    s_bucket_offsets: &[u32; 65536],
    sector_contents_map: &SectorContentsMap,
    pos_proofs: &PosProofs,
    sector: &ReadAt<S, A>,
) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
        vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
    )
    .expect("Correct size; qed");

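    // For each s-bucket, the sector contents map yields the offset of this record's chunk within
    // that bucket (if it was plotted); adding the bucket's start offset gives the absolute chunk
    // location within the chunks region of the sector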
    let read_chunks_inputs = record_chunks
        .par_iter_mut()
        .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
        .zip(s_bucket_offsets.par_iter())
        .map(
            |((maybe_record_chunk, maybe_chunk_offset), &s_bucket_offset)| {
                let chunk_offset = maybe_chunk_offset?;

                let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);

                Some((maybe_record_chunk, chunk_location))
            },
        )
        .flatten()
        .collect::<Vec<_>>();

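    // Record chunks are stored right after the encoded sector contents map, each occupying
    // `RecordChunk::SIZE` bytes at its chunk location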
    let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
    match sector {
        ReadAt::Sync(sector) => {
            read_chunks_inputs
                .into_par_iter()
                .zip(&pos_proofs.proofs)
                .try_for_each(|((maybe_record_chunk, chunk_location), pos_proof)| {
                    let mut record_chunk = [0; RecordChunk::SIZE];
                    sector
                        .read_at(
                            &mut record_chunk,
                            sector_contents_map_size + chunk_location * RecordChunk::SIZE as u64,
                        )
                        .map_err(|error| ReadingError::FailedToReadChunk {
                            chunk_location,
                            error,
                        })?;

                    // TODO: Use SIMD for hashing
                    record_chunk =
                        Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*pos_proof.hash()));

                    maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                    Ok::<_, ReadingError>(())
                })?;
        }
        ReadAt::Async(sector) => {
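            // Issue all chunk reads concurrently; each future reads one chunk and decodes it by
            // XORing with the hash of the corresponding PoS proof, mirroring the sync branch above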
            let processing_chunks = read_chunks_inputs
                .into_iter()
                .zip(&pos_proofs.proofs)
                .map(
                    |((maybe_record_chunk, chunk_location), pos_proof)| async move {
                        let mut record_chunk = [0; RecordChunk::SIZE];
                        record_chunk.copy_from_slice(
                            &sector
                                .read_at(
                                    vec![0; RecordChunk::SIZE],
                                    sector_contents_map_size
                                        + chunk_location * RecordChunk::SIZE as u64,
                                )
                                .await
                                .map_err(|error| ReadingError::FailedToReadChunk {
                                    chunk_location,
                                    error,
                                })?,
                        );

                        // TODO: Use SIMD for hashing
                        record_chunk = Simd::to_array(
                            Simd::from(record_chunk) ^ Simd::from(*pos_proof.hash()),
                        );

                        maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                        Ok::<_, ReadingError>(())
                    },
                )
                .collect::<FuturesUnordered<_>>()
                .filter_map(|result| async move { result.err() });

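            // Drive the reads to completion; the stream above only yields errors, so the first
            // item (if any) is the error to propagate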
            std::pin::pin!(processing_chunks)
                .next()
                .await
                .map_or(Ok(()), Err)?;
        }
    }

    Ok(record_chunks)
}

/// Given sector record chunks, recover extended record chunks (both source and parity)
pub fn recover_extended_record_chunks(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
    // Restore source record scalars

    let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
    {
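        // The first `Record::NUM_CHUNKS` s-buckets hold source chunks and the rest hold parity
        // chunks; chunks that were read are copied into the output as-is, missing ones are marked
        // for recovery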
        let (source_sector_record_chunks, parity_sector_record_chunks) =
            sector_record_chunks.split_at(Record::NUM_CHUNKS);
        let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
            recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);

        let source = source_sector_record_chunks
            .iter()
            .zip(source_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        let parity = parity_sector_record_chunks
            .iter()
            .zip(parity_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
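        // Erasure decoding fills the `MissingRecover` output slices with the recovered chunks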
        erasure_coding.recover(source, parity).map_err(|error| {
            ReadingError::FailedToErasureDecodeRecord {
                piece_offset,
                error,
            }
        })?;
    }

    // The vector's allocation can be larger than its contents; we need the allocation to match
    // the contents exactly, and collecting into a boxed slice does that (with a fast path when
    // the allocation already matches)
    let record_chunks = recovered_sector_record_chunks
        .into_iter()
        .map(RecordChunk::from)
        .collect::<Box<_>>();
    // SAFETY: Size of the data is guaranteed above
    let record_chunks = unsafe {
        Box::from_raw(Box::into_raw(record_chunks).cast::<[RecordChunk; Record::NUM_S_BUCKETS]>())
    };

    Ok(record_chunks)
}

/// Given sector record chunks, recover the source record
pub fn recover_source_record(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<Record>, ReadingError> {
    // Restore source record scalars
    let mut recovered_record = Record::new_boxed();

    let (source_sector_record_chunks, parity_sector_record_chunks) =
        sector_record_chunks.split_at(Record::NUM_CHUNKS);
    let source = source_sector_record_chunks
        .iter()
        .zip(recovered_record.iter_mut())
        .map(
            |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                Some(input_chunk) => {
                    output_chunk.copy_from_slice(input_chunk.as_slice());
                    RecoveryShardState::Present(input_chunk.as_slice())
                }
                None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
            },
        );
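    // Parity chunks only serve as inputs to recovery here: present ones participate in decoding,
    // missing ones are ignored rather than reconstructed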
    let parity =
        parity_sector_record_chunks
            .iter()
            .map(|maybe_input_chunk| match maybe_input_chunk {
                Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
                None => RecoveryShardState::MissingIgnore,
            });
    erasure_coding.recover(source, parity).map_err(|error| {
        ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        }
    })?;

    Ok(recovered_record)
}

/// Read metadata (roots and proof) for a record
pub(crate) async fn read_record_metadata<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    sector: &ReadAt<S, A>,
) -> Result<RecordMetadata, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
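    // Record metadata entries are stored one after another, following the sector contents map and
    // the record chunks region; compute where this record's entry starts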
    let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
        + sector_record_chunks_size(pieces_in_sector) as u64;
    // Move to the beginning of the root and proof we care about
    let record_metadata_offset =
        sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);

    let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
    match sector {
        ReadAt::Sync(sector) => {
            sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
        }
        ReadAt::Async(sector) => {
            record_metadata_bytes = sector
                .read_at(record_metadata_bytes, record_metadata_offset)
                .await?;
        }
    }
    let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
        .expect("Length is correct, contents don't have a specific structure; qed");

    Ok(record_metadata)
}

/// Read piece from sector.
///
/// NOTE: Even though this function is async, proof of space table generation is expensive and
/// should be done in a dedicated thread where blocking is allowed.
pub async fn read_piece<PosTable, S, A>(
    piece_offset: PieceOffset,
    sector_id: &SectorId,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    table_generator: &PosTable::Generator,
) -> Result<Piece, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
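    // Overall flow: read and decode the sector contents map, read and decode the plotted record
    // chunks, erasure-recover the source record, then read the record metadata, assemble the
    // piece and verify its checksum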
    let pieces_in_sector = sector_metadata.pieces_in_sector;

    let sector_contents_map = {
        let mut sector_contents_map_bytes =
            vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
        match sector {
            ReadAt::Sync(sector) => {
                sector.read_at(&mut sector_contents_map_bytes, 0)?;
            }
            ReadAt::Async(sector) => {
                sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
            }
        }

        SectorContentsMap::from_bytes(&sector_contents_map_bytes, pieces_in_sector)?
    };

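    // Proof-of-space proofs are regenerated from the evaluation seed derived from the sector ID
    // and piece offset; this is the expensive table generation mentioned in the doc comment above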
    let sector_record_chunks = read_sector_record_chunks(
        piece_offset,
        pieces_in_sector,
        &sector_metadata.s_bucket_offsets(),
        &sector_contents_map,
        &table_generator.create_proofs(&sector_id.derive_evaluation_seed(piece_offset)),
        sector,
    )
    .await?;
    // Restore source record scalars
    let record = recover_source_record(&sector_record_chunks, piece_offset, erasure_coding)?;

    let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;

    let mut piece = Piece::default();

    piece.record_mut().copy_from_slice(record.as_slice());

    *piece.root_mut() = record_metadata.root;
    *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
    *piece.proof_mut() = record_metadata.proof;

    // Verify checksum
    let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
    if actual_checksum != record_metadata.piece_checksum {
        debug!(
            ?sector_id,
            %piece_offset,
            %actual_checksum,
            expected_checksum = %record_metadata.piece_checksum,
            "Hash doesn't match, plotted piece is corrupted"
        );

        return Err(ReadingError::ChecksumMismatch);
    }

    Ok(piece.to_shared())
}