use crate::FarmerProtocolInfo;
use crate::sector::{
    RawSector, RecordMetadata, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed,
    SingleRecordBitArray, sector_record_chunks_size, sector_size,
};
use crate::segment_reconstruction::recover_missing_piece;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
use ab_core_primitives::pos::PosSeed;
use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
use ab_core_primitives::segments::HistorySize;
use ab_data_retrieval::piece_getter::PieceGetter;
use ab_erasure_coding::ErasureCoding;
use ab_proof_of_space::{Table, TableGenerator};
use async_lock::{Mutex as AsyncMutex, Semaphore};
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};
use futures::stream::FuturesUnordered;
use futures::{StreamExt, select};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::simd::Simd;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, trace, warn};

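/// How many pieces are reconstructed concurrently: reconstruction is an expensive operation, so
/// it is limited to one piece at a time per sector download attempt.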
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;

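/// Default exponential backoff used when retrying sector downloads.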
fn default_backoff() -> ExponentialBackoff {
    ExponentialBackoff {
        initial_interval: Duration::from_secs(15),
        max_interval: Duration::from_secs(10 * 60),
        // Retry indefinitely rather than giving up after a fixed amount of time
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    }
}

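/// Information about a sector that was plotted.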
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// Sector ID
    pub sector_id: SectorId,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Sector metadata
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indexes of pieces that were plotted
    pub piece_indexes: Vec<PieceIndex>,
}

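/// Errors that can happen during plotting.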
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Records encoder error
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Bad sector output size
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Actual size provided
        provided: usize,
        /// Expected size
        expected: usize,
    },
    /// Piece recovery failed
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Index of the piece that could not be recovered
        piece_index: PieceIndex,
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to retrieve pieces
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Plotting was aborted early
    #[error("Abort early")]
    AbortEarly,
}

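/// Options for plotting a sector.
///
/// `sector_output` must either be empty (in which case it will be resized to the correct size
/// automatically) or be of exactly the expected sector size, otherwise an error is returned.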
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Hash of the public key corresponding to the sector
    pub public_key_hash: &'a Blake3Hash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
    /// Where the plotted sector will be written, see struct documentation for size requirements
    pub sector_output: &'a mut Vec<u8>,
    /// Semaphore limiting how many sectors are downloaded concurrently; the permit is held until
    /// the end of the plotting process
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Semaphore limiting how many sectors are encoded concurrently, typically allowing one
    /// permit at a time for efficient CPU utilization
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Records encoder used to encode downloaded records
    pub records_encoder: &'a mut RE,
    /// Whether plotting should be aborted early
    pub abort_early: &'a AtomicBool,
}

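/// Plot a single sector: download its pieces, encode them and write the result into
/// `sector_output`.
///
/// NOTE: Even though this function is async, encoding is CPU-intensive blocking work, so it
/// should run on a dedicated thread or thread pool rather than directly on an async executor.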
pub async fn plot_sector<RE, PG>(
    options: PlotSectorOptions<'_, RE, PG>,
) -> Result<PlottedSector, PlottingError>
where
    RE: RecordsEncoder,
    PG: PieceGetter + Send + Sync,
{
    let PlotSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
        sector_output,
        downloading_semaphore,
        encoding_semaphore,
        records_encoder,
        abort_early,
    } = options;

    let _downloading_permit = match downloading_semaphore {
        Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
        None => None,
    };

    let download_sector_fut = download_sector(DownloadSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    });

    let _encoding_permit = match encoding_semaphore {
        Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
        None => None,
    };

    let encoded_sector = encode_sector(
        download_sector_fut.await?,
        EncodeSectorOptions::<RE> {
            sector_index,
            records_encoder,
            abort_early,
        },
    )?;

    if abort_early.load(Ordering::Acquire) {
        return Err(PlottingError::AbortEarly);
    }

    write_sector(&encoded_sector, sector_output)?;

    Ok(encoded_sector.plotted_sector)
}

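/// Opaque sector downloading result that can be passed to [`encode_sector`].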
#[derive(Debug)]
pub struct DownloadedSector {
    sector_id: SectorId,
    piece_indices: Vec<PieceIndex>,
    raw_sector: RawSector,
    history_size: HistorySize,
}

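/// Options for sector downloading.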
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Hash of the public key corresponding to the sector
    pub public_key_hash: &'a Blake3Hash,
    /// Sector index
    pub sector_index: SectorIndex,
    /// Getter for pieces of archival history
    pub piece_getter: &'a PG,
    /// Farmer protocol info
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// How many pieces the sector should contain
    pub pieces_in_sector: u16,
}

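/// Download a sector for plotting.
///
/// Pieces that cannot be downloaded directly are reconstructed from other pieces of the same
/// segment, and the whole download is retried with exponential backoff until it succeeds.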
pub async fn download_sector<PG>(
    options: DownloadSectorOptions<'_, PG>,
) -> Result<DownloadedSector, PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    let DownloadSectorOptions {
        public_key_hash,
        sector_index,
        piece_getter,
        farmer_protocol_info,
        erasure_coding,
        pieces_in_sector,
    } = options;

    let sector_id = SectorId::new(
        public_key_hash,
        sector_index,
        farmer_protocol_info.history_size,
    );

    let piece_indices = (PieceOffset::ZERO..)
        .take(pieces_in_sector.into())
        .map(|piece_offset| {
            sector_id.derive_piece_index(
                piece_offset,
                farmer_protocol_info.history_size,
                farmer_protocol_info.max_pieces_in_sector,
                farmer_protocol_info.recent_segments,
                farmer_protocol_info.recent_history_fraction,
            )
        })
        .collect::<Vec<_>>();

    let raw_sector = {
        let mut raw_sector = RawSector::new(pieces_in_sector);
        let mut pieces_to_download =
            HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
        // Map each piece index to the record/metadata slots it fills, the same piece index may
        // occur at more than one piece offset within a sector
        for (piece_index, (record, metadata)) in piece_indices
            .iter()
            .copied()
            .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
        {
            pieces_to_download
                .entry(piece_index)
                .or_default()
                .push((record, metadata));
        }

        let pieces_to_download = AsyncMutex::new(pieces_to_download);

        retry(default_backoff(), || async {
            let mut pieces_to_download = pieces_to_download.lock().await;

            if let Err(error) =
                download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
                    .await
            {
                warn!(
                    %sector_index,
                    %error,
                    %pieces_in_sector,
                    remaining_pieces = %pieces_to_download.len(),
                    "Sector downloading attempt failed, will retry later"
                );

                return Err(BackoffError::transient(error));
            }

            debug!(%sector_index, "Sector downloaded successfully");

            Ok(())
        })
        .await?;

        raw_sector
    };

    Ok(DownloadedSector {
        sector_id,
        piece_indices,
        raw_sector,
        history_size: farmer_protocol_info.history_size,
    })
}

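/// Abstraction for encoding the records of a sector during plotting.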
pub trait RecordsEncoder {
    /// Encode the provided sector records
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}

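/// CPU implementation of [`RecordsEncoder`] that encodes records using one or more proof-of-space
/// table generators, one rayon task per generator.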
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    table_generators: &'a [PosTable::Generator],
    erasure_coding: &'a ErasureCoding,
    global_mutex: &'a AsyncMutex<()>,
}

impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
where
    PosTable: Table,
{
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap> {
        if self.table_generators.is_empty() {
            return Err(anyhow::anyhow!("No table generators"));
        }

        let pieces_in_sector = records
            .len()
            .try_into()
            .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
        let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

        {
            let global_mutex = self.global_mutex;
            let erasure_coding = self.erasure_coding;

            let iter = Mutex::new(
                (PieceOffset::ZERO..)
                    .zip(records.iter_mut())
                    .zip(sector_contents_map.iter_record_chunks_used_mut()),
            );

            rayon::scope(|scope| {
                for table_generator in self.table_generators {
                    scope.spawn(|_scope| {
                        loop {
                            // Take the global mutex briefly to make sure encoding is allowed
                            // right now
                            global_mutex.lock_blocking();

                            // `loop` with explicit locking is used here instead of `while let`
                            // because otherwise the iterator mutex would be held for the duration
                            // of the loop body, limiting encoding concurrency to one record
                            let Some(((piece_offset, record), record_chunks_used)) =
                                iter.lock().next()
                            else {
                                return;
                            };
                            let pos_seed = sector_id.derive_evaluation_seed(piece_offset);

                            record_encoding::<PosTable>(
                                &pos_seed,
                                record,
                                record_chunks_used,
                                table_generator,
                                erasure_coding,
                            );

                            if abort_early.load(Ordering::Relaxed) {
                                return;
                            }
                        }
                    });
                }
            });
        }

        Ok(sector_contents_map)
    }
}

impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    /// Create a new instance
    pub fn new(
        table_generators: &'a [PosTable::Generator],
        erasure_coding: &'a ErasureCoding,
        global_mutex: &'a AsyncMutex<()>,
    ) -> Self {
        Self {
            table_generators,
            erasure_coding,
            global_mutex,
        }
    }
}

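/// Options for encoding a sector.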
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Sector index
    pub sector_index: SectorIndex,
    /// Records encoder used to encode downloaded records
    pub records_encoder: &'a mut RE,
    /// Whether encoding should be aborted early
    pub abort_early: &'a AtomicBool,
}

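/// Encoded sector that can be written to a plot with [`write_sector`].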
#[derive(Debug)]
pub struct EncodedSector {
    /// Information about the sector that was plotted
    pub plotted_sector: PlottedSector,
    raw_sector: RawSector,
    sector_contents_map: SectorContentsMap,
}

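/// Encode a downloaded sector.
///
/// NOTE: This is a CPU-intensive blocking function and should run on a dedicated thread (or
/// thread pool) rather than directly on an async executor.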
pub fn encode_sector<RE>(
    downloaded_sector: DownloadedSector,
    encoding_options: EncodeSectorOptions<'_, RE>,
) -> Result<EncodedSector, PlottingError>
where
    RE: RecordsEncoder,
{
    let DownloadedSector {
        sector_id,
        piece_indices,
        mut raw_sector,
        history_size,
    } = downloaded_sector;
    let EncodeSectorOptions {
        sector_index,
        records_encoder,
        abort_early,
    } = encoding_options;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_contents_map = records_encoder
        .encode_records(&sector_id, &mut raw_sector.records, abort_early)
        .map_err(|error| PlottingError::RecordsEncoderError { error })?;

    let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
        sector_index,
        pieces_in_sector,
        s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
        history_size,
    });

    Ok(EncodedSector {
        plotted_sector: PlottedSector {
            sector_id,
            sector_index,
            sector_metadata,
            piece_indexes: piece_indices,
        },
        raw_sector,
        sector_contents_map,
    })
}

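/// Write an encoded sector into the sector output.
///
/// `sector_output` must either be empty (in which case it will be resized to the correct size
/// automatically) or be of exactly the expected sector size, otherwise
/// [`PlottingError::BadSectorOutputSize`] is returned.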
pub fn write_sector(
    encoded_sector: &EncodedSector,
    sector_output: &mut Vec<u8>,
) -> Result<(), PlottingError> {
    let EncodedSector {
        plotted_sector: _,
        raw_sector,
        sector_contents_map,
    } = encoded_sector;

    let pieces_in_sector = raw_sector.records.len().try_into().expect(
        "Raw sector can only be created in this crate and it is always done correctly; qed",
    );

    let sector_size = sector_size(pieces_in_sector);

    if !sector_output.is_empty() && sector_output.len() != sector_size {
        return Err(PlottingError::BadSectorOutputSize {
            provided: sector_output.len(),
            expected: sector_size,
        });
    }

    sector_output.resize(sector_size, 0);

    {
        // The sector is laid out as the sector contents map, followed by record chunks grouped
        // into s-buckets, followed by record metadata and a trailing checksum
        let (sector_contents_map_region, remaining_bytes) =
            sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
        let (s_buckets_region, metadata_region) =
            remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));

        sector_contents_map
            .encode_into(sector_contents_map_region)
            .expect("Chunked into correct size above; qed");

        // Write record chunks in s-bucket order, tracking for every record how many of its
        // chunks were already written
        let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
        for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
            .flat_map(|s_bucket| {
                sector_contents_map
                    .iter_s_bucket_piece_offsets(s_bucket)
                    .expect("S-bucket guaranteed to be in range; qed")
            })
            .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
        {
            let next_record_chunks_offset =
                &mut next_record_chunks_offset[usize::from(piece_offset)];

            let chunk_position = *next_record_chunks_offset;
            *next_record_chunks_offset += 1;
            output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
        }

        let metadata_chunks = metadata_region
            .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
            .0;
        for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
            record_metadata.encode_to(&mut output.as_mut_slice());
        }

        // Checksum over everything except the checksum itself, which is stored in the last bytes
        // of the sector
        let (sector_contents, sector_checksum) =
            sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
        sector_checksum.copy_from_slice(
            {
                let mut hasher = blake3::Hasher::new();
                hasher.update_rayon(sector_contents);
                hasher.finalize()
            }
            .as_bytes(),
        );
    }

    Ok(())
}

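/// Encode a single record: extend it with parity chunks via erasure coding, then XOR the chunks
/// for which proofs-of-space were found with the corresponding proof hashes.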
fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    record_chunks_used: &mut SingleRecordBitArray,
    table_generator: &PosTable::Generator,
    erasure_coding: &ErasureCoding,
) where
    PosTable: Table,
{
    // Derive proofs-of-space for this record
    let pos_proofs = table_generator.create_proofs_parallel(pos_seed);

    let mut parity_record_chunks = Record::new_boxed();

    // Erasure code the source record chunks into parity chunks
    erasure_coding
        .extend(record.iter(), parity_record_chunks.iter_mut())
        .expect("Statically guaranteed valid inputs; qed");

    record_chunks_used.data = pos_proofs.found_proofs;

    // For every s-bucket with a proof, pick the corresponding source or parity chunk and encode
    // it by XORing it with the proof hash, writing encoded chunks compactly from the start of
    // the record
    let mut num_found_proofs = 0_usize;
    for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
        .array_chunks::<{ u8::BITS as usize }>()
        .zip(pos_proofs.found_proofs)
    {
        for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
            if (found_proofs & (1 << proof_offset)) != 0 {
                let record_chunk = if s_bucket < Record::NUM_CHUNKS {
                    record[s_bucket]
                } else {
                    parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
                };
                record[num_found_proofs] = (Simd::from(record_chunk)
                    ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
                .to_array();
                num_found_proofs += 1;
            }
        }
    }
}

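/// A single attempt at downloading all remaining pieces of a sector, falling back to
/// reconstruction for pieces that could not be downloaded directly.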
async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // Reconstruction is an expensive operation, limit how many pieces are reconstructed
    // concurrently
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    let mut final_result = Ok(());

    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}

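/// Reconstruct a single piece from other pieces of the same segment, limited by the recovery
/// semaphore.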
async fn reconstruct_piece<PG>(
    piece_index: PieceIndex,
    recovery_semaphore: &Semaphore,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> (PieceIndex, Result<Piece, PlottingError>)
where
    PG: PieceGetter + Send + Sync,
{
    let _permit = recovery_semaphore.acquire().await;
    let recovered_piece_fut =
        recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);

    (
        piece_index,
        recovered_piece_fut
            .await
            .map_err(|error| PlottingError::PieceRecoveryFailed {
                piece_index,
                error: error.into(),
            }),
    )
}

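/// Copy a downloaded piece into every record/metadata slot that references it and remove it from
/// the set of pieces left to download.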
fn process_piece(
    piece_index: PieceIndex,
    piece: Piece,
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
) {
    for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
        // Fill the record and its accompanying metadata for every slot that references this piece
        record
            .as_flattened_mut()
            .copy_from_slice(piece.record().as_flattened());
        *metadata = RecordMetadata {
            root: *piece.root(),
            parity_chunks_root: *piece.parity_chunks_root(),
            proof: *piece.proof(),
            piece_checksum: blake3::hash(piece.as_ref()).into(),
        };
    }
}