use crate::sector::{
    RecordMetadata, SectorContentsMap, SectorContentsMapFromBytesError, SectorMetadataChecksummed,
    sector_record_chunks_size,
};
use crate::{ReadAt, ReadAtAsync, ReadAtSync};
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceOffset, Record, RecordChunk};
use ab_core_primitives::sectors::{SBucket, SectorId};
use ab_erasure_coding::{ErasureCoding, ErasureCodingError, RecoveryShardState};
use ab_proof_of_space::{Table, TableGenerator};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parity_scale_codec::Decode;
use rayon::prelude::*;
use std::mem::ManuallyDrop;
use std::simd::Simd;
use std::str::FromStr;
use std::{fmt, io};
use thiserror::Error;
use tracing::debug;

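/// Errors that can occur while reading data from a plotted sector.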
#[derive(Debug, Error)]
pub enum ReadingError {
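    /// Failed to read a chunk from the sector at the given location.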
    #[error("Failed to read chunk at location {chunk_location}: {error}")]
    FailedToReadChunk {
        chunk_location: u64,
        error: io::Error,
    },
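    /// No proof-of-space proof exists for the given s-bucket.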
    #[error("Missing PoS proof for s-bucket {s_bucket}")]
    MissingPosProof {
        s_bucket: SBucket,
    },
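    /// Erasure-decoding of the record at the given piece offset failed.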
    #[error("Failed to erasure-decode record at offset {piece_offset}: {error}")]
    FailedToErasureDecodeRecord {
        piece_offset: PieceOffset,
        error: ErasureCodingError,
    },
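    /// The record produced by decoding has an unexpected size.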
    #[error("Wrong record size after decoding: expected {expected}, actual {actual}")]
    WrongRecordSizeAfterDecoding {
        expected: usize,
        actual: usize,
    },
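    /// The sector contents map could not be decoded.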
    #[error("Failed to decode sector contents map: {0}")]
    FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
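    /// A low-level I/O error occurred while reading.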
    #[error("Reading I/O error: {0}")]
    Io(#[from] io::Error),
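    /// The reconstructed piece does not match the checksum stored in its metadata, meaning the
    /// plotted piece is corrupted.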
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}

impl ReadingError {
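    /// Returns `true` if the error is fatal, i.e. retrying the read will not help.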
    pub fn is_fatal(&self) -> bool {
        match self {
            ReadingError::FailedToReadChunk { .. } => false,
            ReadingError::MissingPosProof { .. } => false,
            ReadingError::FailedToErasureDecodeRecord { .. } => false,
            ReadingError::WrongRecordSizeAfterDecoding { .. } => false,
            ReadingError::FailedToDecodeSectorContentsMap(_) => false,
            ReadingError::Io(_) => true,
            ReadingError::ChecksumMismatch => false,
        }
    }
}

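/// Strategy for reading record chunks out of a sector.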
#[derive(Debug, Copy, Clone)]
pub enum ReadSectorRecordChunksMode {
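    /// Read chunks at their individual locations concurrently. Transfers less data overall, but
    /// issues many small random reads.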
    ConcurrentChunks,
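    /// Read the whole sector into memory at once. Uses more memory, but reads the data
    /// sequentially.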
    WholeSector,
}

impl fmt::Display for ReadSectorRecordChunksMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ConcurrentChunks => {
                write!(f, "ConcurrentChunks")
            }
            Self::WholeSector => {
                write!(f, "WholeSector")
            }
        }
    }
}

impl FromStr for ReadSectorRecordChunksMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ConcurrentChunks" => Ok(Self::ConcurrentChunks),
            "WholeSector" => Ok(Self::WholeSector),
            s => Err(format!("Can't parse {s} as `ReadSectorRecordChunksMode`")),
        }
    }
}

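/// Reads the record chunks of the record at `piece_offset` from the sector.
///
/// Chunks that were stored encoded are decoded by XOR-ing them with the corresponding
/// proof-of-space proof hash; s-buckets that were not plotted for this record are returned as
/// `None`.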
pub async fn read_sector_record_chunks<PosTable, S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    s_bucket_offsets: &[u32; Record::NUM_S_BUCKETS],
    sector_contents_map: &SectorContentsMap,
    pos_table: &PosTable,
    sector: &ReadAt<S, A>,
    mode: ReadSectorRecordChunksMode,
) -> Result<Box<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let mut record_chunks = Box::<[Option<RecordChunk>; Record::NUM_S_BUCKETS]>::try_from(
        vec![None::<RecordChunk>; Record::NUM_S_BUCKETS].into_boxed_slice(),
    )
    .expect("Correct size; qed");

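    // For every s-bucket, pair the output slot with the chunk's location in the plot (`None` if
    // the chunk was not plotted) and whether it was stored encoded.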
    let read_chunks_inputs = record_chunks
        .par_iter_mut()
        .zip(sector_contents_map.par_iter_record_chunk_to_plot(piece_offset))
        .zip(
            (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
                .into_par_iter()
                .map(SBucket::from)
                .zip(s_bucket_offsets.par_iter()),
        )
        .map(
            |((maybe_record_chunk, maybe_chunk_details), (s_bucket, &s_bucket_offset))| {
                let (chunk_offset, encoded_chunk_used) = maybe_chunk_details?;

                let chunk_location = chunk_offset as u64 + u64::from(s_bucket_offset);

                Some((
                    maybe_record_chunk,
                    chunk_location,
                    encoded_chunk_used,
                    s_bucket,
                ))
            },
        )
        .collect::<Vec<_>>();

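    // In `WholeSector` mode the entire sector is buffered up front; in `ConcurrentChunks` mode
    // each chunk is read at its own offset instead.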
    let sector_contents_map_size = SectorContentsMap::encoded_size(pieces_in_sector) as u64;
    let sector_bytes = match mode {
        ReadSectorRecordChunksMode::ConcurrentChunks => None,
        ReadSectorRecordChunksMode::WholeSector => {
            Some(vec![0u8; crate::sector::sector_size(pieces_in_sector)])
        }
    };
    match sector {
        ReadAt::Sync(sector) => {
            let sector_bytes = {
                if let Some(mut sector_bytes) = sector_bytes {
                    sector.read_at(&mut sector_bytes, 0)?;
                    Some(sector_bytes)
                } else {
                    None
                }
            };
            read_chunks_inputs.into_par_iter().flatten().try_for_each(
                |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| {
                    let mut record_chunk = [0; RecordChunk::SIZE];
                    if let Some(sector_bytes) = &sector_bytes {
                        record_chunk.copy_from_slice(
                            &sector_bytes[sector_contents_map_size as usize
                                + chunk_location as usize * RecordChunk::SIZE..]
                                [..RecordChunk::SIZE],
                        );
                    } else {
                        sector
                            .read_at(
                                &mut record_chunk,
                                sector_contents_map_size
                                    + chunk_location * RecordChunk::SIZE as u64,
                            )
                            .map_err(|error| ReadingError::FailedToReadChunk {
                                chunk_location,
                                error,
                            })?;
                    }

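                    // Encoded chunks were XORed with the proof-of-space proof hash during
                    // plotting; XOR-ing with the same hash again decodes them.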
                    if encoded_chunk_used {
                        let proof = pos_table
                            .find_proof(s_bucket.into())
                            .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                        record_chunk =
                            Simd::to_array(Simd::from(record_chunk) ^ Simd::from(*proof.hash()));
                    }

                    maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                    Ok::<_, ReadingError>(())
                },
            )?;
        }
        ReadAt::Async(sector) => {
            let sector_bytes = &{
                if let Some(sector_bytes) = sector_bytes {
                    Some(sector.read_at(sector_bytes, 0).await?)
                } else {
                    None
                }
            };
            let processing_chunks = read_chunks_inputs
                .into_iter()
                .flatten()
                .map(
                    |(maybe_record_chunk, chunk_location, encoded_chunk_used, s_bucket)| async move {
                        let mut record_chunk = [0; RecordChunk::SIZE];
                        if let Some(sector_bytes) = &sector_bytes {
                            record_chunk.copy_from_slice(
                                &sector_bytes[sector_contents_map_size as usize
                                    + chunk_location as usize * RecordChunk::SIZE..]
                                    [..RecordChunk::SIZE],
                            );
                        } else {
                            record_chunk.copy_from_slice(
                                &sector
                                    .read_at(
                                        vec![0; RecordChunk::SIZE],
                                        sector_contents_map_size
                                            + chunk_location * RecordChunk::SIZE as u64,
                                    )
                                    .await
                                    .map_err(|error| ReadingError::FailedToReadChunk {
                                        chunk_location,
                                        error,
                                    })?,
                            );
                        }

                        if encoded_chunk_used {
                            let proof = pos_table
                                .find_proof(s_bucket.into())
                                .ok_or(ReadingError::MissingPosProof { s_bucket })?;

                            record_chunk = Simd::to_array(
                                Simd::from(record_chunk) ^ Simd::from(*proof.hash()),
                            );
                        }

                        maybe_record_chunk.replace(RecordChunk::from(record_chunk));

                        Ok::<_, ReadingError>(())
                    },
                )
                .collect::<FuturesUnordered<_>>()
                .filter_map(|result| async move { result.err() });

            std::pin::pin!(processing_chunks)
                .next()
                .await
                .map_or(Ok(()), Err)?;
        }
    }

    Ok(record_chunks)
}

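/// Recovers all record chunks (source and parity) from the subset of sector record chunks that
/// were successfully read, using erasure coding.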
pub fn recover_extended_record_chunks(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<[RecordChunk; Record::NUM_S_BUCKETS]>, ReadingError> {
    let mut recovered_sector_record_chunks = vec![[0u8; RecordChunk::SIZE]; Record::NUM_S_BUCKETS];
    {
        let (source_sector_record_chunks, parity_sector_record_chunks) =
            sector_record_chunks.split_at(Record::NUM_CHUNKS);
        let (source_recovered_sector_record_chunks, parity_recovered_sector_record_chunks) =
            recovered_sector_record_chunks.split_at_mut(Record::NUM_CHUNKS);

        let source = source_sector_record_chunks
            .iter()
            .zip(source_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        let parity = parity_sector_record_chunks
            .iter()
            .zip(parity_recovered_sector_record_chunks.iter_mut())
            .map(
                |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                    Some(input_chunk) => {
                        output_chunk.copy_from_slice(input_chunk.as_slice());
                        RecoveryShardState::Present(input_chunk.as_slice())
                    }
                    None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
                },
            );
        erasure_coding.recover(source, parity).map_err(|error| {
            ReadingError::FailedToErasureDecodeRecord {
                piece_offset,
                error,
            }
        })?;
    }

    let record_chunks = recovered_sector_record_chunks
        .into_iter()
        .map(RecordChunk::from)
        .collect::<Box<_>>();
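    // SAFETY: A boxed slice with exactly `Record::NUM_S_BUCKETS` elements has the same memory
    // layout as a boxed array of that length; `ManuallyDrop` ensures the original allocation is
    // not freed when it is re-interpreted below.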
    let mut record_chunks = ManuallyDrop::new(record_chunks);
    let record_chunks = unsafe { Box::from_raw(record_chunks.as_mut_ptr() as *mut _) };

    Ok(record_chunks)
}

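/// Recovers only the source record from the subset of sector record chunks that were
/// successfully read, ignoring missing parity chunks.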
pub fn recover_source_record(
    sector_record_chunks: &[Option<RecordChunk>; Record::NUM_S_BUCKETS],
    piece_offset: PieceOffset,
    erasure_coding: &ErasureCoding,
) -> Result<Box<Record>, ReadingError> {
    let mut recovered_record = Record::new_boxed();

    let (source_sector_record_chunks, parity_sector_record_chunks) =
        sector_record_chunks.split_at(Record::NUM_CHUNKS);
    let source = source_sector_record_chunks
        .iter()
        .zip(recovered_record.iter_mut())
        .map(
            |(maybe_input_chunk, output_chunk)| match maybe_input_chunk {
                Some(input_chunk) => {
                    output_chunk.copy_from_slice(input_chunk.as_slice());
                    RecoveryShardState::Present(input_chunk.as_slice())
                }
                None => RecoveryShardState::MissingRecover(output_chunk.as_mut_slice()),
            },
        );
    let parity =
        parity_sector_record_chunks
            .iter()
            .map(|maybe_input_chunk| match maybe_input_chunk {
                Some(input_chunk) => RecoveryShardState::Present(input_chunk.as_slice()),
                None => RecoveryShardState::MissingIgnore,
            });
    erasure_coding.recover(source, parity).map_err(|error| {
        ReadingError::FailedToErasureDecodeRecord {
            piece_offset,
            error,
        }
    })?;

    Ok(recovered_record)
}

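/// Reads the metadata of the record at `piece_offset` from the sector.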
pub(crate) async fn read_record_metadata<S, A>(
    piece_offset: PieceOffset,
    pieces_in_sector: u16,
    sector: &ReadAt<S, A>,
) -> Result<RecordMetadata, ReadingError>
where
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let sector_metadata_start = SectorContentsMap::encoded_size(pieces_in_sector) as u64
        + sector_record_chunks_size(pieces_in_sector) as u64;
    let record_metadata_offset =
        sector_metadata_start + RecordMetadata::encoded_size() as u64 * u64::from(piece_offset);

    let mut record_metadata_bytes = vec![0; RecordMetadata::encoded_size()];
    match sector {
        ReadAt::Sync(sector) => {
            sector.read_at(&mut record_metadata_bytes, record_metadata_offset)?;
        }
        ReadAt::Async(sector) => {
            record_metadata_bytes = sector
                .read_at(record_metadata_bytes, record_metadata_offset)
                .await?;
        }
    }
    let record_metadata = RecordMetadata::decode(&mut record_metadata_bytes.as_ref())
        .expect("Length is correct, contents don't have a specific structure; qed");

    Ok(record_metadata)
}

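/// Reads and validates a complete piece from the sector: reads the sector contents map, reads
/// and decodes the record chunks, recovers the source record via erasure coding, attaches the
/// record metadata and verifies the piece checksum.
///
/// A minimal usage sketch (assuming a concrete `PosTable` implementation; `plot`,
/// `pos_table_generator` and the other bindings are illustrative):
///
/// ```ignore
/// let piece = read_piece::<PosTable, _, _>(
///     piece_offset,
///     &sector_id,
///     &sector_metadata,
///     &plot,
///     &erasure_coding,
///     ReadSectorRecordChunksMode::ConcurrentChunks,
///     &mut pos_table_generator,
/// )
/// .await?;
/// ```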
pub async fn read_piece<PosTable, S, A>(
    piece_offset: PieceOffset,
    sector_id: &SectorId,
    sector_metadata: &SectorMetadataChecksummed,
    sector: &ReadAt<S, A>,
    erasure_coding: &ErasureCoding,
    mode: ReadSectorRecordChunksMode,
    table_generator: &mut PosTable::Generator,
) -> Result<Piece, ReadingError>
where
    PosTable: Table,
    S: ReadAtSync,
    A: ReadAtAsync,
{
    let pieces_in_sector = sector_metadata.pieces_in_sector;

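    // The sector contents map is stored at the very beginning of the sector.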
    let sector_contents_map = {
        let mut sector_contents_map_bytes =
            vec![0; SectorContentsMap::encoded_size(pieces_in_sector)];
        match sector {
            ReadAt::Sync(sector) => {
                sector.read_at(&mut sector_contents_map_bytes, 0)?;
            }
            ReadAt::Async(sector) => {
                sector_contents_map_bytes = sector.read_at(sector_contents_map_bytes, 0).await?;
            }
        }

        SectorContentsMap::from_bytes(&sector_contents_map_bytes, pieces_in_sector)?
    };

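    // Regenerate the proof-of-space table for this piece so that encoded chunks can be decoded.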
    let sector_record_chunks = read_sector_record_chunks(
        piece_offset,
        pieces_in_sector,
        &sector_metadata.s_bucket_offsets(),
        &sector_contents_map,
        &table_generator.generate(&sector_id.derive_evaluation_seed(piece_offset)),
        sector,
        mode,
    )
    .await?;
    let record = recover_source_record(&sector_record_chunks, piece_offset, erasure_coding)?;

    let record_metadata = read_record_metadata(piece_offset, pieces_in_sector, sector).await?;

    let mut piece = Piece::default();

    piece.record_mut().copy_from_slice(record.as_slice());

    *piece.root_mut() = record_metadata.root;
    *piece.parity_chunks_root_mut() = record_metadata.parity_chunks_root;
    *piece.proof_mut() = record_metadata.proof;

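    // Verify the reconstructed piece against the checksum stored in the record metadata.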
    let actual_checksum = Blake3Hash::from(blake3::hash(piece.as_ref()));
    if actual_checksum != record_metadata.piece_checksum {
        debug!(
            ?sector_id,
            %piece_offset,
            %actual_checksum,
            expected_checksum = %record_metadata.piece_checksum,
            "Hash doesn't match, plotted piece is corrupted"
        );

        return Err(ReadingError::ChecksumMismatch);
    }

    Ok(piece.to_shared())
}