1use crate::FarmerProtocolInfo;
10use crate::sector::{
11 EncodedChunksUsed, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12 SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_data_retrieval::piece_getter::PieceGetter;
21use ab_erasure_coding::ErasureCoding;
22use ab_proof_of_space::{Table, TableGenerator};
23use async_lock::{Mutex as AsyncMutex, Semaphore};
24use backoff::future::retry;
25use backoff::{Error as BackoffError, ExponentialBackoff};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use rayon::prelude::*;
31use std::collections::HashMap;
32use std::simd::Simd;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Duration;
36use thiserror::Error;
37use tracing::{debug, trace, warn};
38
/// Number of permits of the semaphore that gates piece reconstruction, i.e. how many
/// `reconstruct_piece` calls may run at the same time within one `download_sector_internal` call.
const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
40
41fn default_backoff() -> ExponentialBackoff {
42 ExponentialBackoff {
43 initial_interval: Duration::from_secs(15),
44 max_interval: Duration::from_secs(10 * 60),
45 max_elapsed_time: None,
47 ..ExponentialBackoff::default()
48 }
49}
50
/// Description of a sector that was plotted successfully, as returned by [`plot_sector`].
#[derive(Debug, Clone, Encode, Decode)]
pub struct PlottedSector {
    /// ID of the sector, derived from the public key hash, sector index and history size.
    pub sector_id: SectorId,
    /// Index of the sector, as supplied by the caller.
    pub sector_index: SectorIndex,
    /// Checksummed sector metadata (sector index, number of pieces, s-bucket sizes and history
    /// size).
    pub sector_metadata: SectorMetadataChecksummed,
    /// Indices of the pieces the sector was built from, in piece offset order.
    pub piece_indexes: Vec<PieceIndex>,
}
63
/// Errors that can occur while plotting a sector.
#[derive(Debug, Error)]
pub enum PlottingError {
    /// The [`RecordsEncoder`] implementation failed to encode records.
    #[error("Records encoder error: {error}")]
    RecordsEncoderError {
        /// Error returned by the encoder.
        error: anyhow::Error,
    },
    /// The output buffer given to [`write_sector`] is neither empty nor exactly the sector size.
    #[error("Bad sector output size: provided {provided}, expected {expected}")]
    BadSectorOutputSize {
        /// Length of the buffer that was provided.
        provided: usize,
        /// Sector size that was expected.
        expected: usize,
    },
    /// A piece could not be downloaded and reconstructing it failed as well.
    #[error("Can't recover missing piece {piece_index}: {error}")]
    PieceRecoveryFailed {
        /// Index of the piece that could not be recovered.
        piece_index: PieceIndex,
        /// Error reported by the recovery routine.
        error: anyhow::Error,
    },
    /// The piece getter failed, or finished without delivering every requested piece.
    #[error("Failed to retrieve pieces: {error}")]
    FailedToRetrievePieces {
        /// Underlying error.
        error: anyhow::Error,
    },
    /// The `abort_early` flag was observed set after records encoding.
    #[error("Abort early")]
    AbortEarly,
}
99
/// Options for [`plot_sector`].
#[derive(Debug)]
pub struct PlotSectorOptions<'a, RE, PG> {
    /// Public key hash that, together with the sector index and history size, determines the
    /// sector ID.
    pub public_key_hash: &'a Blake3Hash,
    /// Index of the sector to plot; also stored in the resulting sector metadata.
    pub sector_index: SectorIndex,
    /// Source of pieces, used both for regular downloads and for piece reconstruction.
    pub piece_getter: &'a PG,
    /// Protocol parameters (history size, max pieces in sector, recent segments, recent history
    /// fraction) that determine which pieces go into the sector.
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance, passed on to piece reconstruction during downloading.
    pub erasure_coding: &'a ErasureCoding,
    /// Number of pieces (records) in the sector.
    pub pieces_in_sector: u16,
    /// Buffer that receives the serialized sector. Must be empty or already exactly the sector
    /// size; it is resized to the sector size before writing.
    pub sector_output: &'a mut Vec<u8>,
    /// Optional semaphore; a permit is acquired before downloading starts and is held until
    /// [`plot_sector`] returns.
    pub downloading_semaphore: Option<Arc<Semaphore>>,
    /// Optional semaphore; a permit is acquired before the download is awaited and is held until
    /// [`plot_sector`] returns.
    pub encoding_semaphore: Option<&'a Semaphore>,
    /// Encoder used to encode the downloaded records.
    pub records_encoder: &'a mut RE,
    /// Flag passed to the encoder and checked once encoding returns; when it is set, plotting
    /// ends with [`PlottingError::AbortEarly`].
    pub abort_early: &'a AtomicBool,
}
133
134pub async fn plot_sector<RE, PG>(
141 options: PlotSectorOptions<'_, RE, PG>,
142) -> Result<PlottedSector, PlottingError>
143where
144 RE: RecordsEncoder,
145 PG: PieceGetter + Send + Sync,
146{
147 let PlotSectorOptions {
148 public_key_hash,
149 sector_index,
150 piece_getter,
151 farmer_protocol_info,
152 erasure_coding,
153 pieces_in_sector,
154 sector_output,
155 downloading_semaphore,
156 encoding_semaphore,
157 records_encoder,
158 abort_early,
159 } = options;
160
161 let _downloading_permit = match downloading_semaphore {
162 Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
163 None => None,
164 };
165
166 let download_sector_fut = download_sector(DownloadSectorOptions {
167 public_key_hash,
168 sector_index,
169 piece_getter,
170 farmer_protocol_info,
171 erasure_coding,
172 pieces_in_sector,
173 });
174
175 let _encoding_permit = match encoding_semaphore {
176 Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
177 None => None,
178 };
179
180 let encoded_sector = encode_sector(
181 download_sector_fut.await?,
182 EncodeSectorOptions::<RE> {
183 sector_index,
184 records_encoder,
185 abort_early,
186 },
187 )?;
188
189 if abort_early.load(Ordering::Acquire) {
190 return Err(PlottingError::AbortEarly);
191 }
192
193 write_sector(&encoded_sector, sector_output)?;
194
195 Ok(encoded_sector.plotted_sector)
196}
197
/// Sector whose pieces were downloaded but not yet encoded; produced by [`download_sector`] and
/// consumed by [`encode_sector`].
#[derive(Debug)]
pub struct DownloadedSector {
    // ID derived from the public key hash, sector index and history size
    sector_id: SectorId,
    // Piece index for every piece offset of the sector
    piece_indices: Vec<PieceIndex>,
    // Records and record metadata filled from the downloaded pieces
    raw_sector: RawSector,
    // History size the sector was created for
    history_size: HistorySize,
}
206
/// Options for [`download_sector`].
#[derive(Debug)]
pub struct DownloadSectorOptions<'a, PG> {
    /// Public key hash that, together with the sector index and history size, determines the
    /// sector ID.
    pub public_key_hash: &'a Blake3Hash,
    /// Index of the sector to download pieces for.
    pub sector_index: SectorIndex,
    /// Source of pieces, used both for regular downloads and for piece reconstruction.
    pub piece_getter: &'a PG,
    /// Protocol parameters (history size, max pieces in sector, recent segments, recent history
    /// fraction) that determine which pieces go into the sector.
    pub farmer_protocol_info: FarmerProtocolInfo,
    /// Erasure coding instance, passed on to piece reconstruction.
    pub erasure_coding: &'a ErasureCoding,
    /// Number of pieces (records) in the sector.
    pub pieces_in_sector: u16,
}
223
224pub async fn download_sector<PG>(
229 options: DownloadSectorOptions<'_, PG>,
230) -> Result<DownloadedSector, PlottingError>
231where
232 PG: PieceGetter + Send + Sync,
233{
234 let DownloadSectorOptions {
235 public_key_hash,
236 sector_index,
237 piece_getter,
238 farmer_protocol_info,
239 erasure_coding,
240 pieces_in_sector,
241 } = options;
242
243 let sector_id = SectorId::new(
244 public_key_hash,
245 sector_index,
246 farmer_protocol_info.history_size,
247 );
248
249 let piece_indices = (PieceOffset::ZERO..)
250 .take(pieces_in_sector.into())
251 .map(|piece_offset| {
252 sector_id.derive_piece_index(
253 piece_offset,
254 farmer_protocol_info.history_size,
255 farmer_protocol_info.max_pieces_in_sector,
256 farmer_protocol_info.recent_segments,
257 farmer_protocol_info.recent_history_fraction,
258 )
259 })
260 .collect::<Vec<_>>();
261
262 let raw_sector = {
263 let mut raw_sector = RawSector::new(pieces_in_sector);
264 let mut pieces_to_download =
265 HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
266 for (piece_index, (record, metadata)) in piece_indices
267 .iter()
268 .copied()
269 .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
270 {
271 pieces_to_download
272 .entry(piece_index)
273 .or_default()
274 .push((record, metadata));
275 }
276 let pieces_to_download = AsyncMutex::new(pieces_to_download);
278
279 retry(default_backoff(), || async {
280 let mut pieces_to_download = pieces_to_download.lock().await;
281
282 if let Err(error) =
283 download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
284 .await
285 {
286 warn!(
287 %sector_index,
288 %error,
289 %pieces_in_sector,
290 remaining_pieces = %pieces_to_download.len(),
291 "Sector downloading attempt failed, will retry later"
292 );
293
294 return Err(BackoffError::transient(error));
295 }
296
297 debug!(%sector_index, "Sector downloaded successfully");
298
299 Ok(())
300 })
301 .await?;
302
303 raw_sector
304 };
305
306 Ok(DownloadedSector {
307 sector_id,
308 piece_indices,
309 raw_sector,
310 history_size: farmer_protocol_info.history_size,
311 })
312}
313
/// Abstraction over the component that encodes the records of a sector.
pub trait RecordsEncoder {
    /// Encodes `records` of the sector identified by `sector_id` and returns the sector contents
    /// map describing the result.
    ///
    /// The records are passed mutably; the CPU implementation in this file overwrites them in
    /// place with the encoded chunks. `abort_early` is a flag the implementation may consult to
    /// stop before all records are processed (the CPU implementation checks it after every
    /// record) - presumably callers must then check the flag themselves before using the
    /// records; verify against other implementations.
    fn encode_records(
        &mut self,
        sector_id: &SectorId,
        records: &mut [Record],
        abort_early: &AtomicBool,
    ) -> anyhow::Result<SectorContentsMap>;
}
324
/// [`RecordsEncoder`] implementation that encodes records on the CPU.
#[derive(Debug)]
pub struct CpuRecordsEncoder<'a, PosTable>
where
    PosTable: Table,
{
    // One worker task is spawned per table generator during encoding
    table_generators: &'a mut [PosTable::Generator],
    // Used to extend every record with parity chunks before encoding
    erasure_coding: &'a ErasureCoding,
    // Locked and immediately released by every worker before it takes the next record
    global_mutex: &'a AsyncMutex<()>,
}
335
336impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
337where
338 PosTable: Table,
339{
340 fn encode_records(
341 &mut self,
342 sector_id: &SectorId,
343 records: &mut [Record],
344 abort_early: &AtomicBool,
345 ) -> anyhow::Result<SectorContentsMap> {
346 if self.table_generators.is_empty() {
347 return Err(anyhow::anyhow!("No table generators"));
348 }
349
350 let pieces_in_sector = records
351 .len()
352 .try_into()
353 .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
354 let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
355
356 {
357 let table_generators = &mut *self.table_generators;
358 let global_mutex = self.global_mutex;
359 let erasure_coding = self.erasure_coding;
360
361 let iter = Mutex::new(
362 (PieceOffset::ZERO..)
363 .zip(records.iter_mut())
364 .zip(sector_contents_map.iter_record_bitfields_mut()),
365 );
366
367 rayon::scope(|scope| {
368 for table_generator in table_generators {
369 scope.spawn(|_scope| {
370 let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS);
371
372 loop {
373 global_mutex.lock_blocking();
375
376 let Some(((piece_offset, record), encoded_chunks_used)) =
379 iter.lock().next()
380 else {
381 return;
382 };
383 let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
384
385 record_encoding::<PosTable>(
386 &pos_seed,
387 record,
388 encoded_chunks_used,
389 table_generator,
390 erasure_coding,
391 &mut chunks_scratch,
392 );
393
394 if abort_early.load(Ordering::Relaxed) {
395 return;
396 }
397 }
398 });
399 }
400 });
401 }
402
403 Ok(sector_contents_map)
404 }
405}
406
407impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
408where
409 PosTable: Table,
410{
411 pub fn new(
413 table_generators: &'a mut [PosTable::Generator],
414 erasure_coding: &'a ErasureCoding,
415 global_mutex: &'a AsyncMutex<()>,
416 ) -> Self {
417 Self {
418 table_generators,
419 erasure_coding,
420 global_mutex,
421 }
422 }
423}
424
/// Options for [`encode_sector`].
#[derive(Debug)]
pub struct EncodeSectorOptions<'a, RE>
where
    RE: RecordsEncoder,
{
    /// Index of the sector, stored in the resulting sector metadata.
    pub sector_index: SectorIndex,
    /// Encoder used to encode the records of the sector.
    pub records_encoder: &'a mut RE,
    /// Flag passed through to the encoder.
    pub abort_early: &'a AtomicBool,
}
442
/// Sector after records encoding, ready to be serialized with [`write_sector`].
#[derive(Debug)]
pub struct EncodedSector {
    /// Description of the sector (ID, index, metadata and piece indices).
    pub plotted_sector: PlottedSector,
    // Records as left by the records encoder, plus record metadata
    raw_sector: RawSector,
    // Map returned by the records encoder
    sector_contents_map: SectorContentsMap,
}
451
452pub fn encode_sector<RE>(
457 downloaded_sector: DownloadedSector,
458 encoding_options: EncodeSectorOptions<'_, RE>,
459) -> Result<EncodedSector, PlottingError>
460where
461 RE: RecordsEncoder,
462{
463 let DownloadedSector {
464 sector_id,
465 piece_indices,
466 mut raw_sector,
467 history_size,
468 } = downloaded_sector;
469 let EncodeSectorOptions {
470 sector_index,
471 records_encoder,
472 abort_early,
473 } = encoding_options;
474
475 let pieces_in_sector = raw_sector.records.len().try_into().expect(
476 "Raw sector can only be created in this crate and it is always done correctly; qed",
477 );
478
479 let sector_contents_map = records_encoder
480 .encode_records(§or_id, &mut raw_sector.records, abort_early)
481 .map_err(|error| PlottingError::RecordsEncoderError { error })?;
482
483 let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
484 sector_index,
485 pieces_in_sector,
486 s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
487 history_size,
488 });
489
490 Ok(EncodedSector {
491 plotted_sector: PlottedSector {
492 sector_id,
493 sector_index,
494 sector_metadata,
495 piece_indexes: piece_indices,
496 },
497 raw_sector,
498 sector_contents_map,
499 })
500}
501
502pub fn write_sector(
504 encoded_sector: &EncodedSector,
505 sector_output: &mut Vec<u8>,
506) -> Result<(), PlottingError> {
507 let EncodedSector {
508 plotted_sector: _,
509 raw_sector,
510 sector_contents_map,
511 } = encoded_sector;
512
513 let pieces_in_sector = raw_sector.records.len().try_into().expect(
514 "Raw sector can only be created in this crate and it is always done correctly; qed",
515 );
516
517 let sector_size = sector_size(pieces_in_sector);
518
519 if !sector_output.is_empty() && sector_output.len() != sector_size {
520 return Err(PlottingError::BadSectorOutputSize {
521 provided: sector_output.len(),
522 expected: sector_size,
523 });
524 }
525
526 sector_output.resize(sector_size, 0);
527
528 {
534 let (sector_contents_map_region, remaining_bytes) =
535 sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
536 let (s_buckets_region, metadata_region) =
538 remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
539
540 sector_contents_map
542 .encode_into(sector_contents_map_region)
543 .expect("Chunked into correct size above; qed");
544
545 let num_encoded_record_chunks = sector_contents_map.num_encoded_record_chunks();
546 let mut next_encoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
547 let mut next_unencoded_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
548 for ((piece_offset, encoded_chunk_used), output) in (SBucket::ZERO..=SBucket::MAX)
550 .flat_map(|s_bucket| {
551 sector_contents_map
552 .iter_s_bucket_records(s_bucket)
553 .expect("S-bucket guaranteed to be in range; qed")
554 })
555 .zip(s_buckets_region.array_chunks_mut::<{ RecordChunk::SIZE }>())
556 {
557 let num_encoded_record_chunks =
558 usize::from(num_encoded_record_chunks[usize::from(piece_offset)]);
559 let next_encoded_record_chunks_offset =
560 &mut next_encoded_record_chunks_offset[usize::from(piece_offset)];
561 let next_unencoded_record_chunks_offset =
562 &mut next_unencoded_record_chunks_offset[usize::from(piece_offset)];
563
564 let chunk_position;
568 if encoded_chunk_used {
569 chunk_position = *next_encoded_record_chunks_offset;
570 *next_encoded_record_chunks_offset += 1;
571 } else {
572 chunk_position = num_encoded_record_chunks + *next_unencoded_record_chunks_offset;
573 *next_unencoded_record_chunks_offset += 1;
574 }
575 output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
576 }
577
578 let metadata_chunks =
579 metadata_region.array_chunks_mut::<{ RecordMetadata::encoded_size() }>();
580 for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
581 record_metadata.encode_to(&mut output.as_mut_slice());
582 }
583
584 let (sector_contents, sector_checksum) =
587 sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
588 sector_checksum.copy_from_slice(
589 {
590 let mut hasher = blake3::Hasher::new();
591 hasher.update_rayon(sector_contents);
592 hasher.finalize()
593 }
594 .as_bytes(),
595 );
596 }
597
598 Ok(())
599}
600
/// Encodes a single record in place.
///
/// The record is extended with parity chunks, every source/parity chunk is paired with an
/// s-bucket and, where the table generated from `pos_seed` has a proof for that s-bucket, XOR-ed
/// with the proof hash. `record` is then overwritten with the encoded chunks first (at most
/// `record.len()` of them), followed by as many unencoded chunks as are needed to fill it.
/// `encoded_chunks_used` gets the bit set for every chunk that was stored in encoded form.
///
/// `chunks_scratch` is a reusable buffer; its contents on entry are discarded and it is empty
/// again on return.
fn record_encoding<PosTable>(
    pos_seed: &PosSeed,
    record: &mut Record,
    mut encoded_chunks_used: EncodedChunksUsed<'_>,
    table_generator: &mut PosTable::Generator,
    erasure_coding: &ErasureCoding,
    chunks_scratch: &mut Vec<[u8; RecordChunk::SIZE]>,
) where
    PosTable: Table,
{
    // Generate the table that proofs are looked up in
    let pos_table = table_generator.generate_parallel(pos_seed);

    let mut parity_record_chunks = Record::new_boxed();

    // Compute parity chunks from the source chunks of the record
    erasure_coding
        .extend(record.iter(), parity_record_chunks.iter_mut())
        .expect("Statically guaranteed valid inputs; qed");
    // Copy of the source chunks, because `record` itself is overwritten below
    let source_record_chunks = record.to_vec();

    chunks_scratch.clear();
    // For every s-bucket take the corresponding chunk (source chunks first, then parity chunks)
    // and XOR it with the proof hash if a proof exists for that s-bucket
    (u16::from(SBucket::ZERO)..=u16::from(SBucket::MAX))
        .into_par_iter()
        .map(SBucket::from)
        .zip(
            source_record_chunks
                .par_iter()
                .chain(parity_record_chunks.par_iter()),
        )
        .map(|(s_bucket, record_chunk)| {
            if let Some(proof) = pos_table.find_proof(s_bucket.into()) {
                (Simd::from(*record_chunk) ^ Simd::from(*proof.hash())).to_array()
            } else {
                // All-zero chunk is the marker for "no proof"
                // NOTE(review): an encoded chunk that happens to be all zeroes would be treated
                // as "no proof" below too - presumably considered practically impossible
                [0; RecordChunk::SIZE]
            }
        })
        .collect_into_vec(chunks_scratch);
    let num_successfully_encoded_chunks = chunks_scratch
        .drain(..)
        .zip(encoded_chunks_used.iter_mut())
        .filter_map(|(maybe_encoded_chunk, mut encoded_chunk_used)| {
            // Skip the "no proof" marker, see above
            if maybe_encoded_chunk == [0; RecordChunk::SIZE] {
                None
            } else {
                *encoded_chunk_used = true;

                Some(maybe_encoded_chunk)
            }
        })
        // Must come right after the filter: it stops the filter (and with it the
        // `encoded_chunk_used` updates) once `record.len()` encoded chunks were produced, so no
        // bit is set for a chunk that does not end up in the record
        .take(record.len())
        .zip(record.iter_mut())
        // Store encoded chunks at the beginning of the record, reusing its allocation
        .map(|(input_chunk, output_chunk)| {
            *output_chunk = input_chunk;
        })
        .count();

    // If fewer than `record.len()` chunks were encoded, fill the rest of the record with chunks
    // that were not encoded, in their original order
    source_record_chunks
        .iter()
        .chain(parity_record_chunks.iter())
        .zip(encoded_chunks_used.iter())
        .filter_map(|(record_chunk, encoded_chunk_used)| {
            // Chunks marked as encoded are already stored in encoded form
            if *encoded_chunk_used {
                None
            } else {
                Some(record_chunk)
            }
        })
        // The first `num_successfully_encoded_chunks` positions hold encoded chunks
        .zip(record.iter_mut().skip(num_successfully_encoded_chunks))
        .for_each(|(input_chunk, output_chunk)| {
            *output_chunk = *input_chunk;
        });
}
688
/// Performs one attempt at downloading all pieces that are still in `pieces_to_download`.
///
/// Every piece that is received (directly or through reconstruction) is written to its
/// destinations and removed from `pieces_to_download`, so after a failed attempt the map contains
/// only the pieces that are still missing.
///
/// # Errors
///
/// * [`PlottingError::FailedToRetrievePieces`] if the piece getter fails to start the download,
///   or if everything finished without an error but some pieces are still missing
/// * otherwise the first error reported by a failed piece reconstruction
async fn download_sector_internal<PG>(
    pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
    piece_getter: &PG,
    erasure_coding: &ErasureCoding,
) -> Result<(), PlottingError>
where
    PG: PieceGetter + Send + Sync,
{
    // Limits how many reconstructions run at the same time during this attempt
    let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);

    // Collected into a separate `Vec` because `pieces_to_download` is modified while pieces arrive
    let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
    let mut downloaded_pieces = piece_getter
        .get_pieces(piece_indices)
        .await
        .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
        .fuse();
    let mut reconstructed_pieces = FuturesUnordered::new();

    // Holds the first error seen; processing continues after an error so that the remaining
    // pieces are still stored
    let mut final_result = Ok(());

    loop {
        let (piece_index, result) = select! {
            (piece_index, result) = downloaded_pieces.select_next_some() => {
                match result {
                    Ok(Some(piece)) => (piece_index, Ok(piece)),
                    Ok(None) => {
                        trace!(%piece_index, "Piece was not found, trying reconstruction");

                        // The outcome arrives later through `reconstructed_pieces`
                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                    Err(error) => {
                        trace!(
                            %error,
                            %piece_index,
                            "Failed to download piece, trying reconstruction"
                        );

                        // A download error is handled the same way as a missing piece
                        reconstructed_pieces.push(reconstruct_piece(
                            piece_index,
                            recovery_semaphore,
                            piece_getter,
                            erasure_coding,
                        ));
                        continue;
                    }
                }
            },
            (piece_index, result) = reconstructed_pieces.select_next_some() => {
                (piece_index, result)
            },
            // Both the download stream and the set of reconstructions are finished
            complete => {
                break;
            }
        };

        match result {
            Ok(piece) => {
                process_piece(piece_index, piece, pieces_to_download);
            }
            Err(error) => {
                trace!(%error, %piece_index, "Failed to download piece");

                // Only the first error is reported to the caller
                if final_result.is_ok() {
                    final_result = Err(error);
                }
            }
        }
    }

    // No error was seen, but the piece getter did not deliver every requested piece
    if final_result.is_ok() && !pieces_to_download.is_empty() {
        return Err(PlottingError::FailedToRetrievePieces {
            error: anyhow::anyhow!(
                "Successful result, but not all pieces were downloaded, this is likely a piece \
                getter implementation bug"
            ),
        });
    }

    final_result
}
778
779async fn reconstruct_piece<PG>(
780 piece_index: PieceIndex,
781 recovery_semaphore: &Semaphore,
782 piece_getter: &PG,
783 erasure_coding: &ErasureCoding,
784) -> (PieceIndex, Result<Piece, PlottingError>)
785where
786 PG: PieceGetter + Send + Sync,
787{
788 let _permit = recovery_semaphore.acquire().await;
789 let recovered_piece_fut =
790 recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
791
792 (
793 piece_index,
794 recovered_piece_fut
795 .await
796 .map_err(|error| PlottingError::PieceRecoveryFailed {
797 piece_index,
798 error: error.into(),
799 }),
800 )
801}
802
803fn process_piece(
804 piece_index: PieceIndex,
805 piece: Piece,
806 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
807) {
808 for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
809 record
813 .as_flattened_mut()
814 .copy_from_slice(piece.record().as_flattened());
815 *metadata = RecordMetadata {
816 root: *piece.root(),
817 parity_chunks_root: *piece.parity_chunks_root(),
818 proof: *piece.proof(),
819 piece_checksum: blake3::hash(piece.as_ref()).into(),
820 };
821 }
822}