1use ab_core_primitives::checksum::Blake3Checksummed;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::{PieceOffset, Record, RecordChunksRoot, RecordProof, RecordRoot};
12use ab_core_primitives::sectors::{SBucket, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentIndex};
14use ab_io_type::trivial_type::TrivialType;
15use bitvec::prelude::*;
16use parity_scale_codec::{Decode, Encode};
17use rayon::prelude::*;
18use std::mem::ManuallyDrop;
19use std::ops::{Deref, DerefMut};
20use std::{mem, slice};
21use thiserror::Error;
22use tracing::debug;
23
24#[inline]
28pub const fn sector_record_chunks_size(pieces_in_sector: u16) -> usize {
29 pieces_in_sector as usize * Record::SIZE
30}
31
32#[inline]
36pub const fn sector_record_metadata_size(pieces_in_sector: u16) -> usize {
37 pieces_in_sector as usize * RecordMetadata::encoded_size()
38}
39
40#[inline]
48pub const fn sector_size(pieces_in_sector: u16) -> usize {
49 sector_record_chunks_size(pieces_in_sector)
50 + sector_record_metadata_size(pieces_in_sector)
51 + SectorContentsMap::encoded_size(pieces_in_sector)
52 + Blake3Hash::SIZE
53}
54
/// Metadata of a plotted sector.
///
/// NOTE: field order is part of the SCALE encoding and must not change.
#[derive(Debug, Encode, Decode, Clone)]
pub struct SectorMetadata {
    /// Index of the sector within the plot
    pub sector_index: SectorIndex,
    /// Number of pieces stored in this sector
    pub pieces_in_sector: u16,
    /// Number of chunks stored in each s-bucket of this sector
    pub s_bucket_sizes: Box<[u16; Record::NUM_S_BUCKETS]>,
    /// Blockchain history size associated with this sector
    /// (presumably the history size it was plotted against — confirm with plotting code)
    pub history_size: HistorySize,
}
67
68impl SectorMetadata {
69 pub fn s_bucket_offsets(&self) -> Box<[u32; Record::NUM_S_BUCKETS]> {
71 let s_bucket_offsets = self
72 .s_bucket_sizes
73 .iter()
74 .map({
75 let mut base_offset = 0;
76
77 move |s_bucket_size| {
78 let offset = base_offset;
79 base_offset += u32::from(*s_bucket_size);
80 offset
81 }
82 })
83 .collect::<Box<_>>();
84
85 assert_eq!(s_bucket_offsets.len(), Record::NUM_S_BUCKETS);
86 let mut s_bucket_offsets = ManuallyDrop::new(s_bucket_offsets);
87 unsafe { Box::from_raw(s_bucket_offsets.as_mut_ptr() as *mut [u32; Record::NUM_S_BUCKETS]) }
89 }
90}
91
/// Same as [`SectorMetadata`], but wrapped in [`Blake3Checksummed`] for its encoded form.
#[derive(Debug, Clone, Encode, Decode)]
pub struct SectorMetadataChecksummed(Blake3Checksummed<SectorMetadata>);
95
impl From<SectorMetadata> for SectorMetadataChecksummed {
    /// Wraps plain sector metadata in the checksummed wrapper.
    #[inline]
    fn from(value: SectorMetadata) -> Self {
        Self(Blake3Checksummed(value))
    }
}
102
// Transparent read access to the inner `SectorMetadata`
impl Deref for SectorMetadataChecksummed {
    type Target = SectorMetadata;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0.0
    }
}
111
// Transparent mutable access to the inner `SectorMetadata`
impl DerefMut for SectorMetadataChecksummed {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0.0
    }
}
118
impl SectorMetadataChecksummed {
    /// Size of the encoded checksummed sector metadata in bytes.
    ///
    /// Computed by encoding a placeholder instance; this assumes the SCALE encoding of
    /// every field is fixed-size regardless of value — TODO confirm (e.g. that
    /// `HistorySize` does not use compact encoding).
    #[inline]
    pub fn encoded_size() -> usize {
        let default = SectorMetadataChecksummed::from(SectorMetadata {
            sector_index: SectorIndex::ZERO,
            pieces_in_sector: 0,
            // SAFETY: `[u16; N]` is a valid value when zero-initialized; `Box::new_zeroed`
            // also avoids constructing the large array on the stack first
            s_bucket_sizes: unsafe { Box::new_zeroed().assume_init() },
            history_size: HistorySize::from(SegmentIndex::ZERO),
        });

        // `Encode::encoded_size` on the instance — not a recursive call to this function
        default.encoded_size()
    }
}
137
/// Metadata of a single record stored in a sector.
#[derive(Debug, Default, Clone, Encode, Decode)]
pub(crate) struct RecordMetadata {
    /// Record root
    pub(crate) root: RecordRoot,
    /// Root of the parity chunks of the record
    pub(crate) parity_chunks_root: RecordChunksRoot,
    /// Record proof
    pub(crate) proof: RecordProof,
    /// Blake3 checksum of the piece
    /// (presumably of the whole source piece — confirm against the plotting code)
    pub(crate) piece_checksum: Blake3Hash,
}
150
151impl RecordMetadata {
152 pub(crate) const fn encoded_size() -> usize {
153 RecordProof::SIZE + RecordRoot::SIZE + RecordChunksRoot::SIZE + Blake3Hash::SIZE
154 }
155}
156
/// Raw sector contents before encoding: one record plus its metadata per piece.
#[derive(Debug, Clone)]
pub(crate) struct RawSector {
    /// Records to be plotted
    pub(crate) records: Vec<Record>,
    /// Metadata for each record, same length and order as `records`
    pub(crate) metadata: Vec<RecordMetadata>,
}
165
166impl RawSector {
167 pub(crate) fn new(pieces_in_sector: u16) -> Self {
169 Self {
170 records: Record::new_zero_vec(usize::from(pieces_in_sector)),
171 metadata: vec![RecordMetadata::default(); usize::from(pieces_in_sector)],
172 }
173 }
174}
175
/// Bit array with one bit per s-bucket of a single record; a set bit marks an encoded chunk.
type SingleRecordBitArray = BitArray<[u8; Record::NUM_S_BUCKETS / u8::BITS as usize]>;

/// Size of [`SingleRecordBitArray`] in bytes.
const SINGLE_RECORD_BIT_ARRAY_SIZE: usize = mem::size_of::<SingleRecordBitArray>();
180
/// Wrapper that allows mutating a single record's encoded-chunks bitfield while keeping the
/// cached count of encoded chunks in sync (recomputed on drop if potentially mutated).
#[derive(Debug)]
pub struct EncodedChunksUsed<'a> {
    // One bit per s-bucket; `true` means the chunk at that s-bucket is encoded
    encoded_record_chunks_used: &'a mut SingleRecordBitArray,
    // Cached number of set bits, refreshed on drop when `potentially_updated` is set
    num_encoded_record_chunks: &'a mut SBucket,
    // Set by `iter_mut`, signalling that the cached count may be stale
    potentially_updated: bool,
}
193
194impl Drop for EncodedChunksUsed<'_> {
195 fn drop(&mut self) {
196 if self.potentially_updated {
197 let num_encoded_record_chunks = self.encoded_record_chunks_used.count_ones();
198 assert!(num_encoded_record_chunks <= SBucket::MAX.into());
199 *self.num_encoded_record_chunks = SBucket::try_from(num_encoded_record_chunks)
200 .expect("Checked with explicit assert above; qed");
201 }
202 }
203}
204
impl EncodedChunksUsed<'_> {
    /// Produces an iterator over the encoded-chunks bitfield (`true` = encoded chunk).
    pub fn iter(&self) -> impl ExactSizeIterator<Item = impl Deref<Target = bool> + '_> + '_ {
        self.encoded_record_chunks_used.iter()
    }

    /// Produces a mutable iterator over the encoded-chunks bitfield.
    ///
    /// Marks the bitfield as potentially updated so the cached count is recomputed on drop.
    pub fn iter_mut(
        &mut self,
    ) -> impl ExactSizeIterator<Item = impl DerefMut<Target = bool> + '_> + '_ {
        self.potentially_updated = true;
        self.encoded_record_chunks_used.iter_mut()
    }
}
219
/// Error happening when decoding a [`SectorContentsMap`] from bytes.
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapFromBytesError {
    /// Input length doesn't match the expected encoded size
    #[error("Invalid bytes length, expected {expected}, actual {actual}")]
    InvalidBytesLength {
        /// Expected length in bytes
        expected: usize,
        /// Actual length in bytes
        actual: usize,
    },
    /// A record claims more encoded chunks than a record can contain
    #[error("Invalid number of encoded record chunks: {actual}")]
    InvalidEncodedRecordChunks {
        /// Actual number of encoded record chunks found
        actual: usize,
        /// Maximum allowed number of encoded record chunks
        max: usize,
    },
    /// Trailing checksum doesn't match the hash of the bit arrays
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
243
/// Error happening when encoding a [`SectorContentsMap`] into an output buffer.
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapEncodeIntoError {
    /// Output length doesn't match the expected encoded size
    #[error("Invalid bytes length, expected {expected}, actual {actual}")]
    InvalidBytesLength {
        /// Expected length in bytes
        expected: usize,
        /// Actual length in bytes
        actual: usize,
    },
}
256
/// Error happening when iterating over s-buckets of a [`SectorContentsMap`].
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq)]
pub enum SectorContentsMapIterationError {
    /// Provided s-bucket index is out of range
    #[error("S-bucket provided {provided} is out of range, max {max}")]
    SBucketOutOfRange {
        /// Provided s-bucket index
        provided: usize,
        /// Upper bound on s-bucket indices (exclusive)
        max: usize,
    },
}
269
/// Map of sector contents: for every record, which of its chunks are encoded and in which
/// s-buckets they reside.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SectorContentsMap {
    // Cached number of encoded chunks for each record, kept in sync with
    // `encoded_record_chunks_used` (see `EncodedChunksUsed::drop`)
    num_encoded_record_chunks: Vec<SBucket>,
    // One bit array per record, with a bit set for every s-bucket holding an encoded chunk
    encoded_record_chunks_used: Vec<SingleRecordBitArray>,
}
287
impl SectorContentsMap {
    /// Creates a new sector contents map for `pieces_in_sector` records, with no chunks
    /// marked as encoded.
    pub fn new(pieces_in_sector: u16) -> Self {
        Self {
            num_encoded_record_chunks: vec![SBucket::default(); usize::from(pieces_in_sector)],
            encoded_record_chunks_used: vec![
                SingleRecordBitArray::default();
                usize::from(pieces_in_sector)
            ],
        }
    }

    /// Reconstructs the sector contents map from bytes produced by [`Self::encode_into`].
    ///
    /// Verifies the trailing checksum and validates that no record claims more encoded
    /// chunks than [`Record::NUM_CHUNKS`].
    pub fn from_bytes(
        bytes: &[u8],
        pieces_in_sector: u16,
    ) -> Result<Self, SectorContentsMapFromBytesError> {
        // Input length must match the expected encoded size exactly
        if bytes.len() != Self::encoded_size(pieces_in_sector) {
            return Err(SectorContentsMapFromBytesError::InvalidBytesLength {
                expected: Self::encoded_size(pieces_in_sector),
                actual: bytes.len(),
            });
        }

        // Layout: per-record bit arrays followed by a Blake3 checksum of those bit arrays
        let (single_records_bit_arrays, expected_checksum) =
            bytes.split_at(bytes.len() - Blake3Hash::SIZE);
        // SAFETY: `Blake3Hash` is a trivial byte-array type with no alignment requirements,
        // and the split above yields exactly `Blake3Hash::SIZE` bytes
        let expected_checksum = unsafe {
            Blake3Hash::from_bytes(expected_checksum).expect("No alignment requirements; qed")
        };
        let actual_checksum = Blake3Hash::from(blake3::hash(single_records_bit_arrays));
        if &actual_checksum != expected_checksum {
            debug!(
                %actual_checksum,
                %expected_checksum,
                "Hash doesn't match, corrupted bytes"
            );

            return Err(SectorContentsMapFromBytesError::ChecksumMismatch);
        }

        let mut encoded_record_chunks_used =
            vec![SingleRecordBitArray::default(); pieces_in_sector.into()];

        // Copy each record's bit array out of the input and derive its encoded chunk count,
        // failing if any record claims more encoded chunks than a record can have
        let num_encoded_record_chunks = encoded_record_chunks_used
            .iter_mut()
            .zip(single_records_bit_arrays.array_chunks::<{ SINGLE_RECORD_BIT_ARRAY_SIZE }>())
            .map(|(encoded_record_chunks_used, bytes)| {
                encoded_record_chunks_used
                    .as_raw_mut_slice()
                    .copy_from_slice(bytes);
                let num_encoded_record_chunks = encoded_record_chunks_used.count_ones();
                if num_encoded_record_chunks > Record::NUM_CHUNKS {
                    return Err(
                        SectorContentsMapFromBytesError::InvalidEncodedRecordChunks {
                            actual: num_encoded_record_chunks,
                            max: Record::NUM_CHUNKS,
                        },
                    );
                }
                Ok(SBucket::try_from(num_encoded_record_chunks).expect("Verified above; qed"))
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Self {
            num_encoded_record_chunks,
            encoded_record_chunks_used,
        })
    }

    /// Size of the encoded sector contents map in bytes: one bit array per record plus a
    /// trailing Blake3 checksum.
    pub const fn encoded_size(pieces_in_sector: u16) -> usize {
        SINGLE_RECORD_BIT_ARRAY_SIZE * pieces_in_sector as usize + Blake3Hash::SIZE
    }

    /// Encodes the sector contents map into `output`, appending a Blake3 checksum.
    ///
    /// `output` must be exactly [`Self::encoded_size`] bytes long.
    pub fn encode_into(&self, output: &mut [u8]) -> Result<(), SectorContentsMapEncodeIntoError> {
        if output.len() != Self::encoded_size(self.encoded_record_chunks_used.len() as u16) {
            return Err(SectorContentsMapEncodeIntoError::InvalidBytesLength {
                expected: Self::encoded_size(self.encoded_record_chunks_used.len() as u16),
                actual: output.len(),
            });
        }

        let slice = self.encoded_record_chunks_used.as_slice();
        // SAFETY: `SingleRecordBitArray` is backed by a plain `[u8; N]`, so a slice of them
        // can be viewed as `len * SINGLE_RECORD_BIT_ARRAY_SIZE` bytes
        let slice = unsafe {
            slice::from_raw_parts(
                slice.as_ptr() as *const u8,
                slice.len() * SINGLE_RECORD_BIT_ARRAY_SIZE,
            )
        };

        // Bit arrays first, then the checksum of exactly those bytes
        output[..slice.len()].copy_from_slice(slice);
        output[slice.len()..].copy_from_slice(blake3::hash(slice).as_bytes());

        Ok(())
    }

    /// Number of encoded chunks in each record, one entry per record.
    pub fn num_encoded_record_chunks(&self) -> &[SBucket] {
        &self.num_encoded_record_chunks
    }

    /// Encoded-chunks bitfields, one bit array per record.
    pub fn iter_record_bitfields(&self) -> &[SingleRecordBitArray] {
        &self.encoded_record_chunks_used
    }

    /// Mutable access to per-record bitfields, wrapped in [`EncodedChunksUsed`] so the
    /// cached encoded-chunk counts stay in sync.
    pub fn iter_record_bitfields_mut(
        &mut self,
    ) -> impl ExactSizeIterator<Item = EncodedChunksUsed<'_>> + '_ {
        self.encoded_record_chunks_used
            .iter_mut()
            .zip(&mut self.num_encoded_record_chunks)
            .map(
                |(encoded_record_chunks_used, num_encoded_record_chunks)| EncodedChunksUsed {
                    encoded_record_chunks_used,
                    num_encoded_record_chunks,
                    potentially_updated: false,
                },
            )
    }

    /// Returns the number of chunks (across all records) stored in each s-bucket.
    pub fn s_bucket_sizes(&self) -> Box<[u16; Record::NUM_S_BUCKETS]> {
        // Count, in parallel, how many records contribute a chunk to each s-bucket
        let s_bucket_sizes = (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
            .into_par_iter()
            .map(SBucket::from)
            .map(|s_bucket| {
                self.iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
                    .count() as u16
            })
            .collect::<Box<_>>();

        assert_eq!(s_bucket_sizes.len(), Record::NUM_S_BUCKETS);
        // SAFETY: the boxed slice has exactly `Record::NUM_S_BUCKETS` elements (asserted
        // above), so its allocation can be reinterpreted as a boxed array of that size;
        // `ManuallyDrop` prevents the original box from freeing the allocation
        let mut s_bucket_sizes = ManuallyDrop::new(s_bucket_sizes);
        unsafe { Box::from_raw(s_bucket_sizes.as_mut_ptr() as *mut [u16; Record::NUM_S_BUCKETS]) }
    }

    /// Creates an iterator of `(s_bucket, encoded_chunk_used, chunk_location)` for every
    /// chunk of the record at `piece_offset`, where `chunk_location` is the global offset of
    /// the chunk within the sector (counting chunks of all records across s-buckets).
    pub fn iter_record_chunk_to_plot(
        &self,
        piece_offset: PieceOffset,
    ) -> impl Iterator<Item = (SBucket, bool, usize)> + '_ {
        (SBucket::ZERO..=SBucket::MAX)
            // Enumerate all chunks of all records in s-bucket order
            .flat_map(|s_bucket| {
                self.iter_s_bucket_records(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
                    .map(move |(current_piece_offset, encoded_chunk_used)| {
                        (s_bucket, current_piece_offset, encoded_chunk_used)
                    })
            })
            // Global chunk position within the sector
            .enumerate()
            // Keep only chunks belonging to the requested record
            .filter_map(
                move |(chunk_location, (s_bucket, current_piece_offset, encoded_chunk_used))| {
                    (current_piece_offset == piece_offset).then_some((
                        s_bucket,
                        encoded_chunk_used,
                        chunk_location,
                    ))
                },
            )
            // A record has at most `Record::NUM_CHUNKS` chunks
            .take(Record::NUM_CHUNKS)
    }

    /// Parallel iterator over all s-buckets yielding, for each one, either `None` (the
    /// record at `piece_offset` has no chunk in that s-bucket) or
    /// `Some((chunk_offset, encoded_chunk_used))`, where `chunk_offset` is the position of
    /// this record's chunk within that s-bucket.
    pub fn par_iter_record_chunk_to_plot(
        &self,
        piece_offset: PieceOffset,
    ) -> impl IndexedParallelIterator<Item = Option<(usize, bool)>> + '_ {
        let piece_offset = usize::from(piece_offset);
        (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
            .into_par_iter()
            .map(SBucket::from)
            .map(move |s_bucket| {
                // Does this record have a chunk in this s-bucket at all?
                let encoded_chunk_used = record_has_s_bucket_chunk(
                    s_bucket.into(),
                    &self.encoded_record_chunks_used[piece_offset],
                    usize::from(self.num_encoded_record_chunks[piece_offset]),
                )?;

                // Offset within the s-bucket equals the number of earlier records that also
                // have a chunk in this s-bucket
                let chunk_offset = self
                    .encoded_record_chunks_used
                    .iter()
                    .zip(&self.num_encoded_record_chunks)
                    .take(piece_offset)
                    .filter(move |(record_bitfields, num_encoded_record_chunks)| {
                        record_has_s_bucket_chunk(
                            s_bucket.into(),
                            record_bitfields,
                            usize::from(**num_encoded_record_chunks),
                        )
                        .is_some()
                    })
                    .count();

                Some((chunk_offset, encoded_chunk_used))
            })
    }

    /// Creates an iterator of `(piece_offset, encoded_chunk_used)` for all records that
    /// have a chunk in `s_bucket`.
    ///
    /// # Errors
    /// Returns [`SectorContentsMapIterationError::SBucketOutOfRange`] if `s_bucket` is not
    /// below [`Record::NUM_S_BUCKETS`].
    pub fn iter_s_bucket_records(
        &self,
        s_bucket: SBucket,
    ) -> Result<impl Iterator<Item = (PieceOffset, bool)> + '_, SectorContentsMapIterationError>
    {
        let s_bucket = usize::from(s_bucket);

        if s_bucket >= Record::NUM_S_BUCKETS {
            return Err(SectorContentsMapIterationError::SBucketOutOfRange {
                provided: s_bucket,
                max: Record::NUM_S_BUCKETS,
            });
        }

        Ok((PieceOffset::ZERO..)
            .zip(
                self.encoded_record_chunks_used
                    .iter()
                    .zip(&self.num_encoded_record_chunks),
            )
            .filter_map(
                move |(piece_offset, (record_bitfields, num_encoded_record_chunks))| {
                    // `None` from the helper means this record has no chunk here; skip it
                    let encoded_chunk_used = record_has_s_bucket_chunk(
                        s_bucket,
                        record_bitfields,
                        usize::from(*num_encoded_record_chunks),
                    )?;

                    Some((piece_offset, encoded_chunk_used))
                },
            ))
    }

    /// Creates an iterator over the raw bit at `s_bucket` in every record's bitfield:
    /// `true` means the record has an encoded chunk at this s-bucket.
    ///
    /// # Errors
    /// Returns [`SectorContentsMapIterationError::SBucketOutOfRange`] if `s_bucket` is not
    /// below [`Record::NUM_S_BUCKETS`].
    pub fn iter_s_bucket_encoded_record_chunks_used(
        &self,
        s_bucket: SBucket,
    ) -> Result<impl Iterator<Item = bool> + '_, SectorContentsMapIterationError> {
        let s_bucket = usize::from(s_bucket);

        if s_bucket >= Record::NUM_S_BUCKETS {
            return Err(SectorContentsMapIterationError::SBucketOutOfRange {
                provided: s_bucket,
                max: Record::NUM_S_BUCKETS,
            });
        }

        Ok(self
            .encoded_record_chunks_used
            .iter()
            .map(move |record_bitfields| record_bitfields[s_bucket]))
    }
}
581
582fn record_has_s_bucket_chunk(
586 s_bucket: usize,
587 record_bitfields: &SingleRecordBitArray,
588 num_encoded_record_chunks: usize,
589) -> Option<bool> {
590 if record_bitfields[s_bucket] {
591 Some(true)
593 } else if num_encoded_record_chunks == Record::NUM_CHUNKS {
594 None
595 } else {
596 let encoded_before = record_bitfields[..s_bucket].count_ones();
598 let unencoded_before = s_bucket - encoded_before;
599 let unencoded_total = Record::NUM_CHUNKS.saturating_sub(num_encoded_record_chunks);
602
603 if unencoded_before < unencoded_total {
604 Some(false)
607 } else {
608 None
609 }
610 }
611}