1use ab_core_primitives::checksum::Blake3Checksummed;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::{PieceOffset, Record, RecordChunksRoot, RecordProof, RecordRoot};
12use ab_core_primitives::sectors::{SBucket, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentIndex};
14use ab_io_type::trivial_type::TrivialType;
15use bitvec::prelude::*;
16use parity_scale_codec::{Decode, Encode};
17use rayon::prelude::*;
18use std::mem::ManuallyDrop;
19use std::ops::{Deref, DerefMut};
20use std::{mem, slice};
21use thiserror::Error;
22use tracing::debug;
23
24#[inline]
28pub const fn sector_record_chunks_size(pieces_in_sector: u16) -> usize {
29 pieces_in_sector as usize * Record::SIZE
30}
31
32#[inline]
36pub const fn sector_record_metadata_size(pieces_in_sector: u16) -> usize {
37 pieces_in_sector as usize * RecordMetadata::encoded_size()
38}
39
40#[inline]
48pub const fn sector_size(pieces_in_sector: u16) -> usize {
49 sector_record_chunks_size(pieces_in_sector)
50 + sector_record_metadata_size(pieces_in_sector)
51 + SectorContentsMap::encoded_size(pieces_in_sector)
52 + Blake3Hash::SIZE
53}
54
/// Metadata of a plotted sector
#[derive(Debug, Encode, Decode, Clone)]
pub struct SectorMetadata {
    /// Index of the sector within the plot
    pub sector_index: SectorIndex,
    /// Number of pieces stored in this sector
    pub pieces_in_sector: u16,
    /// Number of chunks stored in each s-bucket of this sector
    pub s_bucket_sizes: Box<[u16; Record::NUM_S_BUCKETS]>,
    /// History size at the time the sector was created
    pub history_size: HistorySize,
}
67
68impl SectorMetadata {
69 pub fn s_bucket_offsets(&self) -> Box<[u32; Record::NUM_S_BUCKETS]> {
71 let s_bucket_offsets = self
72 .s_bucket_sizes
73 .iter()
74 .map({
75 let mut base_offset = 0;
76
77 move |s_bucket_size| {
78 let offset = base_offset;
79 base_offset += u32::from(*s_bucket_size);
80 offset
81 }
82 })
83 .collect::<Box<_>>();
84
85 assert_eq!(s_bucket_offsets.len(), Record::NUM_S_BUCKETS);
86 let mut s_bucket_offsets = ManuallyDrop::new(s_bucket_offsets);
87 unsafe { Box::from_raw(s_bucket_offsets.as_mut_ptr() as *mut [u32; Record::NUM_S_BUCKETS]) }
89 }
90}
91
/// Same as [`SectorMetadata`], but wrapped with a Blake3 checksum for integrity checking
#[derive(Debug, Clone, Encode, Decode)]
pub struct SectorMetadataChecksummed(Blake3Checksummed<SectorMetadata>);
95
impl From<SectorMetadata> for SectorMetadataChecksummed {
    // Wraps plain metadata into its checksummed form
    #[inline]
    fn from(value: SectorMetadata) -> Self {
        Self(Blake3Checksummed(value))
    }
}
102
impl Deref for SectorMetadataChecksummed {
    type Target = SectorMetadata;

    // Transparent read access to the inner metadata, skipping the checksum wrapper
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0.0
    }
}
111
impl DerefMut for SectorMetadataChecksummed {
    // Transparent mutable access to the inner metadata, skipping the checksum wrapper
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0.0
    }
}
118
impl SectorMetadataChecksummed {
    /// Size of the encoded checksummed sector metadata in bytes.
    ///
    /// All fields encode to a fixed size, so measuring a throwaway default-valued
    /// instance gives the size of any instance.
    #[inline]
    pub fn encoded_size() -> usize {
        let default = SectorMetadataChecksummed::from(SectorMetadata {
            sector_index: SectorIndex::ZERO,
            pieces_in_sector: 0,
            // SAFETY: zeroed memory is a valid bit pattern for an array of `u16`; heap
            // allocation avoids placing the large array on the stack
            s_bucket_sizes: unsafe { Box::new_zeroed().assume_init() },
            history_size: HistorySize::from(SegmentIndex::ZERO),
        });

        // `parity_scale_codec::Encode::encoded_size()` of the instance, not recursion
        default.encoded_size()
    }
}
137
/// Metadata of a single record stored in the sector
#[derive(Debug, Default, Clone, Encode, Decode)]
pub(crate) struct RecordMetadata {
    /// Record root
    pub(crate) root: RecordRoot,
    /// Root of the parity chunks of the record
    pub(crate) parity_chunks_root: RecordChunksRoot,
    /// Record proof
    pub(crate) proof: RecordProof,
    /// Blake3 checksum of the piece this record came from
    pub(crate) piece_checksum: Blake3Hash,
}
150
151impl RecordMetadata {
152 pub(crate) const fn encoded_size() -> usize {
153 RecordProof::SIZE + RecordRoot::SIZE + RecordChunksRoot::SIZE + Blake3Hash::SIZE
154 }
155}
156
/// Raw sector contents before encoding: plain records together with their metadata
#[derive(Debug, Clone)]
pub(crate) struct RawSector {
    /// Records to be stored in the sector
    pub(crate) records: Vec<Record>,
    /// Metadata for each record, same length and order as `records`
    pub(crate) metadata: Vec<RecordMetadata>,
}
165
166impl RawSector {
167 pub(crate) fn new(pieces_in_sector: u16) -> Self {
169 Self {
170 records: Record::new_zero_vec(usize::from(pieces_in_sector)),
171 metadata: vec![RecordMetadata::default(); usize::from(pieces_in_sector)],
172 }
173 }
174}
175
// Bit array with one bit per s-bucket of a single record; a set bit means the chunk at
// that s-bucket is encoded
type SingleRecordBitArray = BitArray<[u8; Record::NUM_S_BUCKETS / u8::BITS as usize]>;

// Size in bytes of the backing storage of a single record's bit array
const SINGLE_RECORD_BIT_ARRAY_SIZE: usize = mem::size_of::<SingleRecordBitArray>();
180
/// Mutable handle to a single record's "encoded chunks used" bitfield.
///
/// Keeps the cached count of encoded chunks consistent: if mutable access to the bits was
/// handed out (via `iter_mut`), the count is recomputed when this handle is dropped.
#[derive(Debug)]
pub struct EncodedChunksUsed<'a> {
    // Bitfield of which chunks of this record are encoded
    encoded_record_chunks_used: &'a mut SingleRecordBitArray,
    // Cached number of set bits, refreshed on drop when needed
    num_encoded_record_chunks: &'a mut SBucket,
    // Set by `iter_mut` since handing out mutable bits may change the count
    potentially_updated: bool,
}
193
impl Drop for EncodedChunksUsed<'_> {
    fn drop(&mut self) {
        // Only recompute the cached count if mutable access was actually handed out
        if self.potentially_updated {
            let num_encoded_record_chunks = self.encoded_record_chunks_used.count_ones();
            // Enforce that the count fits into `SBucket` before converting
            assert!(num_encoded_record_chunks <= SBucket::MAX.into());
            *self.num_encoded_record_chunks = SBucket::try_from(num_encoded_record_chunks)
                .expect("Checked with explicit assert above; qed");
        }
    }
}
204
impl EncodedChunksUsed<'_> {
    /// Iterate over individual bits as read-only booleans
    pub fn iter(&self) -> impl ExactSizeIterator<Item = impl Deref<Target = bool> + '_> + '_ {
        self.encoded_record_chunks_used.iter()
    }

    /// Iterate over individual bits as mutable booleans.
    ///
    /// Marks the bitfield as potentially updated so the cached count of encoded chunks is
    /// refreshed when this handle is dropped.
    pub fn iter_mut(
        &mut self,
    ) -> impl ExactSizeIterator<Item = impl DerefMut<Target = bool> + '_> + '_ {
        self.potentially_updated = true;
        self.encoded_record_chunks_used.iter_mut()
    }
}
219
/// Error happening when trying to create [`SectorContentsMap`] from bytes
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapFromBytesError {
    /// Invalid bytes length
    #[error("Invalid bytes length, expected {expected}, actual {actual}")]
    InvalidBytesLength {
        /// Expected length
        expected: usize,
        /// Actual length
        actual: usize,
    },
    /// A record claims more encoded chunks than possible
    #[error("Invalid number of encoded record chunks: {actual}")]
    InvalidEncodedRecordChunks {
        /// Actual number of encoded record chunks
        actual: usize,
        /// Maximum allowed number of encoded record chunks
        max: usize,
    },
    /// Checksum of the contents doesn't match the stored checksum
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
243
/// Error happening when trying to encode [`SectorContentsMap`] into an output buffer
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapEncodeIntoError {
    /// Invalid bytes length
    #[error("Invalid bytes length, expected {expected}, actual {actual}")]
    InvalidBytesLength {
        /// Expected length
        expected: usize,
        /// Actual length
        actual: usize,
    },
}
256
/// Error happening during [`SectorContentsMap`] iteration
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapIterationError {
    /// S-bucket provided is out of range
    #[error("S-bucket provided {provided} is out of range, max {max}")]
    SBucketOutOfRange {
        /// Provided s-bucket index
        provided: usize,
        /// Maximum allowed s-bucket index
        max: usize,
    },
}
269
/// Map of sector contents: which chunks of each record are encoded.
///
/// Both vectors have one element per record (piece) in the sector.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SectorContentsMap {
    /// Cached number of encoded chunks for each record (number of set bits in the
    /// corresponding bitfield)
    num_encoded_record_chunks: Vec<SBucket>,
    /// One bitfield per record; a set bit at index `i` means the record's chunk for
    /// s-bucket `i` is encoded
    encoded_record_chunks_used: Vec<SingleRecordBitArray>,
}
287
impl SectorContentsMap {
    /// Create a new sector contents map for `pieces_in_sector` records with all chunks
    /// marked as unencoded
    pub fn new(pieces_in_sector: u16) -> Self {
        Self {
            num_encoded_record_chunks: vec![SBucket::default(); usize::from(pieces_in_sector)],
            encoded_record_chunks_used: vec![
                SingleRecordBitArray::default();
                usize::from(pieces_in_sector)
            ],
        }
    }

    /// Reconstruct a sector contents map from previously encoded bytes.
    ///
    /// Returns an error if the length doesn't match [`Self::encoded_size()`] for
    /// `pieces_in_sector`, if the trailing checksum doesn't match, or if any record
    /// claims more than [`Record::NUM_CHUNKS`] encoded chunks.
    pub fn from_bytes(
        bytes: &[u8],
        pieces_in_sector: u16,
    ) -> Result<Self, SectorContentsMapFromBytesError> {
        if bytes.len() != Self::encoded_size(pieces_in_sector) {
            return Err(SectorContentsMapFromBytesError::InvalidBytesLength {
                expected: Self::encoded_size(pieces_in_sector),
                actual: bytes.len(),
            });
        }

        // Layout: `pieces_in_sector` bit arrays followed by a Blake3 checksum of them
        let (single_records_bit_arrays, expected_checksum) =
            bytes.split_at(bytes.len() - Blake3Hash::SIZE);
        // SAFETY: the slice is exactly `Blake3Hash::SIZE` bytes long (guaranteed by
        // `split_at` above) and byte slices have no alignment requirements
        let expected_checksum = unsafe {
            Blake3Hash::from_bytes(expected_checksum).expect("No alignment requirements; qed")
        };
        // Verify integrity before interpreting the contents
        let actual_checksum = Blake3Hash::from(blake3::hash(single_records_bit_arrays));
        if &actual_checksum != expected_checksum {
            debug!(
                %actual_checksum,
                %expected_checksum,
                "Hash doesn't match, corrupted bytes"
            );

            return Err(SectorContentsMapFromBytesError::ChecksumMismatch);
        }

        let mut encoded_record_chunks_used =
            vec![SingleRecordBitArray::default(); pieces_in_sector.into()];

        // Copy each record's bit array out of the input and derive the cached number of
        // encoded chunks from it; the length check above guarantees the input splits into
        // exactly `pieces_in_sector` full-size chunks with no remainder
        let num_encoded_record_chunks = encoded_record_chunks_used
            .iter_mut()
            .zip(
                single_records_bit_arrays
                    .as_chunks::<{ SINGLE_RECORD_BIT_ARRAY_SIZE }>()
                    .0,
            )
            .map(|(encoded_record_chunks_used, bytes)| {
                encoded_record_chunks_used
                    .as_raw_mut_slice()
                    .copy_from_slice(bytes);
                let num_encoded_record_chunks = encoded_record_chunks_used.count_ones();
                if num_encoded_record_chunks > Record::NUM_CHUNKS {
                    return Err(
                        SectorContentsMapFromBytesError::InvalidEncodedRecordChunks {
                            actual: num_encoded_record_chunks,
                            max: Record::NUM_CHUNKS,
                        },
                    );
                }
                Ok(SBucket::try_from(num_encoded_record_chunks).expect("Verified above; qed"))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Self {
            num_encoded_record_chunks,
            encoded_record_chunks_used,
        })
    }

    /// Size of the encoded sector contents map in bytes: one bit array per record plus a
    /// trailing Blake3 checksum
    pub const fn encoded_size(pieces_in_sector: u16) -> usize {
        SINGLE_RECORD_BIT_ARRAY_SIZE * pieces_in_sector as usize + Blake3Hash::SIZE
    }

    /// Encode the sector contents map into the provided output buffer.
    ///
    /// `output` must be exactly [`Self::encoded_size()`] bytes long, otherwise an error
    /// is returned.
    pub fn encode_into(&self, output: &mut [u8]) -> Result<(), SectorContentsMapEncodeIntoError> {
        if output.len() != Self::encoded_size(self.encoded_record_chunks_used.len() as u16) {
            return Err(SectorContentsMapEncodeIntoError::InvalidBytesLength {
                expected: Self::encoded_size(self.encoded_record_chunks_used.len() as u16),
                actual: output.len(),
            });
        }

        let slice = self.encoded_record_chunks_used.as_slice();
        // SAFETY: `SingleRecordBitArray` is backed by `[u8; SINGLE_RECORD_BIT_ARRAY_SIZE]`
        // and the byte length is derived from `size_of` of that very type, so the raw
        // view covers exactly the same memory as the original slice
        let slice = unsafe {
            slice::from_raw_parts(
                slice.as_ptr() as *const u8,
                slice.len() * SINGLE_RECORD_BIT_ARRAY_SIZE,
            )
        };

        // Bit arrays first, then the checksum over them fills the remaining
        // `Blake3Hash::SIZE` bytes (guaranteed by the length check above)
        output[..slice.len()].copy_from_slice(slice);
        output[slice.len()..].copy_from_slice(blake3::hash(slice).as_bytes());

        Ok(())
    }

    /// Number of encoded chunks in each record, one entry per piece in the sector
    pub fn num_encoded_record_chunks(&self) -> &[SBucket] {
        &self.num_encoded_record_chunks
    }

    /// Bitfields of encoded chunks, one bitfield per record
    pub fn iter_record_bitfields(&self) -> &[SingleRecordBitArray] {
        &self.encoded_record_chunks_used
    }

    /// Iterate over mutable handles to the bitfields of encoded chunks; cached counts are
    /// kept in sync via [`EncodedChunksUsed`]'s `Drop` implementation
    pub fn iter_record_bitfields_mut(
        &mut self,
    ) -> impl ExactSizeIterator<Item = EncodedChunksUsed<'_>> + '_ {
        self.encoded_record_chunks_used
            .iter_mut()
            .zip(&mut self.num_encoded_record_chunks)
            .map(
                |(encoded_record_chunks_used, num_encoded_record_chunks)| EncodedChunksUsed {
                    encoded_record_chunks_used,
                    num_encoded_record_chunks,
                    potentially_updated: false,
                },
            )
    }

    /// Number of chunks stored in each s-bucket, computed in parallel by counting how
    /// many records have a chunk in each s-bucket
    pub fn s_bucket_sizes(&self) -> Box<[u16; Record::NUM_S_BUCKETS]> {
        let s_bucket_sizes = (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
            .into_par_iter()
            .map(SBucket::from)
            .map(|s_bucket| {
                self.iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
                    .count() as u16
            })
            .collect::<Box<_>>();

        assert_eq!(s_bucket_sizes.len(), Record::NUM_S_BUCKETS);
        // SAFETY: the boxed slice has exactly `Record::NUM_S_BUCKETS` elements (asserted
        // above), matching the memory layout of a boxed array of that length;
        // `ManuallyDrop` prevents a double free of the original allocation
        let mut s_bucket_sizes = ManuallyDrop::new(s_bucket_sizes);
        unsafe { Box::from_raw(s_bucket_sizes.as_mut_ptr() as *mut [u16; Record::NUM_S_BUCKETS]) }
    }

    /// Iterate over the chunks of the record at `piece_offset` in s-bucket order.
    ///
    /// Yields `(s_bucket, encoded_chunk_used, chunk_location)` where `chunk_location` is
    /// the index of the chunk within the sector when chunks of all records are laid out
    /// s-bucket by s-bucket.
    pub fn iter_record_chunk_to_plot(
        &self,
        piece_offset: PieceOffset,
    ) -> impl Iterator<Item = (SBucket, bool, usize)> + '_ {
        (SBucket::ZERO..=SBucket::MAX)
            // Enumerate chunks of all records, s-bucket by s-bucket
            .flat_map(|s_bucket| {
                self.iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
                    .map(move |(current_piece_offset, encoded_chunk_used)| {
                        (s_bucket, current_piece_offset, encoded_chunk_used)
                    })
            })
            // Global chunk location within the sector
            .enumerate()
            // Keep only chunks that belong to the requested record
            .filter_map(
                move |(chunk_location, (s_bucket, current_piece_offset, encoded_chunk_used))| {
                    (current_piece_offset == piece_offset).then_some((
                        s_bucket,
                        encoded_chunk_used,
                        chunk_location,
                    ))
                },
            )
            // A record has at most `Record::NUM_CHUNKS` chunks, stop once all were found
            .take(Record::NUM_CHUNKS)
    }

    /// Parallel iterator over all s-buckets for the record at `piece_offset`.
    ///
    /// For each s-bucket yields `Some((chunk_offset, encoded_chunk_used))` when the
    /// record has a chunk in that s-bucket — `chunk_offset` being the chunk's position
    /// within the s-bucket — or `None` when it does not.
    pub fn par_iter_record_chunk_to_plot(
        &self,
        piece_offset: PieceOffset,
    ) -> impl IndexedParallelIterator<Item = Option<(usize, bool)>> + '_ {
        let piece_offset = usize::from(piece_offset);
        (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
            .into_par_iter()
            .map(SBucket::from)
            .map(move |s_bucket| {
                // Does this record have a chunk in this s-bucket at all?
                let encoded_chunk_used = record_has_s_bucket_chunk(
                    s_bucket.into(),
                    &self.encoded_record_chunks_used[piece_offset],
                    usize::from(self.num_encoded_record_chunks[piece_offset]),
                )?;

                // Position within the s-bucket equals the number of preceding records
                // that also have a chunk in this s-bucket
                let chunk_offset = self
                    .encoded_record_chunks_used
                    .iter()
                    .zip(&self.num_encoded_record_chunks)
                    .take(piece_offset)
                    .filter(move |(record_bitfields, num_encoded_record_chunks)| {
                        record_has_s_bucket_chunk(
                            s_bucket.into(),
                            record_bitfields,
                            usize::from(**num_encoded_record_chunks),
                        )
                        .is_some()
                    })
                    .count();

                Some((chunk_offset, encoded_chunk_used))
            })
    }

    /// Iterate over `(piece_offset, encoded_chunk_used)` of all records that have a chunk
    /// in `s_bucket`.
    ///
    /// Returns an error if `s_bucket` is out of range.
    pub fn iter_s_bucket_records(
        &self,
        s_bucket: SBucket,
    ) -> Result<impl Iterator<Item = (PieceOffset, bool)> + '_, SectorContentsMapIterationError>
    {
        let s_bucket = usize::from(s_bucket);

        if s_bucket >= Record::NUM_S_BUCKETS {
            return Err(SectorContentsMapIterationError::SBucketOutOfRange {
                provided: s_bucket,
                max: Record::NUM_S_BUCKETS,
            });
        }

        Ok((PieceOffset::ZERO..)
            .zip(
                self.encoded_record_chunks_used
                    .iter()
                    .zip(&self.num_encoded_record_chunks),
            )
            .filter_map(
                move |(piece_offset, (record_bitfields, num_encoded_record_chunks))| {
                    // `None` means the record has no chunk in this s-bucket and is skipped
                    let encoded_chunk_used = record_has_s_bucket_chunk(
                        s_bucket,
                        record_bitfields,
                        usize::from(*num_encoded_record_chunks),
                    )?;

                    Some((piece_offset, encoded_chunk_used))
                },
            ))
    }

    /// Iterate over the raw "encoded" bit at `s_bucket` for every record.
    ///
    /// Unlike [`Self::iter_s_bucket_records()`], this yields one bool per record
    /// regardless of whether the record actually stores a chunk in this s-bucket.
    ///
    /// Returns an error if `s_bucket` is out of range.
    pub fn iter_s_bucket_encoded_record_chunks_used(
        &self,
        s_bucket: SBucket,
    ) -> Result<impl Iterator<Item = bool> + '_, SectorContentsMapIterationError> {
        let s_bucket = usize::from(s_bucket);

        if s_bucket >= Record::NUM_S_BUCKETS {
            return Err(SectorContentsMapIterationError::SBucketOutOfRange {
                provided: s_bucket,
                max: Record::NUM_S_BUCKETS,
            });
        }

        Ok(self
            .encoded_record_chunks_used
            .iter()
            .map(move |record_bitfields| record_bitfields[s_bucket]))
    }
}
585
586fn record_has_s_bucket_chunk(
590 s_bucket: usize,
591 record_bitfields: &SingleRecordBitArray,
592 num_encoded_record_chunks: usize,
593) -> Option<bool> {
594 if record_bitfields[s_bucket] {
595 Some(true)
597 } else if num_encoded_record_chunks == Record::NUM_CHUNKS {
598 None
599 } else {
600 let encoded_before = record_bitfields[..s_bucket].count_ones();
602 let unencoded_before = s_bucket - encoded_before;
603 let unencoded_total = Record::NUM_CHUNKS.saturating_sub(num_encoded_record_chunks);
606
607 if unencoded_before < unencoded_total {
608 Some(false)
611 } else {
612 None
613 }
614 }
615}