1use crate::sector::{
8 RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
9 sector_record_chunks_size,
10};
11use crate::{ReadAt, ReadAtAsync, ReadAtSync};
12use ab_core_primitives::hashes::Blake3Hash;
13use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
14use ab_core_primitives::sectors::{SBucket, SectorId};
15use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
16use ab_proof_of_space::{PosProofs, Table, TableGenerator};
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use parity_scale_codec::Decode;
20use rayon::prelude::*;
21use std::io;
22use std::simd::Simd;
23use thiserror::Error;
24use tracing::debug;
25
26#[derive(Debug, Error)]
28pub enum ReadingError {
29 #[error("Failed to read chunk at location {chunk_location}: {error}")]
34 FailedToReadChunk {
35 chunk_location: u64,
37 error: io::Error,
39 },
40 #[error("Missing PoS proof for s-bucket {s_bucket}")]
45 MissingPosProof {
46 s_bucket: SBucket,
48 },
49 #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
51 FailedToErasureDecodeRecord {
52 piece_offset: PieceOffset,
54 error: ErasureCodingError,
56 },
57 #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
59 WrongRecordSizeAfterDecoding {
60 expected: usize,
62 actual: usize,
64 },
65 #[error("Failed to decode sector contents map: {0}")]
67 FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
68 #[error("Reading I/O error: {0}")]
70 Io(#[from] io::Error),
71 #[error("Checksum mismatch")]
73 ChecksumMismatch,
74}
75
76impl ReadingError {
77 pub fn is_fatal(&self) -> bool {
79 match self {
80 ReadingError::FailedToReadChunk { .. } => false,
81 ReadingError::MissingPosProof { .. } => false,
82 ReadingError::FailedToErasureDecodeRecord { .. } => false,
83 ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
84 ReadingError::FailedToDecodeSectorContentsMap(_) => false,
85 ReadingError::Io(_) => true,
86 ReadingError::ChecksumMismatch => false,
87 }
88 }
89}
90
91const _: () = {
94 assert!(65536 == Record::NUM_S_BUCKETS);
95};
96pub async fn read_sector_record_chunks<S, A>(
101 piece_offset: PieceOffset,
102 pieces_in_sector: u16,
103 s_bucket_offsets: &[u32; 65536],
106 sector_contents_map: &SectorContentsMap,
107 pos_proofs: &PosProofs,
108 sector: &ReadAt<S, A>,
109) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
110where
111 S: ReadAtSync,
112 A: ReadAtAsync,
113{
114 let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
115 vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
116 )
117 .expect("Correct size; qed");
118
119 let read_chunks_inputs = record_chunks
120 .par_iter_mut()
121 .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
122 .zip(s_bucket_offsets.par_iter())
123 .map(
124 |((maybe_record_chunk, maybe_chunk_offset), &s_bucket_offset)| {
125 let chunk_offset = maybe_chunk_offset?;
126
127 let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);
128
129 Some((maybe_record_chunk, chunk_location))
130 },
131 )
132 .flatten()
133 .collect::<Vec<_>>();
134
135 let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
136 match sector {
137 ReadAt::Sync(sector) => {
138 read_chunks_inputs
139 .into_par_iter()
140 .zip(&pos_proofs.proofs)
141 .try_for_each(|((maybe_record_chunk, chunk_location), pos_proof)| {
142 let mut record_chunk = [0; RecordChunk::SIZE];
143 sector
144 .read_at(
145 &mut record_chunk,
146 sector_contents_map_size + chunk_location * RecordChunk::SIZE as u64,
147 )
148 .map_err(|error| ReadingError::FailedToReadChunk {
149 chunk_location,
150 error,
151 })?;
152
153 record_chunk =
155 Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*pos_proof.hash()));
156
157 maybe_record_chunk.replace(RecordChunk::from(record_chunk));
158
159 Ok::<_, ReadingError>(())
160 })?;
161 }
162 ReadAt::Async(sector) => {
163 let processing_chunks = read_chunks_inputs
164 .into_iter()
165 .zip(&pos_proofs.proofs)
166 .map(
167 |((maybe_record_chunk, chunk_location), pos_proof)| async move {
168 let mut record_chunk = [0; RecordChunk::SIZE];
169 record_chunk.copy_from_slice(
170 §or
171 .read_at(
172 vec![0; RecordChunk::SIZE],
173 sector_contents_map_size
174 + chunk_location * RecordChunk::SIZE as u64,
175 )
176 .await
177 .map_err(|error| ReadingError::FailedToReadChunk {
178 chunk_location,
179 error,
180 })?,
181 );
182
183 record_chunk = Simd::to_array(
185 Simd::from(record_chunk) ^ Simd::from(*pos_proof.hash()),
186 );
187
188 maybe_record_chunk.replace(RecordChunk::from(record_chunk));
189
190 Ok::<_, ReadingError>(())
191 },
192 )
193 .collect::<FuturesUnordered<_>>()
194 .filter_map(|result| async move { result.err() });
195
196 std::pin::pin!(processing_chunks)
197 .next()
198 .await
199 .map_or(Ok(()), Err)?;
200 }
201 }
202
203 Ok(record_chunks)
204}
205
206pub fn recover_extended_record_chunks(
208 sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
209 piece_offset: PieceOffset,
210 erasure_coding: &ErasureCoding,
211) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
212 let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
215 {
216 let (source_sector_record_chunks, parity_sector_record_chunks) =
217 sector_record_chunks.split_at(Record::NUM_CHUNKS);
218 let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
219 recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);
220
221 let source = source_sector_record_chunks
222 .iter()
223 .zip(source_recovered_sector_record_chunks.iter_mut())
224 .map(
225 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
226 Some(input_chunk) => {
227 output_chunk.copy_from_slice(input_chunk.as_slice());
228 RecoveryShardState::Present(input_chunk.as_slice())
229 }
230 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
231 },
232 );
233 let parity = parity_sector_record_chunks
234 .iter()
235 .zip(parity_recovered_sector_record_chunks.iter_mut())
236 .map(
237 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
238 Some(input_chunk) => {
239 output_chunk.copy_from_slice(input_chunk.as_slice());
240 RecoveryShardState::Present(input_chunk.as_slice())
241 }
242 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
243 },
244 );
245 erasure_coding.recover(source, parity).map_err(|error| {
246 ReadingError::FailedToErasureDecodeRecord {
247 piece_offset,
248 error,
249 }
250 })?;
251 }
252
253 let record_chunks = recovered_sector_record_chunks
256 .into_iter()
257 .map(RecordChunk::from)
258 .collect::<Box<_>>();
259 let record_chunks = unsafe {
261 Box::from_raw(Box::into_raw(record_chunks).cast::<[RecordChunk; Record::NUM_S_BUCKETS]>())
262 };
263
264 Ok(record_chunks)
265}
266
267pub fn recover_source_record(
269 sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
270 piece_offset: PieceOffset,
271 erasure_coding: &ErasureCoding,
272) -> Result<Box<Record>, ReadingError> {
273 let mut recovered_record = Record::new_boxed();
275
276 let (source_sector_record_chunks, parity_sector_record_chunks) =
277 sector_record_chunks.split_at(Record::NUM_CHUNKS);
278 let source = source_sector_record_chunks
279 .iter()
280 .zip(recovered_record.iter_mut())
281 .map(
282 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
283 Some(input_chunk) => {
284 output_chunk.copy_from_slice(input_chunk.as_slice());
285 RecoveryShardState::Present(input_chunk.as_slice())
286 }
287 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
288 },
289 );
290 let parity =
291 parity_sector_record_chunks
292 .iter()
293 .map(|maybe_input_chunk| match maybe_input_chunk {
294 Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
295 None => RecoveryShardState::MissingIgnore,
296 });
297 erasure_coding.recover(source, parity).map_err(|error| {
298 ReadingError::FailedToErasureDecodeRecord {
299 piece_offset,
300 error,
301 }
302 })?;
303
304 Ok(recovered_record)
305}
306
307pub(crate) async fn read_record_metadata<S, A>(
309 piece_offset: PieceOffset,
310 pieces_in_sector: u16,
311 sector: &ReadAt<S, A>,
312) -> Result<RecordMetadata, ReadingError>
313where
314 S: ReadAtSync,
315 A: ReadAtAsync,
316{
317 let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
318 + sector_record_chunks_size(pieces_in_sector) as u64;
319 let record_metadata_offset =
321 sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);
322
323 let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
324 match sector {
325 ReadAt::Sync(sector) => {
326 sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
327 }
328 ReadAt::Async(sector) => {
329 record_metadata_bytes = sector
330 .read_at(record_metadata_bytes, record_metadata_offset)
331 .await?;
332 }
333 }
334 let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
335 .expect("Length is correct, contents doesn't have specific structure to it; qed");
336
337 Ok(record_metadata)
338}
339
340pub async fn read_piece<PosTable, S, A>(
345 piece_offset: PieceOffset,
346 sector_id: &SectorId,
347 sector_metadata: &SectorMetadataChecksummed,
348 sector: &ReadAt<S, A>,
349 erasure_coding: &ErasureCoding,
350 table_generator: &PosTable::Generator,
351) -> Result<Piece, ReadingError>
352where
353 PosTable: Table,
354 S: ReadAtSync,
355 A: ReadAtAsync,
356{
357 let pieces_in_sector = sector_metadata.pieces_in_sector;
358
359 let sector_contents_map = {
360 let mut sector_contents_map_bytes =
361 vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
362 match sector {
363 ReadAt::Sync(sector) => {
364 sector.read_at(&mut sector_contents_map_bytes, 0)?;
365 }
366 ReadAt::Async(sector) => {
367 sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
368 }
369 }
370
371 SectorContentsMap::from_bytes(§or_contents_map_bytes, pieces_in_sector)?
372 };
373
374 let sector_record_chunks = read_sector_record_chunks(
375 piece_offset,
376 pieces_in_sector,
377 §or_metadata.s_bucket_offsets(),
378 §or_contents_map,
379 &table_generator.create_proofs(§or_id.derive_evaluation_seed(piece_offset)),
380 sector,
381 )
382 .await?;
383 let record = recover_source_record(§or_record_chunks, piece_offset, erasure_coding)?;
385
386 let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;
387
388 let mut piece = Piece::default();
389
390 piece.record_mut().copy_from_slice(record.as_slice());
391
392 *piece.root_mut() = record_metadata.root;
393 *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
394 *piece.proof_mut() = record_metadata.proof;
395
396 let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
398 if actual_checksum != record_metadata.piece_checksum {
399 debug!(
400 ?sector_id,
401 %piece_offset,
402 %actual_checksum,
403 expected_checksum = %record_metadata.piece_checksum,
404 "Hash doesn't match, plotted piece is corrupted"
405 );
406
407 return Err(ReadingError::ChecksumMismatch);
408 }
409
410 Ok(piece.to_shared())
411}