1use crate::sector::{
8 RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
9 sector_record_chunks_size,
10};
11use crate::{ReadAt, ReadAtAsync, ReadAtSync};
12use ab_core_primitives::hashes::Blake3Hash;
13use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
14use ab_core_primitives::sectors::{SBucket, SectorId};
15use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
16use ab_proof_of_space::{Table, TableGenerator};
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use parity_scale_codec::Decode;
20use rayon::prelude::*;
21use std::mem::ManuallyDrop;
22use std::simd::Simd;
23use std::str::FromStr;
24use std::{fmt, io};
25use thiserror::Error;
26use tracing::debug;
27
/// Errors that can be returned by the sector reading functions in this file
#[derive(Debug, Error)]
pub enum ReadingError {
    /// Reading an individual record chunk from the sector failed
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        /// Location of the chunk that was being read, counted in record chunks from the end of
        /// the sector contents map
        chunk_location: u64,
        /// I/O error returned by the failed read
        error: io::Error,
    },
    /// The PoS table has no proof for an s-bucket whose chunk is stored in encoded form, so the
    /// chunk can't be decoded
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        /// S-bucket for which the proof lookup returned nothing
        s_bucket: SBucket,
    },
    /// Erasure coding recovery of a record failed
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        /// Offset of the piece whose record was being recovered
        piece_offset: PieceOffset,
        /// Error reported by the erasure coding implementation
        error: ErasureCodingError,
    },
    /// Decoded record has an unexpected size
    ///
    /// NOTE(review): this variant is not constructed in the visible part of this file.
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        /// Size that was expected
        expected: usize,
        /// Size that was actually observed
        actual: usize,
    },
    /// Sector contents map bytes read from the sector could not be decoded
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
    /// Any other I/O error (converted with `?` from reads that are not per-chunk reads); this is
    /// the only variant for which [`ReadingError::is_fatal`] returns `true`
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
    /// Checksum computed over the reconstructed piece does not match the checksum stored in the
    /// record metadata
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
77
78impl ReadingError {
79 pub fn is_fatal(&self) -> bool {
81 match self {
82 ReadingError::FailedToReadChunk { .. } => false,
83 ReadingError::MissingPosProof { .. } => false,
84 ReadingError::FailedToErasureDecodeRecord { .. } => false,
85 ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
86 ReadingError::FailedToDecodeSectorContentsMap(_) => false,
87 ReadingError::Io(_) => true,
88 ReadingError::ChecksumMismatch => false,
89 }
90 }
91}
92
/// Strategy used by [`read_sector_record_chunks`] to fetch record chunks from a sector.
///
/// The textual form produced by `Display` and accepted by `FromStr` is the variant name.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ReadSectorRecordChunksMode {
    /// Issue a separate read for every record chunk that needs to be read
    ConcurrentChunks,
    /// Read the whole sector into memory with a single read first, then copy the needed record
    /// chunks out of that in-memory buffer
    WholeSector,
}
106
107impl fmt::Display for ReadSectorRecordChunksMode {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 match self {
110 Self::ConcurrentChunks => {
111 write!(f, "ConcurrentChunks")
112 }
113 Self::WholeSector => {
114 write!(f, "WholeSector")
115 }
116 }
117 }
118}
119
120impl FromStr for ReadSectorRecordChunksMode {
121 type Err = String;
122
123 fn from_str(s: &str) -> Result<Self, Self::Err> {
124 match s {
125 "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
126 "WholeSector" => Ok(Self::WholeSector),
127 s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
128 }
129 }
130}
131
132const _: () = {
135 assert!(65536 == Record::NUM_S_BUCKETS);
136};
137pub async fn read_sector_record_chunks<PosTable, S, A>(
142 piece_offset: PieceOffset,
143 pieces_in_sector: u16,
144 s_bucket_offsets: &[u32; 65536],
147 sector_contents_map: &SectorContentsMap,
148 pos_table: &PosTable,
149 sector: &ReadAt<S, A>,
150 mode: ReadSectorRecordChunksMode,
151) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
152where
153 PosTable: Table,
154 S: ReadAtSync,
155 A: ReadAtAsync,
156{
157 let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
158 vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
159 )
160 .expect("Correct size; qed");
161
162 let read_chunks_inputs = record_chunks
163 .par_iter_mut()
164 .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
165 .zip(
166 (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
167 .into_par_iter()
168 .map(SBucket::from)
169 .zip(s_bucket_offsets.par_iter()),
170 )
171 .map(
172 |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
173 let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;
174
175 let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);
176
177 Some((
178 maybe_record_chunk,
179 chunk_location,
180 encoded_chunk_used,
181 s_bucket,
182 ))
183 },
184 )
185 .collect::<Vec<_>>();
186
187 let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
188 let sector_bytes = match mode {
189 ReadSectorRecordChunksMode::ConcurrentChunks => None,
190 ReadSectorRecordChunksMode::WholeSector => {
191 Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
192 }
193 };
194 match sector {
195 ReadAt::Sync(sector) => {
196 let sector_bytes = {
197 if let Some(mut sector_bytes) = sector_bytes {
198 sector.read_at(&mut sector_bytes, 0)?;
199 Some(sector_bytes)
200 } else {
201 None
202 }
203 };
204 read_chunks_inputs.into_par_iter().flatten().try_for_each(
205 |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
206 let mut record_chunk = [0; RecordChunk::SIZE];
207 if let Some(sector_bytes) = §or_bytes {
208 record_chunk.copy_from_slice(
209 §or_bytes[sector_contents_map_size as usize
210 + chunk_location as usize * RecordChunk::SIZE..]
211 [..RecordChunk::SIZE],
212 );
213 } else {
214 sector
215 .read_at(
216 &mut record_chunk,
217 sector_contents_map_size
218 + chunk_location * RecordChunk::SIZE as u64,
219 )
220 .map_err(|error| ReadingError::FailedToReadChunk {
221 chunk_location,
222 error,
223 })?;
224 }
225
226 if encoded_chunk_used {
228 let proof = pos_table
229 .find_proof(s_bucket.into())
230 .ok_or(ReadingError::MissingPosProof { s_bucket })?;
231
232 record_chunk =
233 Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
234 }
235
236 maybe_record_chunk.replace(RecordChunk::from(record_chunk));
237
238 Ok::<_, ReadingError>(())
239 },
240 )?;
241 }
242 ReadAt::Async(sector) => {
243 let sector_bytes = &{
244 if let Some(sector_bytes) = sector_bytes {
245 Some(sector.read_at(sector_bytes, 0).await?)
246 } else {
247 None
248 }
249 };
250 let processing_chunks = read_chunks_inputs
251 .into_iter()
252 .flatten()
253 .map(
254 |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
255 let mut record_chunk = [0; RecordChunk::SIZE];
256 if let Some(sector_bytes) = §or_bytes {
257 record_chunk.copy_from_slice(
258 §or_bytes[sector_contents_map_size as usize
259 + chunk_location as usize * RecordChunk::SIZE..]
260 [..RecordChunk::SIZE],
261 );
262 } else {
263 record_chunk.copy_from_slice(
264 §or
265 .read_at(
266 vec![0; RecordChunk::SIZE],
267 sector_contents_map_size + chunk_location * RecordChunk::SIZE as u64,
268 )
269 .await
270 .map_err(|error| ReadingError::FailedToReadChunk {
271 chunk_location,
272 error,
273 })?
274 );
275 }
276
277
278 if encoded_chunk_used {
280 let proof = pos_table.find_proof(s_bucket.into())
281 .ok_or(ReadingError::MissingPosProof { s_bucket })?;
282
283 record_chunk = Simd::to_array(
284 Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
285 );
286 }
287
288 maybe_record_chunk.replace(RecordChunk::from(record_chunk));
289
290 Ok::<_, ReadingError>(())
291 },
292 )
293 .collect::<FuturesUnordered<_>>()
294 .filter_map(|result| async move {
295 result.err()
296 });
297
298 std::pin::pin!(processing_chunks)
299 .next()
300 .await
301 .map_or(Ok(()), Err)?;
302 }
303 }
304
305 Ok(record_chunks)
306}
307
308pub fn recover_extended_record_chunks(
310 sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
311 piece_offset: PieceOffset,
312 erasure_coding: &ErasureCoding,
313) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
314 let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
317 {
318 let (source_sector_record_chunks, parity_sector_record_chunks) =
319 sector_record_chunks.split_at(Record::NUM_CHUNKS);
320 let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
321 recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);
322
323 let source = source_sector_record_chunks
324 .iter()
325 .zip(source_recovered_sector_record_chunks.iter_mut())
326 .map(
327 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
328 Some(input_chunk) => {
329 output_chunk.copy_from_slice(input_chunk.as_slice());
330 RecoveryShardState::Present(input_chunk.as_slice())
331 }
332 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
333 },
334 );
335 let parity = parity_sector_record_chunks
336 .iter()
337 .zip(parity_recovered_sector_record_chunks.iter_mut())
338 .map(
339 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
340 Some(input_chunk) => {
341 output_chunk.copy_from_slice(input_chunk.as_slice());
342 RecoveryShardState::Present(input_chunk.as_slice())
343 }
344 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
345 },
346 );
347 erasure_coding.recover(source, parity).map_err(|error| {
348 ReadingError::FailedToErasureDecodeRecord {
349 piece_offset,
350 error,
351 }
352 })?;
353 }
354
355 let record_chunks = recovered_sector_record_chunks
358 .into_iter()
359 .map(RecordChunk::from)
360 .collect::<Box<_>>();
361 let mut record_chunks = ManuallyDrop::new(record_chunks);
362 let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };
364
365 Ok(record_chunks)
366}
367
368pub fn recover_source_record(
370 sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
371 piece_offset: PieceOffset,
372 erasure_coding: &ErasureCoding,
373) -> Result<Box<Record>, ReadingError> {
374 let mut recovered_record = Record::new_boxed();
376
377 let (source_sector_record_chunks, parity_sector_record_chunks) =
378 sector_record_chunks.split_at(Record::NUM_CHUNKS);
379 let source = source_sector_record_chunks
380 .iter()
381 .zip(recovered_record.iter_mut())
382 .map(
383 |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
384 Some(input_chunk) => {
385 output_chunk.copy_from_slice(input_chunk.as_slice());
386 RecoveryShardState::Present(input_chunk.as_slice())
387 }
388 None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
389 },
390 );
391 let parity =
392 parity_sector_record_chunks
393 .iter()
394 .map(|maybe_input_chunk| match maybe_input_chunk {
395 Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
396 None => RecoveryShardState::MissingIgnore,
397 });
398 erasure_coding.recover(source, parity).map_err(|error| {
399 ReadingError::FailedToErasureDecodeRecord {
400 piece_offset,
401 error,
402 }
403 })?;
404
405 Ok(recovered_record)
406}
407
408pub(crate) async fn read_record_metadata<S, A>(
410 piece_offset: PieceOffset,
411 pieces_in_sector: u16,
412 sector: &ReadAt<S, A>,
413) -> Result<RecordMetadata, ReadingError>
414where
415 S: ReadAtSync,
416 A: ReadAtAsync,
417{
418 let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
419 + sector_record_chunks_size(pieces_in_sector) as u64;
420 let record_metadata_offset =
422 sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);
423
424 let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
425 match sector {
426 ReadAt::Sync(sector) => {
427 sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
428 }
429 ReadAt::Async(sector) => {
430 record_metadata_bytes = sector
431 .read_at(record_metadata_bytes, record_metadata_offset)
432 .await?;
433 }
434 }
435 let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
436 .expect("Length is correct, contents doesn't have specific structure to it; qed");
437
438 Ok(record_metadata)
439}
440
441pub async fn read_piece<PosTable, S, A>(
446 piece_offset: PieceOffset,
447 sector_id: &SectorId,
448 sector_metadata: &SectorMetadataChecksummed,
449 sector: &ReadAt<S, A>,
450 erasure_coding: &ErasureCoding,
451 mode: ReadSectorRecordChunksMode,
452 table_generator: &mut PosTable::Generator,
453) -> Result<Piece, ReadingError>
454where
455 PosTable: Table,
456 S: ReadAtSync,
457 A: ReadAtAsync,
458{
459 let pieces_in_sector = sector_metadata.pieces_in_sector;
460
461 let sector_contents_map = {
462 let mut sector_contents_map_bytes =
463 vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
464 match sector {
465 ReadAt::Sync(sector) => {
466 sector.read_at(&mut sector_contents_map_bytes, 0)?;
467 }
468 ReadAt::Async(sector) => {
469 sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
470 }
471 }
472
473 SectorContentsMap::from_bytes(§or_contents_map_bytes, pieces_in_sector)?
474 };
475
476 let sector_record_chunks = read_sector_record_chunks(
477 piece_offset,
478 pieces_in_sector,
479 §or_metadata.s_bucket_offsets(),
480 §or_contents_map,
481 &table_generator.generate(§or_id.derive_evaluation_seed(piece_offset)),
482 sector,
483 mode,
484 )
485 .await?;
486 let record = recover_source_record(§or_record_chunks, piece_offset, erasure_coding)?;
488
489 let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;
490
491 let mut piece = Piece::default();
492
493 piece.record_mut().copy_from_slice(record.as_slice());
494
495 *piece.root_mut() = record_metadata.root;
496 *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
497 *piece.proof_mut() = record_metadata.proof;
498
499 let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
501 if actual_checksum != record_metadata.piece_checksum {
502 debug!(
503 ?sector_id,
504 %piece_offset,
505 %actual_checksum,
506 expected_checksum = %record_metadata.piece_checksum,
507 "Hash doesn't match, plotted piece is corrupted"
508 );
509
510 return Err(ReadingError::ChecksumMismatch);
511 }
512
513 Ok(piece.to_shared())
514}