1use crate::FarmerProtocolInfo;
10use crate::sector::{
11 FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12 SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_core_primitives::solutions::ShardCommitmentHash;
21use ab_data_retrieval::piece_getter::PieceGetter;
22use ab_erasure_coding::ErasureCoding;
23use ab_proof_of_space::{Table, TableGenerator};
24use async_lock::{Mutex as AsyncMutex, Semaphore};
25use backoff::future::retry;
26use backoff::{Error as BackoffError, ExponentialBackoff};
27use futures::stream::FuturesUnordered;
28use futures::{StreamExt, select};
29use parity_scale_codec::{Decode, Encode};
30use parking_lot::Mutex;
31use std::collections::HashMap;
32use std::simd::Simd;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Duration;
36use thiserror::Error;
37use tracing::{debug, trace, warn};
38
39const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
40
41fn default_backoff() -> ExponentialBackoff {
42 ExponentialBackoff {
43 initial_interval: Duration::from_secs(15),
44 max_interval: Duration::from_secs(10 * 60),
45 max_elapsed_time: None,
47 ..ExponentialBackoff::default()
48 }
49}
50
51#[derive(Debug, Clone, Encode, Decode)]
53pub struct PlottedSector {
54 pub sector_id: SectorId,
56 pub sector_index: SectorIndex,
58 pub sector_metadata: SectorMetadataChecksummed,
60 pub piece_indexes: Vec<PieceIndex>,
62}
63
64#[derive(Debug, Error)]
66pub enum PlottingError {
67 #[error("Records encoder error: {error}")]
69 RecordsEncoderError {
70 error: anyhow::Error,
72 },
73 #[error("Bad sector output size: provided {provided}, expected {expected}")]
75 BadSectorOutputSize {
76 provided: usize,
78 expected: usize,
80 },
81 #[error("Can't recover missing piece {piece_index}: {error}")]
83 PieceRecoveryFailed {
84 piece_index: PieceIndex,
86 error: anyhow::Error,
88 },
89 #[error("Failed to retrieve pieces: {error}")]
91 FailedToRetrievePieces {
92 error: anyhow::Error,
94 },
95 #[error("Abort early")]
97 AbortEarly,
98}
99
100#[derive(Debug)]
106pub struct PlotSectorOptions<'a, RE, PG> {
107 pub public_key_hash: &'a Blake3Hash,
109 pub shard_commitments_root: &'a ShardCommitmentHash,
111 pub sector_index: SectorIndex,
113 pub piece_getter: &'a PG,
115 pub farmer_protocol_info: FarmerProtocolInfo,
117 pub erasure_coding: &'a ErasureCoding,
119 pub pieces_in_sector: u16,
121 pub sector_output: &'a mut Vec<u8>,
124 pub downloading_semaphore: Option<Arc<Semaphore>>,
127 pub encoding_semaphore: Option<&'a Semaphore>,
130 pub records_encoder: &'a mut RE,
132 pub abort_early: &'a AtomicBool,
134}
135
136pub async fn plot_sector<RE, PG>(
143 options: PlotSectorOptions<'_, RE, PG>,
144) -> Result<PlottedSector, PlottingError>
145where
146 RE: RecordsEncoder,
147 PG: PieceGetter + Send + Sync,
148{
149 let PlotSectorOptions {
150 public_key_hash,
151 shard_commitments_root,
152 sector_index,
153 piece_getter,
154 farmer_protocol_info,
155 erasure_coding,
156 pieces_in_sector,
157 sector_output,
158 downloading_semaphore,
159 encoding_semaphore,
160 records_encoder,
161 abort_early,
162 } = options;
163
164 let _downloading_permit = match downloading_semaphore {
165 Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
166 None => None,
167 };
168
169 let download_sector_fut = download_sector(DownloadSectorOptions {
170 public_key_hash,
171 shard_commitments_root,
172 sector_index,
173 piece_getter,
174 farmer_protocol_info,
175 erasure_coding,
176 pieces_in_sector,
177 });
178
179 let _encoding_permit = match encoding_semaphore {
180 Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
181 None => None,
182 };
183
184 let encoded_sector = encode_sector(
185 download_sector_fut.await?,
186 EncodeSectorOptions::<RE> {
187 sector_index,
188 records_encoder,
189 abort_early,
190 },
191 )?;
192
193 if abort_early.load(Ordering::Acquire) {
194 return Err(PlottingError::AbortEarly);
195 }
196
197 write_sector(&encoded_sector, sector_output)?;
198
199 Ok(encoded_sector.plotted_sector)
200}
201
202#[derive(Debug)]
204pub struct DownloadedSector {
205 sector_id: SectorId,
206 piece_indices: Vec<PieceIndex>,
207 raw_sector: RawSector,
208 history_size: HistorySize,
209}
210
211#[derive(Debug)]
213pub struct DownloadSectorOptions<'a, PG> {
214 pub public_key_hash: &'a Blake3Hash,
216 pub shard_commitments_root: &'a ShardCommitmentHash,
218 pub sector_index: SectorIndex,
220 pub piece_getter: &'a PG,
222 pub farmer_protocol_info: FarmerProtocolInfo,
224 pub erasure_coding: &'a ErasureCoding,
226 pub pieces_in_sector: u16,
228}
229
230pub async fn download_sector<PG>(
235 options: DownloadSectorOptions<'_, PG>,
236) -> Result<DownloadedSector, PlottingError>
237where
238 PG: PieceGetter + Send + Sync,
239{
240 let DownloadSectorOptions {
241 public_key_hash,
242 shard_commitments_root,
243 sector_index,
244 piece_getter,
245 farmer_protocol_info,
246 erasure_coding,
247 pieces_in_sector,
248 } = options;
249
250 let sector_id = SectorId::new(
251 public_key_hash,
252 shard_commitments_root,
253 sector_index,
254 farmer_protocol_info.history_size,
255 );
256
257 let piece_indices = (PieceOffset::ZERO..)
258 .take(pieces_in_sector.into())
259 .map(|piece_offset| {
260 sector_id.derive_piece_index(
261 piece_offset,
262 farmer_protocol_info.history_size,
263 farmer_protocol_info.max_pieces_in_sector,
264 farmer_protocol_info.recent_segments,
265 farmer_protocol_info.recent_history_fraction,
266 )
267 })
268 .collect::<Vec<_>>();
269
270 let raw_sector = {
271 let mut raw_sector = RawSector::new(pieces_in_sector);
272 let mut pieces_to_download =
273 HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
274 for (piece_index, (record, metadata)) in piece_indices
275 .iter()
276 .copied()
277 .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
278 {
279 pieces_to_download
280 .entry(piece_index)
281 .or_default()
282 .push((record, metadata));
283 }
284 let pieces_to_download = AsyncMutex::new(pieces_to_download);
286
287 retry(default_backoff(), || async {
288 let mut pieces_to_download = pieces_to_download.lock().await;
289
290 if let Err(error) =
291 download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
292 .await
293 {
294 warn!(
295 %sector_index,
296 %error,
297 %pieces_in_sector,
298 remaining_pieces = %pieces_to_download.len(),
299 "Sector downloading attempt failed, will retry later"
300 );
301
302 return Err(BackoffError::transient(error));
303 }
304
305 debug!(%sector_index, "Sector downloaded successfully");
306
307 Ok(())
308 })
309 .await?;
310
311 raw_sector
312 };
313
314 Ok(DownloadedSector {
315 sector_id,
316 piece_indices,
317 raw_sector,
318 history_size: farmer_protocol_info.history_size,
319 })
320}
321
322pub trait RecordsEncoder {
324 fn encode_records(
326 &mut self,
327 sector_id: &SectorId,
328 records: &mut [Record],
329 abort_early: &AtomicBool,
330 ) -> anyhow::Result<SectorContentsMap>;
331}
332
333#[derive(Debug)]
335pub struct CpuRecordsEncoder<'a, PosTable>
336where
337 PosTable: Table,
338{
339 table_generators: &'a [PosTable::Generator],
340 erasure_coding: &'a ErasureCoding,
341 global_mutex: &'a AsyncMutex<()>,
342}
343
344impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
345where
346 PosTable: Table,
347{
348 fn encode_records(
349 &mut self,
350 sector_id: &SectorId,
351 records: &mut [Record],
352 abort_early: &AtomicBool,
353 ) -> anyhow::Result<SectorContentsMap> {
354 if self.table_generators.is_empty() {
355 return Err(anyhow::anyhow!("No table generators"));
356 }
357
358 let pieces_in_sector = records
359 .len()
360 .try_into()
361 .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
362 let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
363
364 {
365 let global_mutex = self.global_mutex;
366 let erasure_coding = self.erasure_coding;
367
368 let iter = Mutex::new(
369 (PieceOffset::ZERO..)
370 .zip(records.iter_mut())
371 .zip(sector_contents_map.iter_record_chunks_used_mut()),
372 );
373
374 rayon::scope(|scope| {
375 for table_generator in self.table_generators {
376 scope.spawn(|_scope| {
377 loop {
378 global_mutex.lock_blocking();
380
381 let Some(((piece_offset, record), record_chunks_used)) =
384 iter.lock().next()
385 else {
386 return;
387 };
388 let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
389
390 record_encoding::<PosTable>(
391 &pos_seed,
392 record,
393 record_chunks_used,
394 table_generator,
395 erasure_coding,
396 );
397
398 if abort_early.load(Ordering::Relaxed) {
399 return;
400 }
401 }
402 });
403 }
404 });
405 }
406
407 Ok(sector_contents_map)
408 }
409}
410
411impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
412where
413 PosTable: Table,
414{
415 pub fn new(
417 table_generators: &'a [PosTable::Generator],
418 erasure_coding: &'a ErasureCoding,
419 global_mutex: &'a AsyncMutex<()>,
420 ) -> Self {
421 Self {
422 table_generators,
423 erasure_coding,
424 global_mutex,
425 }
426 }
427}
428
429#[derive(Debug)]
435pub struct EncodeSectorOptions<'a, RE>
436where
437 RE: RecordsEncoder,
438{
439 pub sector_index: SectorIndex,
441 pub records_encoder: &'a mut RE,
443 pub abort_early: &'a AtomicBool,
445}
446
447#[derive(Debug)]
449pub struct EncodedSector {
450 pub plotted_sector: PlottedSector,
452 raw_sector: RawSector,
453 sector_contents_map: SectorContentsMap,
454}
455
456pub fn encode_sector<RE>(
461 downloaded_sector: DownloadedSector,
462 encoding_options: EncodeSectorOptions<'_, RE>,
463) -> Result<EncodedSector, PlottingError>
464where
465 RE: RecordsEncoder,
466{
467 let DownloadedSector {
468 sector_id,
469 piece_indices,
470 mut raw_sector,
471 history_size,
472 } = downloaded_sector;
473 let EncodeSectorOptions {
474 sector_index,
475 records_encoder,
476 abort_early,
477 } = encoding_options;
478
479 let pieces_in_sector = raw_sector.records.len().try_into().expect(
480 "Raw sector can only be created in this crate and it is always done correctly; qed",
481 );
482
483 let sector_contents_map = records_encoder
484 .encode_records(§or_id, &mut raw_sector.records, abort_early)
485 .map_err(|error| PlottingError::RecordsEncoderError { error })?;
486
487 let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
488 sector_index,
489 pieces_in_sector,
490 s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
491 history_size,
492 });
493
494 Ok(EncodedSector {
495 plotted_sector: PlottedSector {
496 sector_id,
497 sector_index,
498 sector_metadata,
499 piece_indexes: piece_indices,
500 },
501 raw_sector,
502 sector_contents_map,
503 })
504}
505
506pub fn write_sector(
508 encoded_sector: &EncodedSector,
509 sector_output: &mut Vec<u8>,
510) -> Result<(), PlottingError> {
511 let EncodedSector {
512 plotted_sector: _,
513 raw_sector,
514 sector_contents_map,
515 } = encoded_sector;
516
517 let pieces_in_sector = raw_sector.records.len().try_into().expect(
518 "Raw sector can only be created in this crate and it is always done correctly; qed",
519 );
520
521 let sector_size = sector_size(pieces_in_sector);
522
523 if !sector_output.is_empty() && sector_output.len() != sector_size {
524 return Err(PlottingError::BadSectorOutputSize {
525 provided: sector_output.len(),
526 expected: sector_size,
527 });
528 }
529
530 sector_output.resize(sector_size, 0);
531
532 {
538 let (sector_contents_map_region, remaining_bytes) =
539 sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
540 let (s_buckets_region, metadata_region) =
542 remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
543
544 sector_contents_map
546 .encode_into(sector_contents_map_region)
547 .expect("Chunked into correct size above; qed");
548
549 let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
550 for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
552 .flat_map(|s_bucket| {
553 sector_contents_map
554 .iter_s_bucket_piece_offsets(s_bucket)
555 .expect("S-bucket guaranteed to be in range; qed")
556 })
557 .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
558 {
559 let next_record_chunks_offset =
560 &mut next_record_chunks_offset[usize::from(piece_offset)];
561
562 let chunk_position = *next_record_chunks_offset;
563 *next_record_chunks_offset += 1;
564 output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
565 }
566
567 let metadata_chunks = metadata_region
568 .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
569 .0;
570 for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
571 record_metadata.encode_to(&mut output.as_mut_slice());
572 }
573
574 let (sector_contents, sector_checksum) =
577 sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
578 sector_checksum.copy_from_slice(
579 {
580 let mut hasher = blake3::Hasher::new();
581 hasher.update_rayon(sector_contents);
582 hasher.finalize()
583 }
584 .as_bytes(),
585 );
586 }
587
588 Ok(())
589}
590
591fn record_encoding<PosTable>(
592 pos_seed: &PosSeed,
593 record: &mut Record,
594 record_chunks_used: &mut FoundProofs,
595 table_generator: &PosTable::Generator,
596 erasure_coding: &ErasureCoding,
597) where
598 PosTable: Table,
599{
600 let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
601
602 let mut parity_record_chunks = Record::new_boxed();
603
604 erasure_coding
606 .extend(record.iter(), parity_record_chunks.iter_mut())
607 .expect("Statically guaranteed valid inputs; qed");
608
609 *record_chunks_used = pos_proofs.found_proofs;
610
611 let mut num_found_proofs = 0_usize;
613 for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
614 .array_chunks::<{ u8::BITS as usize }>()
615 .zip(pos_proofs.found_proofs)
616 {
617 for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
618 if (found_proofs & (1 << proof_offset)) != 0 {
619 let record_chunk = if s_bucket < Record::NUM_CHUNKS {
620 record[s_bucket]
621 } else {
622 parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
623 };
624 record[num_found_proofs] = (Simd::from(record_chunk)
626 ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
627 .to_array();
628 num_found_proofs += 1;
629 }
630 }
631 }
632}
633
634async fn download_sector_internal<PG>(
635 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
636 piece_getter: &PG,
637 erasure_coding: &ErasureCoding,
638) -> Result<(), PlottingError>
639where
640 PG: PieceGetter + Send + Sync,
641{
642 let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
645
646 let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
648 let mut downloaded_pieces = piece_getter
649 .get_pieces(piece_indices)
650 .await
651 .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
652 .fuse();
653 let mut reconstructed_pieces = FuturesUnordered::new();
654
655 let mut final_result = Ok(());
656
657 loop {
658 let (piece_index, result) = select! {
659 (piece_index, result) = downloaded_pieces.select_next_some() => {
660 match result {
661 Ok(Some(piece)) => (piece_index, Ok(piece)),
662 Ok(None) => {
663 trace!(%piece_index, "Piece was not found, trying reconstruction");
664
665 reconstructed_pieces.push(reconstruct_piece(
666 piece_index,
667 recovery_semaphore,
668 piece_getter,
669 erasure_coding,
670 ));
671 continue;
672 }
673 Err(error) => {
674 trace!(
675 %error,
676 %piece_index,
677 "Failed to download piece, trying reconstruction"
678 );
679
680 reconstructed_pieces.push(reconstruct_piece(
681 piece_index,
682 recovery_semaphore,
683 piece_getter,
684 erasure_coding,
685 ));
686 continue;
687 }
688 }
689 },
690 (piece_index, result) = reconstructed_pieces.select_next_some() => {
691 (piece_index, result)
692 },
693 complete => {
694 break;
695 }
696 };
697
698 match result {
699 Ok(piece) => {
700 process_piece(piece_index, piece, pieces_to_download);
701 }
702 Err(error) => {
703 trace!(%error, %piece_index, "Failed to download piece");
704
705 if final_result.is_ok() {
706 final_result = Err(error);
707 }
708 }
709 }
710 }
711
712 if final_result.is_ok() && !pieces_to_download.is_empty() {
713 return Err(PlottingError::FailedToRetrievePieces {
714 error: anyhow::anyhow!(
715 "Successful result, but not all pieces were downloaded, this is likely a piece \
716 getter implementation bug"
717 ),
718 });
719 }
720
721 final_result
722}
723
724async fn reconstruct_piece<PG>(
725 piece_index: PieceIndex,
726 recovery_semaphore: &Semaphore,
727 piece_getter: &PG,
728 erasure_coding: &ErasureCoding,
729) -> (PieceIndex, Result<Piece, PlottingError>)
730where
731 PG: PieceGetter + Send + Sync,
732{
733 let _permit = recovery_semaphore.acquire().await;
734 let recovered_piece_fut =
735 recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
736
737 (
738 piece_index,
739 recovered_piece_fut
740 .await
741 .map_err(|error| PlottingError::PieceRecoveryFailed {
742 piece_index,
743 error: error.into(),
744 }),
745 )
746}
747
748fn process_piece(
749 piece_index: PieceIndex,
750 piece: Piece,
751 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
752) {
753 for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
754 record
758 .as_flattened_mut()
759 .copy_from_slice(piece.record().as_flattened());
760 *metadata = RecordMetadata {
761 root: *piece.root(),
762 parity_chunks_root: *piece.parity_chunks_root(),
763 proof: *piece.proof(),
764 piece_checksum: blake3::hash(piece.as_ref()).into(),
765 };
766 }
767}