1use crate::FarmerProtocolInfo;
10use crate::sector::{
11 FoundProofs, RawSector, RecordMetadata, SectorContentsMap, SectorMetadata,
12 SectorMetadataChecksummed, sector_record_chunks_size, sector_size,
13};
14use crate::segment_reconstruction::recover_missing_piece;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset, Record, RecordChunk};
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::{SBucket, SectorId, SectorIndex};
19use ab_core_primitives::segments::HistorySize;
20use ab_core_primitives::solutions::ShardCommitmentHash;
21use ab_data_retrieval::piece_getter::PieceGetter;
22use ab_erasure_coding::ErasureCoding;
23use ab_proof_of_space::{Table, TableGenerator};
24use async_lock::{Mutex as AsyncMutex, Semaphore};
25use backon::{ExponentialBuilder, Retryable};
26use futures::stream::FuturesUnordered;
27use futures::{StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use parking_lot::Mutex;
30use std::collections::HashMap;
31use std::simd::Simd;
32use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::time::Duration;
35use thiserror::Error;
36use tracing::{debug, trace, warn};
37
38const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1;
39
40#[derive(Debug, Clone, Encode, Decode)]
42pub struct PlottedSector {
43 pub sector_id: SectorId,
45 pub sector_index: SectorIndex,
47 pub sector_metadata: SectorMetadataChecksummed,
49 pub piece_indexes: Vec<PieceIndex>,
51}
52
53#[derive(Debug, Error)]
55pub enum PlottingError {
56 #[error("Records encoder error: {error}")]
58 RecordsEncoderError {
59 error: anyhow::Error,
61 },
62 #[error("Bad sector output size: provided {provided}, expected {expected}")]
64 BadSectorOutputSize {
65 provided: usize,
67 expected: usize,
69 },
70 #[error("Can't recover missing piece {piece_index}: {error}")]
72 PieceRecoveryFailed {
73 piece_index: PieceIndex,
75 error: anyhow::Error,
77 },
78 #[error("Failed to retrieve pieces: {error}")]
80 FailedToRetrievePieces {
81 error: anyhow::Error,
83 },
84 #[error("Abort early")]
86 AbortEarly,
87}
88
89#[derive(Debug)]
95pub struct PlotSectorOptions<'a, RE, PG> {
96 pub public_key_hash: &'a Blake3Hash,
98 pub shard_commitments_root: &'a ShardCommitmentHash,
100 pub sector_index: SectorIndex,
102 pub piece_getter: &'a PG,
104 pub farmer_protocol_info: FarmerProtocolInfo,
106 pub erasure_coding: &'a ErasureCoding,
108 pub pieces_in_sector: u16,
110 pub sector_output: &'a mut Vec<u8>,
113 pub downloading_semaphore: Option<Arc<Semaphore>>,
116 pub encoding_semaphore: Option<&'a Semaphore>,
119 pub records_encoder: &'a mut RE,
121 pub abort_early: &'a AtomicBool,
123}
124
125pub async fn plot_sector<RE, PG>(
132 options: PlotSectorOptions<'_, RE, PG>,
133) -> Result<PlottedSector, PlottingError>
134where
135 RE: RecordsEncoder,
136 PG: PieceGetter + Send + Sync,
137{
138 let PlotSectorOptions {
139 public_key_hash,
140 shard_commitments_root,
141 sector_index,
142 piece_getter,
143 farmer_protocol_info,
144 erasure_coding,
145 pieces_in_sector,
146 sector_output,
147 downloading_semaphore,
148 encoding_semaphore,
149 records_encoder,
150 abort_early,
151 } = options;
152
153 let _downloading_permit = match downloading_semaphore {
154 Some(downloading_semaphore) => Some(downloading_semaphore.acquire_arc().await),
155 None => None,
156 };
157
158 let download_sector_fut = download_sector(DownloadSectorOptions {
159 public_key_hash,
160 shard_commitments_root,
161 sector_index,
162 piece_getter,
163 farmer_protocol_info,
164 erasure_coding,
165 pieces_in_sector,
166 });
167
168 let _encoding_permit = match encoding_semaphore {
169 Some(encoding_semaphore) => Some(encoding_semaphore.acquire().await),
170 None => None,
171 };
172
173 let encoded_sector = encode_sector(
174 download_sector_fut.await?,
175 EncodeSectorOptions::<RE> {
176 sector_index,
177 records_encoder,
178 abort_early,
179 },
180 )?;
181
182 if abort_early.load(Ordering::Acquire) {
183 return Err(PlottingError::AbortEarly);
184 }
185
186 write_sector(&encoded_sector, sector_output)?;
187
188 Ok(encoded_sector.plotted_sector)
189}
190
191#[derive(Debug)]
193pub struct DownloadedSector {
194 sector_id: SectorId,
195 piece_indices: Vec<PieceIndex>,
196 raw_sector: RawSector,
197 history_size: HistorySize,
198}
199
200#[derive(Debug)]
202pub struct DownloadSectorOptions<'a, PG> {
203 pub public_key_hash: &'a Blake3Hash,
205 pub shard_commitments_root: &'a ShardCommitmentHash,
207 pub sector_index: SectorIndex,
209 pub piece_getter: &'a PG,
211 pub farmer_protocol_info: FarmerProtocolInfo,
213 pub erasure_coding: &'a ErasureCoding,
215 pub pieces_in_sector: u16,
217}
218
219pub async fn download_sector<PG>(
224 options: DownloadSectorOptions<'_, PG>,
225) -> Result<DownloadedSector, PlottingError>
226where
227 PG: PieceGetter + Send + Sync,
228{
229 let DownloadSectorOptions {
230 public_key_hash,
231 shard_commitments_root,
232 sector_index,
233 piece_getter,
234 farmer_protocol_info,
235 erasure_coding,
236 pieces_in_sector,
237 } = options;
238
239 let sector_id = SectorId::new(
240 public_key_hash,
241 shard_commitments_root,
242 sector_index,
243 farmer_protocol_info.history_size,
244 );
245
246 let piece_indices = (PieceOffset::ZERO..)
247 .take(pieces_in_sector.into())
248 .map(|piece_offset| {
249 sector_id.derive_piece_index(
250 piece_offset,
251 farmer_protocol_info.history_size,
252 farmer_protocol_info.max_pieces_in_sector,
253 farmer_protocol_info.recent_segments,
254 farmer_protocol_info.recent_history_fraction,
255 )
256 })
257 .collect::<Vec<_>>();
258
259 let raw_sector = {
260 let mut raw_sector = RawSector::new(pieces_in_sector);
261 let mut pieces_to_download =
262 HashMap::<PieceIndex, Vec<_>>::with_capacity(usize::from(pieces_in_sector));
263 for (piece_index, (record, metadata)) in piece_indices
264 .iter()
265 .copied()
266 .zip(raw_sector.records.iter_mut().zip(&mut raw_sector.metadata))
267 {
268 pieces_to_download
269 .entry(piece_index)
270 .or_default()
271 .push((record, metadata));
272 }
273 let pieces_to_download = AsyncMutex::new(pieces_to_download);
275
276 (|| async {
277 let mut pieces_to_download = pieces_to_download.lock().await;
278
279 if let Err(error) =
280 download_sector_internal(&mut pieces_to_download, piece_getter, erasure_coding)
281 .await
282 {
283 warn!(
284 %sector_index,
285 %error,
286 %pieces_in_sector,
287 remaining_pieces = %pieces_to_download.len(),
288 "Sector downloading attempt failed, will retry later"
289 );
290
291 return Err(error);
292 }
293
294 debug!(%sector_index, "Sector downloaded successfully");
295
296 Ok(())
297 })
298 .retry(
299 ExponentialBuilder::default()
300 .with_min_delay(Duration::from_secs(15))
301 .with_max_delay(Duration::from_mins(10))
302 .without_max_times(),
304 )
305 .await?;
306
307 raw_sector
308 };
309
310 Ok(DownloadedSector {
311 sector_id,
312 piece_indices,
313 raw_sector,
314 history_size: farmer_protocol_info.history_size,
315 })
316}
317
318pub trait RecordsEncoder {
320 fn encode_records(
322 &mut self,
323 sector_id: &SectorId,
324 records: &mut [Record],
325 abort_early: &AtomicBool,
326 ) -> anyhow::Result<SectorContentsMap>;
327}
328
329#[derive(Debug)]
331pub struct CpuRecordsEncoder<'a, PosTable>
332where
333 PosTable: Table,
334{
335 table_generators: &'a [PosTable::Generator],
336 erasure_coding: &'a ErasureCoding,
337 global_mutex: &'a AsyncMutex<()>,
338}
339
340impl<PosTable> RecordsEncoder for CpuRecordsEncoder<'_, PosTable>
341where
342 PosTable: Table,
343{
344 fn encode_records(
345 &mut self,
346 sector_id: &SectorId,
347 records: &mut [Record],
348 abort_early: &AtomicBool,
349 ) -> anyhow::Result<SectorContentsMap> {
350 if self.table_generators.is_empty() {
351 return Err(anyhow::anyhow!("No table generators"));
352 }
353
354 let pieces_in_sector = records
355 .len()
356 .try_into()
357 .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
358 let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
359
360 {
361 let global_mutex = self.global_mutex;
362 let erasure_coding = self.erasure_coding;
363
364 let iter = Mutex::new(
365 (PieceOffset::ZERO..)
366 .zip(records.iter_mut())
367 .zip(sector_contents_map.iter_record_chunks_used_mut()),
368 );
369
370 rayon::scope(|scope| {
371 for table_generator in self.table_generators {
372 scope.spawn(|_scope| {
373 loop {
374 global_mutex.lock_blocking();
376
377 let Some(((piece_offset, record), record_chunks_used)) =
380 iter.lock().next()
381 else {
382 return;
383 };
384 let pos_seed = sector_id.derive_evaluation_seed(piece_offset);
385
386 record_encoding::<PosTable>(
387 &pos_seed,
388 record,
389 record_chunks_used,
390 table_generator,
391 erasure_coding,
392 );
393
394 if abort_early.load(Ordering::Relaxed) {
395 return;
396 }
397 }
398 });
399 }
400 });
401 }
402
403 Ok(sector_contents_map)
404 }
405}
406
407impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable>
408where
409 PosTable: Table,
410{
411 pub fn new(
413 table_generators: &'a [PosTable::Generator],
414 erasure_coding: &'a ErasureCoding,
415 global_mutex: &'a AsyncMutex<()>,
416 ) -> Self {
417 Self {
418 table_generators,
419 erasure_coding,
420 global_mutex,
421 }
422 }
423}
424
425#[derive(Debug)]
431pub struct EncodeSectorOptions<'a, RE>
432where
433 RE: RecordsEncoder,
434{
435 pub sector_index: SectorIndex,
437 pub records_encoder: &'a mut RE,
439 pub abort_early: &'a AtomicBool,
441}
442
443#[derive(Debug)]
445pub struct EncodedSector {
446 pub plotted_sector: PlottedSector,
448 raw_sector: RawSector,
449 sector_contents_map: SectorContentsMap,
450}
451
452pub fn encode_sector<RE>(
457 downloaded_sector: DownloadedSector,
458 encoding_options: EncodeSectorOptions<'_, RE>,
459) -> Result<EncodedSector, PlottingError>
460where
461 RE: RecordsEncoder,
462{
463 let DownloadedSector {
464 sector_id,
465 piece_indices,
466 mut raw_sector,
467 history_size,
468 } = downloaded_sector;
469 let EncodeSectorOptions {
470 sector_index,
471 records_encoder,
472 abort_early,
473 } = encoding_options;
474
475 let pieces_in_sector = raw_sector.records.len().try_into().expect(
476 "Raw sector can only be created in this crate and it is always done correctly; qed",
477 );
478
479 let sector_contents_map = records_encoder
480 .encode_records(§or_id, &mut raw_sector.records, abort_early)
481 .map_err(|error| PlottingError::RecordsEncoderError { error })?;
482
483 let sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
484 sector_index,
485 pieces_in_sector,
486 s_bucket_sizes: sector_contents_map.s_bucket_sizes(),
487 history_size,
488 });
489
490 Ok(EncodedSector {
491 plotted_sector: PlottedSector {
492 sector_id,
493 sector_index,
494 sector_metadata,
495 piece_indexes: piece_indices,
496 },
497 raw_sector,
498 sector_contents_map,
499 })
500}
501
502pub fn write_sector(
504 encoded_sector: &EncodedSector,
505 sector_output: &mut Vec<u8>,
506) -> Result<(), PlottingError> {
507 let EncodedSector {
508 plotted_sector: _,
509 raw_sector,
510 sector_contents_map,
511 } = encoded_sector;
512
513 let pieces_in_sector = raw_sector.records.len().try_into().expect(
514 "Raw sector can only be created in this crate and it is always done correctly; qed",
515 );
516
517 let sector_size = sector_size(pieces_in_sector);
518
519 if !sector_output.is_empty() && sector_output.len() != sector_size {
520 return Err(PlottingError::BadSectorOutputSize {
521 provided: sector_output.len(),
522 expected: sector_size,
523 });
524 }
525
526 sector_output.resize(sector_size, 0);
527
528 {
534 let (sector_contents_map_region, remaining_bytes) =
535 sector_output.split_at_mut(SectorContentsMap::encoded_size(pieces_in_sector));
536 let (s_buckets_region, metadata_region) =
538 remaining_bytes.split_at_mut(sector_record_chunks_size(pieces_in_sector));
539
540 sector_contents_map
542 .encode_into(sector_contents_map_region)
543 .expect("Chunked into correct size above; qed");
544
545 let mut next_record_chunks_offset = vec![0_usize; pieces_in_sector.into()];
546 for (piece_offset, output) in (SBucket::ZERO..=SBucket::MAX)
548 .flat_map(|s_bucket| {
549 sector_contents_map
550 .iter_s_bucket_piece_offsets(s_bucket)
551 .expect("S-bucket guaranteed to be in range; qed")
552 })
553 .zip(s_buckets_region.as_chunks_mut::<{ RecordChunk::SIZE }>().0)
554 {
555 let next_record_chunks_offset =
556 &mut next_record_chunks_offset[usize::from(piece_offset)];
557
558 let chunk_position = *next_record_chunks_offset;
559 *next_record_chunks_offset += 1;
560 output.copy_from_slice(&raw_sector.records[usize::from(piece_offset)][chunk_position]);
561 }
562
563 let metadata_chunks = metadata_region
564 .as_chunks_mut::<{ RecordMetadata::encoded_size() }>()
565 .0;
566 for (record_metadata, output) in raw_sector.metadata.iter().zip(metadata_chunks) {
567 record_metadata.encode_to(&mut output.as_mut_slice());
568 }
569
570 let (sector_contents, sector_checksum) =
573 sector_output.split_at_mut(sector_size - Blake3Hash::SIZE);
574 sector_checksum.copy_from_slice(
575 {
576 let mut hasher = blake3::Hasher::new();
577 hasher.update_rayon(sector_contents);
578 hasher.finalize()
579 }
580 .as_bytes(),
581 );
582 }
583
584 Ok(())
585}
586
587fn record_encoding<PosTable>(
588 pos_seed: &PosSeed,
589 record: &mut Record,
590 record_chunks_used: &mut FoundProofs,
591 table_generator: &PosTable::Generator,
592 erasure_coding: &ErasureCoding,
593) where
594 PosTable: Table,
595{
596 let pos_proofs = table_generator.create_proofs_parallel(pos_seed);
597
598 let mut parity_record_chunks = Record::new_boxed();
599
600 erasure_coding
602 .extend(record.iter(), parity_record_chunks.iter_mut())
603 .expect("Statically guaranteed valid inputs; qed");
604
605 *record_chunks_used = pos_proofs.found_proofs;
606
607 let mut num_found_proofs = 0_usize;
609 for (s_buckets, found_proofs) in (0..Record::NUM_S_BUCKETS)
610 .array_chunks::<{ u8::BITS as usize }>()
611 .zip(pos_proofs.found_proofs)
612 {
613 for (proof_offset, s_bucket) in s_buckets.into_iter().enumerate() {
614 if (found_proofs & (1 << proof_offset)) != 0 {
615 let record_chunk = if s_bucket < Record::NUM_CHUNKS {
616 record[s_bucket]
617 } else {
618 parity_record_chunks[s_bucket - Record::NUM_CHUNKS]
619 };
620 record[num_found_proofs] = (Simd::from(record_chunk)
622 ^ Simd::from(*pos_proofs.proofs[num_found_proofs].hash()))
623 .to_array();
624 num_found_proofs += 1;
625 }
626 }
627 }
628}
629
630async fn download_sector_internal<PG>(
631 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
632 piece_getter: &PG,
633 erasure_coding: &ErasureCoding,
634) -> Result<(), PlottingError>
635where
636 PG: PieceGetter + Send + Sync,
637{
638 let recovery_semaphore = &Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT);
641
642 let piece_indices = pieces_to_download.keys().copied().collect::<Vec<_>>();
644 let mut downloaded_pieces = piece_getter
645 .get_pieces(piece_indices)
646 .await
647 .map_err(|error| PlottingError::FailedToRetrievePieces { error })?
648 .fuse();
649 let mut reconstructed_pieces = FuturesUnordered::new();
650
651 let mut final_result = Ok(());
652
653 loop {
654 let (piece_index, result) = select! {
655 (piece_index, result) = downloaded_pieces.select_next_some() => {
656 match result {
657 Ok(Some(piece)) => (piece_index, Ok(piece)),
658 Ok(None) => {
659 trace!(%piece_index, "Piece was not found, trying reconstruction");
660
661 reconstructed_pieces.push(reconstruct_piece(
662 piece_index,
663 recovery_semaphore,
664 piece_getter,
665 erasure_coding,
666 ));
667 continue;
668 }
669 Err(error) => {
670 trace!(
671 %error,
672 %piece_index,
673 "Failed to download piece, trying reconstruction"
674 );
675
676 reconstructed_pieces.push(reconstruct_piece(
677 piece_index,
678 recovery_semaphore,
679 piece_getter,
680 erasure_coding,
681 ));
682 continue;
683 }
684 }
685 },
686 (piece_index, result) = reconstructed_pieces.select_next_some() => {
687 (piece_index, result)
688 },
689 complete => {
690 break;
691 }
692 };
693
694 match result {
695 Ok(piece) => {
696 process_piece(piece_index, piece, pieces_to_download);
697 }
698 Err(error) => {
699 trace!(%error, %piece_index, "Failed to download piece");
700
701 if final_result.is_ok() {
702 final_result = Err(error);
703 }
704 }
705 }
706 }
707
708 if final_result.is_ok() && !pieces_to_download.is_empty() {
709 return Err(PlottingError::FailedToRetrievePieces {
710 error: anyhow::anyhow!(
711 "Successful result, but not all pieces were downloaded, this is likely a piece \
712 getter implementation bug"
713 ),
714 });
715 }
716
717 final_result
718}
719
720async fn reconstruct_piece<PG>(
721 piece_index: PieceIndex,
722 recovery_semaphore: &Semaphore,
723 piece_getter: &PG,
724 erasure_coding: &ErasureCoding,
725) -> (PieceIndex, Result<Piece, PlottingError>)
726where
727 PG: PieceGetter + Send + Sync,
728{
729 let _permit = recovery_semaphore.acquire().await;
730 let recovered_piece_fut =
731 recover_missing_piece(piece_getter, erasure_coding.clone(), piece_index);
732
733 (
734 piece_index,
735 recovered_piece_fut
736 .await
737 .map_err(|error| PlottingError::PieceRecoveryFailed {
738 piece_index,
739 error: error.into(),
740 }),
741 )
742}
743
744fn process_piece(
745 piece_index: PieceIndex,
746 piece: Piece,
747 pieces_to_download: &mut HashMap<PieceIndex, Vec<(&mut Record, &mut RecordMetadata)>>,
748) {
749 for (record, metadata) in pieces_to_download.remove(&piece_index).unwrap_or_default() {
750 *metadata = RecordMetadata {
751 piece_header: piece.header,
752 piece_checksum: blake3::hash(piece.as_ref()).into(),
753 };
754 record.copy_from_slice(&*piece.record);
757 }
758}