1use crate::FarmerProtocolInfo;
10use crate::sector::{
11 FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12 SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_core_primitives::solutions::ShardCommitmentHash;
21use ab_data_retrieval::piece_getter::PieceGetter;
22use ab_erasure_coding::ErasureCoding;
23use ab_proof_of_space::{Table, TableGenerator};
24use async_lock::{Mutex as AsyncMutex, Semaphore};
25use backon::{ExponentialBuilder, Retryable};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use std::collections::HashMap;
31use std::simd::Simd;
32use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::time::Duration;
35use thiserror::Error;
36use tracing::{debug, trace, warn};
37
/// Number of permits in the semaphore that guards piece reconstruction during sector downloading,
/// i.e. how many pieces may be reconstructed concurrently
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
39
40#[derive(Debug, Clone, Encode, Decode)]
42pub struct PlottedSector {
43 pub sector_id: SectorId,
45 pub sector_index: SectorIndex,
47 pub sector_metadata: SectorMetadataChecksummed,
49 pub piece_indexes: Vec<PieceIndex>,
51}
52
53#[derive(Debug, Error)]
55pub enum PlottingError {
56 #[error("Records encoder error: {error}")]
58 RecordsEncoderError {
59 error: anyhow::Error,
61 },
62 #[error("Bad sector output size: provided {provided}, expected {expected}")]
64 BadSectorOutputSize {
65 provided: usize,
67 expected: usize,
69 },
70 #[error("Can't recover missing piece {piece_index}: {error}")]
72 PieceRecoveryFailed {
73 piece_index: PieceIndex,
75 error: anyhow::Error,
77 },
78 #[error("Failed to retrieve pieces: {error}")]
80 FailedToRetrievePieces {
81 error: anyhow::Error,
83 },
84 #[error("Abort early")]
86 AbortEarly,
87}
88
89#[derive(Debug)]
95pub struct PlotSectorOptions<'a, RE, PG> {
96 pub public_key_hash: &'a Blake3Hash,
98 pub shard_commitments_root: &'a ShardCommitmentHash,
100 pub sector_index: SectorIndex,
102 pub piece_getter: &'a PG,
104 pub farmer_protocol_info: FarmerProtocolInfo,
106 pub erasure_coding: &'a ErasureCoding,
108 pub pieces_in_sector: u16,
110 pub sector_output: &'a mut Vec<u8>,
113 pub downloading_semaphore: Option<Arc<Semaphore>>,
116 pub encoding_semaphore: Option<&'a Semaphore>,
119 pub records_encoder: &'a mut RE,
121 pub abort_early: &'a AtomicBool,
123}
124
125pub async fn plot_sector<RE, PG>(
132 options: PlotSectorOptions<'_, RE, PG>,
133) -> Result<PlottedSector, PlottingError>
134where
135 RE: RecordsEncoder,
136 PG: PieceGetter + Send + Sync,
137{
138 let PlotSectorOptions {
139 public_key_hash,
140 shard_commitments_root,
141 sector_index,
142 piece_getter,
143 farmer_protocol_info,
144 erasure_coding,
145 pieces_in_sector,
146 sector_output,
147 downloading_semaphore,
148 encoding_semaphore,
149 records_encoder,
150 abort_early,
151 } = options;
152
153 let _downloading_permit = match downloading_semaphore {
154 Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
155 None => None,
156 };
157
158 let download_sector_fut = download_sector(DownloadSectorOptions {
159 public_key_hash,
160 shard_commitments_root,
161 sector_index,
162 piece_getter,
163 farmer_protocol_info,
164 erasure_coding,
165 pieces_in_sector,
166 });
167
168 let _encoding_permit = match encoding_semaphore {
169 Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
170 None => None,
171 };
172
173 let encoded_sector = encode_sector(
174 download_sector_fut.await?,
175 EncodeSectorOptions::<RE> {
176 sector_index,
177 records_encoder,
178 abort_early,
179 },
180 )?;
181
182 if abort_early.load(Ordering::Acquire) {
183 return Err(PlottingError::AbortEarly);
184 }
185
186 write_sector(&encoded_sector, sector_output)?;
187
188 Ok(encoded_sector.plotted_sector)
189}
190
191#[derive(Debug)]
193pub struct DownloadedSector {
194 sector_id: SectorId,
195 piece_indices: Vec<PieceIndex>,
196 raw_sector: RawSector,
197 history_size: HistorySize,
198}
199
200#[derive(Debug)]
202pub struct DownloadSectorOptions<'a, PG> {
203 pub public_key_hash: &'a Blake3Hash,
205 pub shard_commitments_root: &'a ShardCommitmentHash,
207 pub sector_index: SectorIndex,
209 pub piece_getter: &'a PG,
211 pub farmer_protocol_info: FarmerProtocolInfo,
213 pub erasure_coding: &'a ErasureCoding,
215 pub pieces_in_sector: u16,
217}
218
219pub async fn download_sector<PG>(
224 options: DownloadSectorOptions<'_, PG>,
225) -> Result<DownloadedSector, PlottingError>
226where
227 PG: PieceGetter + Send + Sync,
228{
229 let DownloadSectorOptions {
230 public_key_hash,
231 shard_commitments_root,
232 sector_index,
233 piece_getter,
234 farmer_protocol_info,
235 erasure_coding,
236 pieces_in_sector,
237 } = options;
238
239 let sector_id = SectorId::new(
240 public_key_hash,
241 shard_commitments_root,
242 sector_index,
243 farmer_protocol_info.history_size,
244 );
245
246 let piece_indices = (PieceOffset::ZERO..)
247 .take(pieces_in_sector.into())
248 .map(|piece_offset| {
249 sector_id.derive_piece_index(
250 piece_offset,
251 farmer_protocol_info.history_size,
252 farmer_protocol_info.max_pieces_in_sector,
253 farmer_protocol_info.recent_segments,
254 farmer_protocol_info.recent_history_fraction,
255 )
256 })
257 .collect::<Vec<_>>();
258
259 let raw_sector = {
260 let mut raw_sector = RawSector::new(pieces_in_sector);
261 let mut pieces_to_download =
262 HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
263 for (piece_index, (record, metadata)) in piece_indices
264 .iter()
265 .copied()
266 .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
267 {
268 pieces_to_download
269 .entry(piece_index)
270 .or_default()
271 .push((record, metadata));
272 }
273 let pieces_to_download = AsyncMutex::new(pieces_to_download);
275
276 (|| async {
277 let mut pieces_to_download = pieces_to_download.lock().await;
278
279 if let Err(error) =
280 download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
281 .await
282 {
283 warn!(
284 %sector_index,
285 %error,
286 %pieces_in_sector,
287 remaining_pieces = %pieces_to_download.len(),
288 "Sector downloading attempt failed, will retry later"
289 );
290
291 return Err(error);
292 }
293
294 debug!(%sector_index, "Sector downloaded successfully");
295
296 Ok(())
297 })
298 .retry(
299 ExponentialBuilder::default()
300 .with_min_delay(Duration::from_secs(15))
301 .with_max_delay(Duration::from_mins(10))
302 .without_max_times(),
304 )
305 .await?;
306
307 raw_sector
308 };
309
310 Ok(DownloadedSector {
311 sector_id,
312 piece_indices,
313 raw_sector,
314 history_size: farmer_protocol_info.history_size,
315 })
316}
317
318pub trait RecordsEncoder {
320 fn encode_records(
322 &mut self,
323 sector_id: &SectorId,
324 records: &mut [Record],
325 abort_early: &AtomicBool,
326 ) -> anyhow::Result<SectorContentsMap>;
327}
328
329#[derive(Debug)]
331pub struct CpuRecordsEncoder<'a, PosTable>
332where
333 PosTable: Table,
334{
335 table_generators: &'a [PosTable::Generator],
336 erasure_coding: &'a ErasureCoding,
337 global_mutex: &'a AsyncMutex<()>,
338}
339
340impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
341where
342 PosTable: Table,
343{
344 fn encode_records(
345 &mut self,
346 sector_id: &SectorId,
347 records: &mut [Record],
348 abort_early: &AtomicBool,
349 ) -> anyhow::Result<SectorContentsMap> {
350 if self.table_generators.is_empty() {
351 return Err(anyhow::anyhow!("No table generators"));
352 }
353
354 let pieces_in_sector = records
355 .len()
356 .try_into()
357 .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
358 let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
359
360 {
361 let global_mutex = self.global_mutex;
362 let erasure_coding = self.erasure_coding;
363
364 let iter = Mutex::new(
365 (PieceOffset::ZERO..)
366 .zip(records.iter_mut())
367 .zip(sector_contents_map.iter_record_chunks_used_mut()),
368 );
369
370 rayon::scope(|scope| {
371 for table_generator in self.table_generators {
372 scope.spawn(|_scope| {
373 loop {
374 global_mutex.lock_blocking();
376
377 let Some(((piece_offset, record), record_chunks_used)) =
380 iter.lock().next()
381 else {
382 return;
383 };
384 let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
385
386 record_encoding::<PosTable>(
387 &pos_seed,
388 record,
389 record_chunks_used,
390 table_generator,
391 erasure_coding,
392 );
393
394 if abort_early.load(Ordering::Relaxed) {
395 return;
396 }
397 }
398 });
399 }
400 });
401 }
402
403 Ok(sector_contents_map)
404 }
405}
406
407impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
408where
409 PosTable: Table,
410{
411 pub fn new(
413 table_generators: &'a [PosTable::Generator],
414 erasure_coding: &'a ErasureCoding,
415 global_mutex: &'a AsyncMutex<()>,
416 ) -> Self {
417 Self {
418 table_generators,
419 erasure_coding,
420 global_mutex,
421 }
422 }
423}
424
425#[derive(Debug)]
431pub struct EncodeSectorOptions<'a, RE>
432where
433 RE: RecordsEncoder,
434{
435 pub sector_index: SectorIndex,
437 pub records_encoder: &'a mut RE,
439 pub abort_early: &'a AtomicBool,
441}
442
443#[derive(Debug)]
445#[expect(
446 clippy::partial_pub_fields,
447 reason = "Intentionally hiding some fields such that they can only be used by internal APIs"
448)]
449pub struct EncodedSector {
450 pub plotted_sector: PlottedSector,
452 raw_sector: RawSector,
453 sector_contents_map: SectorContentsMap,
454}
455
456pub fn encode_sector<RE>(
461 downloaded_sector: DownloadedSector,
462 encoding_options: EncodeSectorOptions<'_, RE>,
463) -> Result<EncodedSector, PlottingError>
464where
465 RE: RecordsEncoder,
466{
467 let DownloadedSector {
468 sector_id,
469 piece_indices,
470 mut raw_sector,
471 history_size,
472 } = downloaded_sector;
473 let EncodeSectorOptions {
474 sector_index,
475 records_encoder,
476 abort_early,
477 } = encoding_options;
478
479 let pieces_in_sector = raw_sector.records.len().try_into().expect(
480 "Raw sector can only be created in this crate and it is always done correctly; qed",
481 );
482
483 let sector_contents_map = records_encoder
484 .encode_records(§or_id, &mut raw_sector.records, abort_early)
485 .map_err(|error| PlottingError::RecordsEncoderError { error })?;
486
487 let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
488 sector_index,
489 pieces_in_sector,
490 s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
491 history_size,
492 });
493
494 Ok(EncodedSector {
495 plotted_sector: PlottedSector {
496 sector_id,
497 sector_index,
498 sector_metadata,
499 piece_indexes: piece_indices,
500 },
501 raw_sector,
502 sector_contents_map,
503 })
504}
505
506pub fn write_sector(
508 encoded_sector: &EncodedSector,
509 sector_output: &mut Vec<u8>,
510) -> Result<(), PlottingError> {
511 let EncodedSector {
512 plotted_sector: _,
513 raw_sector,
514 sector_contents_map,
515 } = encoded_sector;
516
517 let pieces_in_sector = raw_sector.records.len().try_into().expect(
518 "Raw sector can only be created in this crate and it is always done correctly; qed",
519 );
520
521 let sector_size = sector_size(pieces_in_sector);
522
523 if !sector_output.is_empty() && sector_output.len() != sector_size {
524 return Err(PlottingError::BadSectorOutputSize {
525 provided: sector_output.len(),
526 expected: sector_size,
527 });
528 }
529
530 sector_output.resize(sector_size, 0);
531
532 {
538 let (sector_contents_map_region, remaining_bytes) =
539 sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
540 let (s_buckets_region, metadata_region) =
542 remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
543
544 sector_contents_map
546 .encode_into(sector_contents_map_region)
547 .expect("Chunked into correct size above; qed");
548
549 let mut next_record_chunks_offset = vec![0usize; pieces_in_sector.into()];
550 for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
552 .flat_map(|s_bucket| {
553 sector_contents_map
554 .iter_s_bucket_piece_offsets(s_bucket)
555 .expect("S-bucket guaranteed to be in range; qed")
556 })
557 .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
558 {
559 let next_record_chunks_offset =
560 &mut next_record_chunks_offset[usize::from(piece_offset)];
561
562 let chunk_position = *next_record_chunks_offset;
563 *next_record_chunks_offset += 1;
564 output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
565 }
566
567 let metadata_chunks = metadata_region
568 .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
569 .0;
570 for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
571 record_metadata.encode_to(&mut output.as_mut_slice());
572 }
573
574 let (sector_contents, sector_checksum) =
577 sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
578 sector_checksum.copy_from_slice(
579 {
580 let mut hasher = blake3::Hasher::new();
581 hasher.update_rayon(sector_contents);
582 hasher.finalize()
583 }
584 .as_bytes(),
585 );
586 }
587
588 Ok(())
589}
590
591fn record_encoding<PosTable>(
592 pos_seed: &PosSeed,
593 record: &mut Record,
594 record_chunks_used: &mut FoundProofs,
595 table_generator: &PosTable::Generator,
596 erasure_coding: &ErasureCoding,
597) where
598 PosTable: Table,
599{
600 let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
601
602 let mut parity_record_chunks = Record::new_boxed();
603
604 erasure_coding
606 .extend(record.iter(), parity_record_chunks.iter_mut())
607 .expect("Statically guaranteed valid inputs; qed");
608
609 *record_chunks_used = pos_proofs.found_proofs;
610
611 let mut num_found_proofs = 0_usize;
613 for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
614 .array_chunks::<{ u8::BITS as usize }>()
615 .zip(pos_proofs.found_proofs)
616 {
617 for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
618 if (found_proofs & (1 << proof_offset)) != 0 {
619 let record_chunk = if s_bucket < Record::NUM_CHUNKS {
620 record[s_bucket]
621 } else {
622 parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
623 };
624 record[num_found_proofs] = (Simd::from(record_chunk)
626 ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
627 .to_array();
628 num_found_proofs += 1;
629 }
630 }
631 }
632}
633
634async fn download_sector_internal<PG>(
635 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
636 piece_getter: &PG,
637 erasure_coding: &ErasureCoding,
638) -> Result<(), PlottingError>
639where
640 PG: PieceGetter + Send + Sync,
641{
642 let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
645
646 let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
648 let mut downloaded_pieces = piece_getter
649 .get_pieces(piece_indices)
650 .await
651 .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
652 .fuse();
653 let mut reconstructed_pieces = FuturesUnordered::new();
654
655 let mut final_result = Ok(());
656
657 loop {
658 let (piece_index, result) = select! {
659 (piece_index, result) = downloaded_pieces.select_next_some() => {
660 match result {
661 Ok(Some(piece)) => (piece_index, Ok(piece)),
662 Ok(None) => {
663 trace!(%piece_index, "Piece was not found, trying reconstruction");
664
665 reconstructed_pieces.push(reconstruct_piece(
666 piece_index,
667 recovery_semaphore,
668 piece_getter,
669 erasure_coding,
670 ));
671 continue;
672 }
673 Err(error) => {
674 trace!(
675 %error,
676 %piece_index,
677 "Failed to download piece, trying reconstruction"
678 );
679
680 reconstructed_pieces.push(reconstruct_piece(
681 piece_index,
682 recovery_semaphore,
683 piece_getter,
684 erasure_coding,
685 ));
686 continue;
687 }
688 }
689 },
690 (piece_index, result) = reconstructed_pieces.select_next_some() => {
691 (piece_index, result)
692 },
693 complete => {
694 break;
695 }
696 };
697
698 match result {
699 Ok(piece) => {
700 process_piece(piece_index, piece, pieces_to_download);
701 }
702 Err(error) => {
703 trace!(%error, %piece_index, "Failed to download piece");
704
705 if final_result.is_ok() {
706 final_result = Err(error);
707 }
708 }
709 }
710 }
711
712 if final_result.is_ok() && !pieces_to_download.is_empty() {
713 return Err(PlottingError::FailedToRetrievePieces {
714 error: anyhow::anyhow!(
715 "Successful result, but not all pieces were downloaded, this is likely a piece \
716 getter implementation bug"
717 ),
718 });
719 }
720
721 final_result
722}
723
724async fn reconstruct_piece<PG>(
725 piece_index: PieceIndex,
726 recovery_semaphore: &Semaphore,
727 piece_getter: &PG,
728 erasure_coding: &ErasureCoding,
729) -> (PieceIndex, Result<Piece, PlottingError>)
730where
731 PG: PieceGetter + Send + Sync,
732{
733 let _permit = recovery_semaphore.acquire().await;
734 let recovered_piece_fut =
735 recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
736
737 (
738 piece_index,
739 recovered_piece_fut
740 .await
741 .map_err(|error| PlottingError::PieceRecoveryFailed {
742 piece_index,
743 error: error.into(),
744 }),
745 )
746}
747
748fn process_piece(
749 piece_index: PieceIndex,
750 piece: Piece,
751 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
752) {
753 for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
754 *metadata = RecordMetadata {
755 piece_header: piece.header,
756 piece_checksum: blake3::hash(piece.as_ref()).into(),
757 };
758 record.copy_from_slice(&*piece.record);
761 }
762}