1mod block_sealing;
8pub mod direct_io_file_wrapper;
9pub mod farming;
10pub mod identity;
11mod metrics;
12pub mod piece_cache;
13pub mod piece_reader;
14pub mod plot_cache;
15mod plotted_sectors;
16mod plotting;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20 Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21 PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::block_sealing::block_sealing;
26use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
27use crate::single_disk_farm::farming::rayon_files::RayonFiles;
28use crate::single_disk_farm::farming::{
29 FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
30};
31use crate::single_disk_farm::identity::{Identity, IdentityError};
32use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
33use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
34use crate::single_disk_farm::piece_reader::DiskPieceReader;
35use crate::single_disk_farm::plot_cache::DiskPlotCache;
36use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
37pub use crate::single_disk_farm::plotting::PlottingError;
38use crate::single_disk_farm::plotting::{
39 PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
40};
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use ab_core_primitives::address::Address;
44use ab_core_primitives::block::BlockRoot;
45use ab_core_primitives::ed25519::Ed25519PublicKey;
46use ab_core_primitives::hashes::Blake3Hash;
47use ab_core_primitives::pieces::Record;
48use ab_core_primitives::sectors::SectorIndex;
49use ab_core_primitives::segments::{HistorySize, SegmentIndex};
50use ab_erasure_coding::ErasureCoding;
51use ab_farmer_components::FarmerProtocolInfo;
52use ab_farmer_components::file_ext::FileExt;
53use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
54use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
55use ab_farmer_rpc_primitives::{FarmerAppInfo, SolutionResponse};
56use ab_networking::KnownPeersManager;
57use ab_proof_of_space::Table;
58use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
59use async_trait::async_trait;
60use bytesize::ByteSize;
61use event_listener_primitives::{Bag, HandlerId};
62use futures::channel::{mpsc, oneshot};
63use futures::stream::FuturesUnordered;
64use futures::{FutureExt, StreamExt, select};
65use parity_scale_codec::{Decode, Encode};
66use parking_lot::Mutex;
67use prometheus_client::registry::Registry;
68use rayon::prelude::*;
69use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
70use serde::{Deserialize, Serialize};
71use std::collections::HashSet;
72use std::fs::{File, OpenOptions};
73use std::future::Future;
74use std::io::Write;
75use std::num::{NonZeroU32, NonZeroUsize};
76use std::path::{Path, PathBuf};
77use std::pin::Pin;
78use std::str::FromStr;
79use std::sync::Arc;
80use std::sync::atomic::{AtomicUsize, Ordering};
81use std::time::Duration;
82use std::{fmt, fs, io, mem};
83use thiserror::Error;
84use tokio::runtime::Handle;
85use tokio::sync::broadcast;
86use tokio::task;
87use tracing::{Instrument, Span, error, info, trace, warn};
88
// Compile-time check that the target has at least 64-bit pointers, so `u64` values (sizes,
// offsets) can be converted to `usize` without loss
const _: () = {
    assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());
};
94
/// Space reserved in the metadata file ahead of the per-sector metadata entries
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Space budgeted for the farm info file when distributing allocated space
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay applied by the plotting scheduler before processing a newly archived segment
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
100
/// Exclusive lock on the single disk farm info file.
///
/// The lock is held for as long as the underlying file handle is kept open (it is released when
/// this struct is dropped).
#[derive(Debug)]
#[must_use = "Lock file must be kept around or as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    // Keeping the handle open keeps the exclusive lock held
    _file: File,
}
108
/// Important information about the contents of the single disk farm, persisted as camelCase JSON
/// in the farm directory
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// V0 of the info format
    #[serde(rename_all = "camelCase")]
    V0 {
        /// ID of the farm
        id: FarmId,
        /// Genesis root of the chain the farm was created on (checked on open)
        genesis_root: BlockRoot,
        /// Public key of the identity the farm was created with (checked on open)
        public_key: Ed25519PublicKey,
        /// Seed used to build the shard commitments roots cache
        shard_commitments_seed: Blake3Hash,
        /// How many pieces are stored in each sector
        pieces_in_sector: u16,
        /// How much space in bytes is allocated to this farm
        allocated_space: u64,
    },
}
130
131impl SingleDiskFarmInfo {
132 const FILE_NAME: &'static str = "single_disk_farm.json";
133
134 pub fn new(
136 id: FarmId,
137 genesis_root: BlockRoot,
138 public_key: Ed25519PublicKey,
139 shard_commitments_seed: Blake3Hash,
140 pieces_in_sector: u16,
141 allocated_space: u64,
142 ) -> Self {
143 Self::V0 {
144 id,
145 genesis_root,
146 public_key,
147 shard_commitments_seed,
148 pieces_in_sector,
149 allocated_space,
150 }
151 }
152
153 pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
156 let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
157 Ok(bytes) => bytes,
158 Err(error) => {
159 return if error.kind() == io::ErrorKind::NotFound {
160 Ok(None)
161 } else {
162 Err(error)
163 };
164 }
165 };
166
167 serde_json::from_slice(&bytes)
168 .map(Some)
169 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
170 }
171
172 pub fn store_to(
176 &self,
177 directory: &Path,
178 lock: bool,
179 ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
180 let mut file = OpenOptions::new()
181 .write(true)
182 .create(true)
183 .truncate(false)
184 .open(directory.join(Self::FILE_NAME))?;
185 if lock {
186 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
187 }
188 file.set_len(0)?;
189 file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
190
191 Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
192 }
193
194 pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
197 let file = File::open(directory.join(Self::FILE_NAME))?;
198 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
199
200 Ok(SingleDiskFarmInfoLock { _file: file })
201 }
202
203 pub fn id(&self) -> &FarmId {
205 let Self::V0 { id, .. } = self;
206 id
207 }
208
209 pub fn genesis_root(&self) -> &BlockRoot {
211 let Self::V0 { genesis_root, .. } = self;
212 genesis_root
213 }
214
215 pub fn public_key(&self) -> &Ed25519PublicKey {
217 let Self::V0 { public_key, .. } = self;
218 public_key
219 }
220
221 pub fn shard_commitments_seed(&self) -> &Blake3Hash {
223 let Self::V0 {
224 shard_commitments_seed,
225 ..
226 } = self;
227 shard_commitments_seed
228 }
229
230 pub fn pieces_in_sector(&self) -> u16 {
232 match self {
233 SingleDiskFarmInfo::V0 {
234 pieces_in_sector, ..
235 } => *pieces_in_sector,
236 }
237 }
238
239 pub fn allocated_space(&self) -> u64 {
241 match self {
242 SingleDiskFarmInfo::V0 {
243 allocated_space, ..
244 } => *allocated_space,
245 }
246 }
247}
248
/// Summary of a single disk farm directory for informational purposes
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm info was found and parsed successfully
    Found {
        /// Farm info
        info: SingleDiskFarmInfo,
        /// Directory where the farm is stored
        directory: PathBuf,
    },
    /// Farm info was not found in the directory
    NotFound {
        /// Directory that was checked
        directory: PathBuf,
    },
    /// Reading the farm info failed
    Error {
        /// Directory that was checked
        directory: PathBuf,
        /// I/O error encountered while reading the farm info
        error: io::Error,
    },
}
272
/// SCALE-encoded header of the plot metadata file
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    // Metadata format version; compared against `SingleDiskFarm::SUPPORTED_PLOT_VERSION`
    version: u8,
    // Number of sectors plotted so far
    plotted_sector_count: u16,
}
278
279impl PlotMetadataHeader {
280 #[inline]
281 fn encoded_size() -> usize {
282 let default = PlotMetadataHeader {
283 version: 0,
284 plotted_sector_count: 0,
285 };
286
287 default.encoded_size()
288 }
289}
290
/// Options used to open a single disk farm
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Path to the directory where farm files are stored
    pub directory: PathBuf,
    /// Farmer application info received from the node
    pub farmer_app_info: FarmerAppInfo,
    /// How much space in bytes is allocated to this farm
    pub allocated_space: u64,
    /// Maximum number of pieces in a sector supported by this farmer
    pub max_pieces_in_sector: u16,
    /// Client used to communicate with the node
    pub node_client: NC,
    /// Address rewards should be sent to
    pub reward_address: Address,
    /// Plotter implementation used to create sectors
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated to the piece cache
    pub cache_percentage: u8,
    /// Size of the thread pool used for farming
    pub farming_thread_pool_size: usize,
    /// Optional receiver that delays the start of plotting until it resolves
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Global mutex shared between farms; assumed to serialize certain heavy operations across
    /// farms — confirm with callers
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// Limit on sectors being plotted concurrently per farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Skip taking an exclusive lock on the farm info file
    pub disable_farm_locking: bool,
    /// Optional Prometheus metrics registry
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create the farm if it doesn't exist (opening fails otherwise)
    pub create: bool,
}
334
/// Errors happening when trying to create or open a single disk farm
#[derive(Debug, Error)]
pub enum SingleDiskFarmError {
    /// Failed to open or create identity
    #[error("Failed to open or create identity: {0}")]
    FailedToOpenIdentity(#[from] IdentityError),
    /// Farm is likely already in use by another farmer instance
    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
    LikelyAlreadyInUse(io::Error),
    /// I/O error occurred
    #[error("Single disk farm I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failed to spawn a blocking task
    #[error("Failed to spawn task for blocking thread: {0}")]
    TokioJoinError(#[from] task::JoinError),
    /// Piece cache error
    #[error("Piece cache error: {0}")]
    PieceCacheError(#[from] DiskPieceCacheError),
    /// Can't preallocate metadata file
    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
    CantPreallocateMetadataFile(io::Error),
    /// Can't preallocate plot file
    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
    CantPreallocatePlotFile(io::Error),
    /// Farm was created on a different chain than the one the node is on
    #[error(
        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
        created, it is not possible to use farm on a different chain"
    )]
    WrongChain {
        /// Farm ID
        id: FarmId,
        /// Hex-encoded genesis root stored in the farm info
        correct_chain: String,
        /// Hex-encoded genesis root reported by the node
        wrong_chain: String,
    },
    /// Farm's identity doesn't match the public key stored in the farm info
    #[error(
        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
        farm was created, something went wrong, likely due to manual edits"
    )]
    IdentityMismatch {
        /// Farm ID
        id: FarmId,
        /// Public key stored in the farm info
        correct_public_key: Ed25519PublicKey,
        /// Public key derived from the identity file
        wrong_public_key: Ed25519PublicKey,
    },
    /// Farm was initialized with more pieces per sector than this farmer supports
    #[error(
        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
        {initialized_with}"
    )]
    InvalidPiecesInSector {
        /// Farm ID
        id: FarmId,
        /// Maximum supported pieces in sector
        max_supported: u16,
        /// Pieces in sector the farm was initialized with
        initialized_with: u16,
    },
    /// Failed to decode metadata header
    #[error("Failed to decode metadata header: {0}")]
    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
    /// Unexpected metadata version
    #[error("Unexpected metadata version {0}")]
    UnexpectedMetadataVersion(u8),
    /// Allocated space can't fit even a single sector
    #[error(
        "Allocated space is not enough for one sector. \
        The lowest acceptable value for allocated space is {min_space} bytes, \
        provided {allocated_space} bytes."
    )]
    InsufficientAllocatedSpace {
        /// Minimum space required in bytes
        min_space: u64,
        /// Actually allocated space in bytes
        allocated_space: u64,
    },
    /// Farm exceeds the maximum addressable number of sectors
    #[error(
        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
        instead."
    )]
    FarmTooLarge {
        /// Allocated space in bytes
        allocated_space: u64,
        /// Allocated sectors
        allocated_sectors: u64,
        /// Maximum supported space in bytes
        max_space: u64,
        /// Maximum supported sectors
        max_sectors: u16,
    },
    /// Failed to create farming thread pool
    #[error("Failed to create thread pool: {0}")]
    FailedToCreateThreadPool(ThreadPoolBuildError),
}
437
438#[derive(Debug, Error)]
440pub enum SingleDiskFarmScrubError {
441 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
443 LikelyAlreadyInUse(io::Error),
444 #[error("Failed to file size of {file}: {error}")]
446 FailedToDetermineFileSize {
447 file: PathBuf,
449 error: io::Error,
451 },
452 #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
454 FailedToReadBytes {
455 file: PathBuf,
457 size: u64,
459 offset: u64,
461 error: io::Error,
463 },
464 #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
466 FailedToWriteBytes {
467 file: PathBuf,
469 size: u64,
471 offset: u64,
473 error: io::Error,
475 },
476 #[error("Farm info file does not exist at {file}")]
478 FarmInfoFileDoesNotExist {
479 file: PathBuf,
481 },
482 #[error("Farm info at {file} can't be opened: {error}")]
484 FarmInfoCantBeOpened {
485 file: PathBuf,
487 error: io::Error,
489 },
490 #[error("Identity file does not exist at {file}")]
492 IdentityFileDoesNotExist {
493 file: PathBuf,
495 },
496 #[error("Identity at {file} can't be opened: {error}")]
498 IdentityCantBeOpened {
499 file: PathBuf,
501 error: IdentityError,
503 },
504 #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
506 PublicKeyMismatch {
507 identity: Ed25519PublicKey,
509 info: Ed25519PublicKey,
511 },
512 #[error("Metadata file does not exist at {file}")]
514 MetadataFileDoesNotExist {
515 file: PathBuf,
517 },
518 #[error("Metadata at {file} can't be opened: {error}")]
520 MetadataCantBeOpened {
521 file: PathBuf,
523 error: io::Error,
525 },
526 #[error(
528 "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
529 is {size}"
530 )]
531 MetadataFileTooSmall {
532 file: PathBuf,
534 reserved_size: u64,
536 size: u64,
538 },
539 #[error("Failed to decode metadata header: {0}")]
541 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
542 #[error("Unexpected metadata version {0}")]
544 UnexpectedMetadataVersion(u8),
545 #[error("Cache at {file} can't be opened: {error}")]
547 CacheCantBeOpened {
548 file: PathBuf,
550 error: io::Error,
552 },
553}
554
/// Errors happening in the farm's background tasks
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting error
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming error
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Block sealing error
    #[error(transparent)]
    BlockSealing(#[from] anyhow::Error),
    /// A background task panicked
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the task that panicked
        task: String,
    },
}
574
/// Boxed future of a background task that resolves with `Ok(())` or a [`BackgroundTaskError`]
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
576
/// Target of the scrub operation
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything
    All,
    /// Scrub the metadata file only
    Metadata,
    /// Scrub the plot file (also includes metadata, see `ScrubTarget::metadata`)
    Plot,
    /// Scrub the piece cache only
    Cache,
}
589
590impl fmt::Display for ScrubTarget {
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592 match self {
593 Self::All => f.write_str("all"),
594 Self::Metadata => f.write_str("metadata"),
595 Self::Plot => f.write_str("plot"),
596 Self::Cache => f.write_str("cache"),
597 }
598 }
599}
600
601impl FromStr for ScrubTarget {
602 type Err = String;
603
604 fn from_str(s: &str) -> Result<Self, Self::Err> {
605 match s {
606 "all" => Ok(Self::All),
607 "metadata" => Ok(Self::Metadata),
608 "plot" => Ok(Self::Plot),
609 "cache" => Ok(Self::Cache),
610 s => Err(format!("Can't parse {s} as `ScrubTarget`")),
611 }
612 }
613}
614
615impl ScrubTarget {
616 fn metadata(&self) -> bool {
617 match self {
618 Self::All | Self::Metadata | Self::Plot => true,
619 Self::Cache => false,
620 }
621 }
622
623 fn plot(&self) -> bool {
624 match self {
625 Self::All | Self::Plot => true,
626 Self::Metadata | Self::Cache => false,
627 }
628 }
629
630 fn cache(&self) -> bool {
631 match self {
632 Self::All | Self::Cache => true,
633 Self::Metadata | Self::Plot => false,
634 }
635 }
636}
637
/// How the farm's allocated space is distributed between its files
struct AllocatedSpaceDistribution {
    // Size of the piece cache file in bytes
    piece_cache_file_size: u64,
    // Capacity of the piece cache in elements
    piece_cache_capacity: u32,
    // Size of the plot file in bytes, rounded up to whole disk pages
    plot_file_size: u64,
    // Number of sectors the plot can hold
    target_sector_count: u16,
    // Size of the metadata file in bytes (reserved header region + per-sector metadata)
    metadata_file_size: u64,
}
645
646impl AllocatedSpaceDistribution {
647 fn new(
648 allocated_space: u64,
649 sector_size: u64,
650 cache_percentage: u8,
651 sector_metadata_size: u64,
652 ) -> Result<Self, SingleDiskFarmError> {
653 let single_sector_overhead = sector_size + sector_metadata_size;
654 let fixed_space_usage = RESERVED_PLOT_METADATA
656 + RESERVED_FARM_INFO
657 + Identity::file_size() as u64
658 + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
659 let target_sector_count = {
661 let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
662 / 100
663 * (100 - u64::from(cache_percentage));
664 (potentially_plottable_space - DISK_PAGE_SIZE as u64) / single_sector_overhead
667 };
668
669 if target_sector_count == 0 {
670 let mut single_plot_with_cache_space =
671 single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
672 if single_plot_with_cache_space - single_sector_overhead
675 < DiskPieceCache::element_size() as u64
676 {
677 single_plot_with_cache_space =
678 single_sector_overhead + DiskPieceCache::element_size() as u64;
679 }
680
681 return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
682 min_space: fixed_space_usage + single_plot_with_cache_space,
683 allocated_space,
684 });
685 }
686 let plot_file_size = target_sector_count * sector_size;
687 let plot_file_size = plot_file_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
689
690 let piece_cache_capacity = if cache_percentage > 0 {
692 let cache_space = allocated_space
693 - fixed_space_usage
694 - plot_file_size
695 - (sector_metadata_size * target_sector_count);
696 (cache_space / u64::from(DiskPieceCache::element_size())) as u32
697 } else {
698 0
699 };
700 let target_sector_count = match u16::try_from(target_sector_count).map(SectorIndex::from) {
701 Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
702 u16::from(target_sector_count)
703 }
704 _ => {
705 let max_sectors = u16::from(SectorIndex::MAX) - 1;
708 return Err(SingleDiskFarmError::FarmTooLarge {
709 allocated_space: target_sector_count * sector_size,
710 allocated_sectors: target_sector_count,
711 max_space: max_sectors as u64 * sector_size,
712 max_sectors,
713 });
714 }
715 };
716
717 Ok(Self {
718 piece_cache_file_size: u64::from(piece_cache_capacity)
719 * u64::from(DiskPieceCache::element_size()),
720 piece_cache_capacity,
721 plot_file_size,
722 target_sector_count,
723 metadata_file_size: RESERVED_PLOT_METADATA
724 + sector_metadata_size * u64::from(target_sector_count),
725 })
726 }
727}
728
/// Bag of event handlers for events carrying a payload of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
730
/// Collection of notification handlers for a single disk farm
#[derive(Default, Debug)]
struct Handlers {
    // Fired on sector state changes
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    // Fired on farming notifications
    farming_notification: Handler<FarmingNotification>,
    // Fired for solution responses
    solution: Handler<SolutionResponse>,
}
737
/// Intermediate state produced by `SingleDiskFarm::init` on a blocking thread and consumed by
/// `SingleDiskFarm::new`
struct SingleDiskFarmInit {
    // Farm identity
    identity: Identity,
    // Persisted farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    // Exclusive lock on the farm info file, if locking was enabled
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    // Plot file opened through the direct I/O wrapper
    plot_file: Arc<DirectIoFileWrapper>,
    // Metadata file opened through the direct I/O wrapper
    metadata_file: DirectIoFileWrapper,
    // Header read from (or initialized in) the metadata file
    metadata_header: PlotMetadataHeader,
    // Total number of sectors the plot can hold
    target_sector_count: u16,
    // Metadata of already plotted sectors
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Piece cache capacity in elements (0 means no piece cache)
    piece_cache_capacity: u32,
    // Plot cache
    plot_cache: DiskPlotCache,
}
750
/// Single disk farm abstraction: everything necessary to plot and farm with a single disk.
///
/// Background tasks are created in `new()` but only driven once `run()` is called.
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    // Protocol info captured from the node at farm creation
    farmer_protocol_info: FarmerProtocolInfo,
    // Persisted farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    // Cache built from the farm's shard commitments seed
    shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    // Metadata of plotted sectors, shared with the background tasks
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of pieces in each sector
    pieces_in_sector: u16,
    // Total number of sectors the plot can hold
    total_sectors_count: u16,
    // Tracing span the farm was created under
    span: Span,
    // Background tasks: plotting, farming, reading, block sealing and error forwarding
    tasks: FuturesUnordered<BackgroundTask>,
    // Notification handlers
    handlers: Arc<Handlers>,
    // Piece cache backed by this farm's disk
    piece_cache: SingleDiskPieceCache,
    // Plot cache
    plot_cache: DiskPlotCache,
    // Reader for plotted pieces
    piece_reader: DiskPieceReader,
    // Background tasks wait on receivers of this channel before starting; dropping the sender
    // makes them exit without doing any work
    start_sender: Option<broadcast::Sender<()>>,
    // Dropping (or sending on) this tells background tasks to stop
    stop_sender: Option<broadcast::Sender<()>>,
    // Keeps the exclusive farm info lock held for the farm's lifetime (if locking was enabled)
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
777
778impl Drop for SingleDiskFarm {
779 #[inline]
780 fn drop(&mut self) {
781 self.piece_reader.close_all_readers();
782 self.start_sender.take();
784 self.stop_sender.take();
786 }
787}
788
// Object-safe `Farm` implementation: thin adapters delegating to the inherent methods/fields of
// `SingleDiskFarm` and boxing the results into trait objects where needed
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    fn id(&self) -> &FarmId {
        // Delegates to the inherent `id()` method
        self.id()
    }

    fn total_sectors_count(&self) -> u16 {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // Unbox and run the farm to completion
        Box::pin((*self).run())
    }
}
829
830impl SingleDiskFarm {
    /// Name of the plot file inside the farm directory
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the plot metadata file inside the farm directory
    pub const METADATA_FILE: &'static str = "metadata.bin";
    // Only plot metadata version 0 is currently supported
    const SUPPORTED_PLOT_VERSION: u8 = 0;
836
    /// Creates a single disk farm from the provided `options`.
    ///
    /// `farm_index` is used only to label the farm's thread pool threads and background tasks.
    ///
    /// Disk initialization runs on a blocking thread; the background plotting, farming, reading
    /// and block sealing tasks are created here but only driven once `run()` is called.
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            registry,
            create,
        } = options;

        // On-disk initialization does blocking I/O, run it on a dedicated blocking thread
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        // First `?` handles the join result, second the init result itself
        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        let piece_cache = {
            // Piece cache reuses the farm's ULID as its own ID
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    // Opening the cache does blocking I/O
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    // Zero capacity: farm operates without a piece cache
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let shard_commitments_roots_cache =
            ShardCommitmentsRootsCache::new(*single_disk_farm_info.shard_commitments_seed());
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as u16,
            ))
        });

        // Background tasks forward their first fatal error through this one-shot channel
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // Task that resolves with an error as soon as any background task reports one
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        // Background tasks wait on `start_receiver` before doing work and exit once
        // `stop_receiver` yields (a message or channel closure)
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Sectors still to be plotted: everything past what the metadata header says was
        // already plotted
        let sectors_indices_left_to_plot = SectorIndex::new(metadata_header.plotted_sector_count)
            ..SectorIndex::new(target_sector_count);

        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Open the plot for farming via `RayonFiles` inside the farming thread pool (blocking
        // I/O, so on a blocking thread)
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFileWrapper::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting runs on a blocking thread that drives its own future via `block_on`
        let plotting_join_handle = task::spawn_blocking({
            let shard_commitments_roots_cache = shard_commitments_roots_cache.clone();
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        shard_commitments_roots_cache,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    // A closed start channel means the farm is shutting down before `run()`
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    // Optionally wait for an external signal before starting to plot
                    if let Some(plotting_delay) = plotting_delay
                        && plotting_delay.await.is_err()
                    {
                        return Ok(());
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Only the first error is forwarded; the sender is consumed
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        // Surface plotting thread panics as background task errors
        tasks.push(Box::pin(async move {
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        let public_key_hash = public_key.hash();

        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash,
            shard_commitments_roots_cache: shard_commitments_roots_cache.clone(),
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Channel used to forward slot notifications from the node to the farming task
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming also runs on a blocking thread that drives its own future via `block_on`
        let farming_join_handle = task::spawn_blocking({
            let shard_commitments_roots_cache = shard_commitments_roots_cache.clone();
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    // A closed start channel means the farm is shutting down before `run()`
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key_hash,
                        shard_commitments_roots_cache,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Only the first error is forwarded; the sender is consumed
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        // Surface farming thread panics as background task errors
        tasks.push(Box::pin(async move {
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key_hash,
            shard_commitments_roots_cache.clone(),
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            global_mutex,
        );

        // Piece reading runs on a blocking thread until it finishes or stop is signalled
        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                        }
                        _ = stop_receiver.recv().fuse() => {
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        // Surface reading thread panics as background task errors
        tasks.push(Box::pin(async move {
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Block sealing task, driven with the farm's identity (see `block_sealing`)
        tasks.push(Box::pin(async move {
            match block_sealing(node_client, identity).await {
                Ok(block_sealing_fut) => {
                    block_sealing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::BlockSealing(anyhow::anyhow!(
                        "Failed to subscribe to block sealing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            shard_commitments_roots_cache,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1244
1245 fn init(
1246 directory: &PathBuf,
1247 farmer_app_info: &FarmerAppInfo,
1248 allocated_space: u64,
1249 max_pieces_in_sector: u16,
1250 cache_percentage: u8,
1251 disable_farm_locking: bool,
1252 create: bool,
1253 ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1254 fs::create_dir_all(directory)?;
1255
1256 let identity = if create {
1257 Identity::open_or_create(directory)?
1258 } else {
1259 Identity::open(directory)?.ok_or_else(|| {
1260 IdentityError::Io(io::Error::new(
1261 io::ErrorKind::NotFound,
1262 "Farm does not exist and creation was explicitly disabled",
1263 ))
1264 })?
1265 };
1266 let public_key = identity.public_key();
1267
1268 let (single_disk_farm_info, single_disk_farm_info_lock) =
1269 match SingleDiskFarmInfo::load_from(directory)? {
1270 Some(mut single_disk_farm_info) => {
1271 if &farmer_app_info.genesis_root != single_disk_farm_info.genesis_root() {
1272 return Err(SingleDiskFarmError::WrongChain {
1273 id: *single_disk_farm_info.id(),
1274 correct_chain: hex::encode(single_disk_farm_info.genesis_root()),
1275 wrong_chain: hex::encode(farmer_app_info.genesis_root),
1276 });
1277 }
1278
1279 if &public_key != single_disk_farm_info.public_key() {
1280 return Err(SingleDiskFarmError::IdentityMismatch {
1281 id: *single_disk_farm_info.id(),
1282 correct_public_key: *single_disk_farm_info.public_key(),
1283 wrong_public_key: public_key,
1284 });
1285 }
1286
1287 let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1288
1289 if max_pieces_in_sector < pieces_in_sector {
1290 return Err(SingleDiskFarmError::InvalidPiecesInSector {
1291 id: *single_disk_farm_info.id(),
1292 max_supported: max_pieces_in_sector,
1293 initialized_with: pieces_in_sector,
1294 });
1295 }
1296
1297 if max_pieces_in_sector > pieces_in_sector {
1298 info!(
1299 pieces_in_sector,
1300 max_pieces_in_sector,
1301 "Farm initialized with smaller number of pieces in sector, farm needs \
1302 to be re-created for increase"
1303 );
1304 }
1305
1306 let mut single_disk_farm_info_lock = None;
1307
1308 if allocated_space != single_disk_farm_info.allocated_space() {
1309 info!(
1310 old_space = %ByteSize::b(single_disk_farm_info.allocated_space()).display().iec(),
1311 new_space = %ByteSize::b(allocated_space).display().iec(),
1312 "Farm size has changed"
1313 );
1314
1315 let new_allocated_space = allocated_space;
1316 match &mut single_disk_farm_info {
1317 SingleDiskFarmInfo::V0 {
1318 allocated_space, ..
1319 } => {
1320 *allocated_space = new_allocated_space;
1321 }
1322 }
1323
1324 single_disk_farm_info_lock =
1325 single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1326 } else if !disable_farm_locking {
1327 single_disk_farm_info_lock = Some(
1328 SingleDiskFarmInfo::try_lock(directory)
1329 .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1330 );
1331 }
1332
1333 (single_disk_farm_info, single_disk_farm_info_lock)
1334 }
1335 None => {
1336 let single_disk_farm_info = SingleDiskFarmInfo::new(
1337 FarmId::new(),
1338 farmer_app_info.genesis_root,
1339 public_key,
1340 identity.shard_commitments_seed(),
1341 max_pieces_in_sector,
1342 allocated_space,
1343 );
1344
1345 let single_disk_farm_info_lock =
1346 single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1347
1348 (single_disk_farm_info, single_disk_farm_info_lock)
1349 }
1350 };
1351
1352 let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1353 let sector_size = sector_size(pieces_in_sector) as u64;
1354 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1355 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1356 allocated_space,
1357 sector_size,
1358 cache_percentage,
1359 sector_metadata_size as u64,
1360 )?;
1361 let target_sector_count = allocated_space_distribution.target_sector_count;
1362
1363 let metadata_file_path = directory.join(Self::METADATA_FILE);
1364 let metadata_file = DirectIoFileWrapper::open(&metadata_file_path)?;
1365
1366 let metadata_size = metadata_file.size()?;
1367 let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1368 let expected_metadata_size =
1370 expected_metadata_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
1371 let metadata_header = if metadata_size == 0 {
1372 let metadata_header = PlotMetadataHeader {
1373 version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1374 plotted_sector_count: 0,
1375 };
1376
1377 metadata_file
1378 .preallocate(expected_metadata_size)
1379 .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1380 metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1381
1382 metadata_header
1383 } else {
1384 if metadata_size != expected_metadata_size {
1385 metadata_file
1388 .preallocate(expected_metadata_size)
1389 .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1390 metadata_file.set_len(expected_metadata_size)?;
1392 }
1393
1394 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1395 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1396
1397 let mut metadata_header =
1398 PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1399 .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1400
1401 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1402 return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1403 metadata_header.version,
1404 ));
1405 }
1406
1407 if metadata_header.plotted_sector_count > target_sector_count {
1408 metadata_header.plotted_sector_count = target_sector_count;
1409 metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1410 }
1411
1412 metadata_header
1413 };
1414
1415 let sectors_metadata = {
1416 let mut sectors_metadata =
1417 Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1418
1419 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1420 for sector_index in
1421 SectorIndex::ZERO..SectorIndex::new(metadata_header.plotted_sector_count)
1422 {
1423 let sector_offset =
1424 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1425 metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1426
1427 let sector_metadata =
1428 match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1429 Ok(sector_metadata) => sector_metadata,
1430 Err(error) => {
1431 warn!(
1432 path = %metadata_file_path.display(),
1433 %error,
1434 %sector_index,
1435 "Failed to decode sector metadata, replacing with dummy expired \
1436 sector metadata"
1437 );
1438
1439 let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1440 sector_index,
1441 pieces_in_sector,
1442 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1443 history_size: HistorySize::from(SegmentIndex::ZERO),
1444 });
1445 metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1446
1447 dummy_sector
1448 }
1449 };
1450 sectors_metadata.push(sector_metadata);
1451 }
1452
1453 Arc::new(AsyncRwLock::new(sectors_metadata))
1454 };
1455
1456 let plot_file = DirectIoFileWrapper::open(directory.join(Self::PLOT_FILE))?;
1457
1458 if plot_file.size()? != allocated_space_distribution.plot_file_size {
1459 plot_file
1462 .preallocate(allocated_space_distribution.plot_file_size)
1463 .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1464 plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1466 }
1467
1468 let plot_file = Arc::new(plot_file);
1469
1470 let plot_cache = DiskPlotCache::new(
1471 &plot_file,
1472 §ors_metadata,
1473 target_sector_count,
1474 sector_size,
1475 );
1476
1477 Ok(SingleDiskFarmInit {
1478 identity,
1479 single_disk_farm_info,
1480 single_disk_farm_info_lock,
1481 plot_file,
1482 metadata_file,
1483 metadata_header,
1484 target_sector_count,
1485 sectors_metadata,
1486 piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1487 plot_cache,
1488 })
1489 }
1490
1491 pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1493 let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1494 Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1495 Ok(None) => {
1496 return SingleDiskFarmSummary::NotFound { directory };
1497 }
1498 Err(error) => {
1499 return SingleDiskFarmSummary::Error { directory, error };
1500 }
1501 };
1502
1503 SingleDiskFarmSummary::Found {
1504 info: single_disk_farm_info,
1505 directory,
1506 }
1507 }
1508
1509 pub fn effective_disk_usage(
1515 directory: &Path,
1516 cache_percentage: u8,
1517 ) -> Result<u64, SingleDiskFarmError> {
1518 let mut effective_disk_usage;
1519 match SingleDiskFarmInfo::load_from(directory)? {
1520 Some(single_disk_farm_info) => {
1521 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1522 single_disk_farm_info.allocated_space(),
1523 sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1524 cache_percentage,
1525 SectorMetadataChecksummed::encoded_size() as u64,
1526 )?;
1527
1528 effective_disk_usage = single_disk_farm_info.allocated_space();
1529 effective_disk_usage -= Identity::file_size() as u64;
1530 effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1531 effective_disk_usage -= allocated_space_distribution.plot_file_size;
1532 effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1533 }
1534 None => {
1535 effective_disk_usage = 0;
1537 }
1538 };
1539
1540 if Identity::open(directory)?.is_some() {
1541 effective_disk_usage += Identity::file_size() as u64;
1542 }
1543
1544 match OpenOptions::new()
1545 .read(true)
1546 .open(directory.join(Self::METADATA_FILE))
1547 {
1548 Ok(metadata_file) => {
1549 effective_disk_usage += metadata_file.size()?;
1550 }
1551 Err(error) => {
1552 if error.kind() == io::ErrorKind::NotFound {
1553 } else {
1555 return Err(error.into());
1556 }
1557 }
1558 };
1559
1560 match OpenOptions::new()
1561 .read(true)
1562 .open(directory.join(Self::PLOT_FILE))
1563 {
1564 Ok(plot_file) => {
1565 effective_disk_usage += plot_file.size()?;
1566 }
1567 Err(error) => {
1568 if error.kind() == io::ErrorKind::NotFound {
1569 } else {
1571 return Err(error.into());
1572 }
1573 }
1574 };
1575
1576 match OpenOptions::new()
1577 .read(true)
1578 .open(directory.join(DiskPieceCache::FILE_NAME))
1579 {
1580 Ok(piece_cache) => {
1581 effective_disk_usage += piece_cache.size()?;
1582 }
1583 Err(error) => {
1584 if error.kind() == io::ErrorKind::NotFound {
1585 } else {
1587 return Err(error.into());
1588 }
1589 }
1590 };
1591
1592 Ok(effective_disk_usage)
1593 }
1594
1595 pub fn read_all_sectors_metadata(
1597 directory: &Path,
1598 ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1599 let metadata_file = DirectIoFileWrapper::open(directory.join(Self::METADATA_FILE))?;
1600
1601 let metadata_size = metadata_file.size()?;
1602 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1603
1604 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1605 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1606
1607 let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1608 .map_err(|error| {
1609 io::Error::other(format!("Failed to decode metadata header: {error}"))
1610 })?;
1611
1612 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1613 return Err(io::Error::other(format!(
1614 "Unsupported metadata version {}",
1615 metadata_header.version
1616 )));
1617 }
1618
1619 let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1620 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1621 );
1622
1623 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1624 for sector_index in 0..metadata_header.plotted_sector_count {
1625 metadata_file.read_exact_at(
1626 &mut sector_metadata_bytes,
1627 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1628 )?;
1629 sectors_metadata.push(
1630 SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1631 |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1632 )?,
1633 );
1634 }
1635
1636 Ok(sectors_metadata)
1637 }
1638
    /// ID of the farm, taken from the on-disk farm info.
    pub fn id(&self) -> &FarmId {
        self.single_disk_farm_info.id()
    }
1643
    /// Read-only access to the farm's on-disk info.
    pub fn info(&self) -> &SingleDiskFarmInfo {
        &self.single_disk_farm_info
    }
1648
    /// Total number of sectors this farm is configured to hold.
    pub fn total_sectors_count(&self) -> u16 {
        self.total_sectors_count
    }
1653
    /// Creates a handle for enumerating already-plotted sectors, sharing this
    /// farm's sector metadata and shard commitments roots cache.
    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
        SingleDiskPlottedSectors {
            public_key_hash: self.single_disk_farm_info.public_key().hash(),
            shard_commitments_roots_cache: self.shard_commitments_roots_cache.clone(),
            pieces_in_sector: self.pieces_in_sector,
            farmer_protocol_info: self.farmer_protocol_info,
            sectors_metadata: Arc::clone(&self.sectors_metadata),
        }
    }
1664
    /// Clone of this farm's piece cache handle.
    pub fn piece_cache(&self) -> SingleDiskPieceCache {
        self.piece_cache.clone()
    }
1669
    /// Clone of this farm's plot cache handle.
    pub fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
1674
    /// Clone of this farm's piece reader handle.
    pub fn piece_reader(&self) -> DiskPieceReader {
        self.piece_reader.clone()
    }
1679
    /// Subscribes `callback` to sector update notifications; the returned
    /// `HandlerId` identifies the registration.
    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
        self.handlers.sector_update.add(callback)
    }
1684
    /// Subscribes `callback` to farming notifications; the returned `HandlerId`
    /// identifies the registration.
    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
        self.handlers.farming_notification.add(callback)
    }
1689
    /// Subscribes `callback` to solution responses; the returned `HandlerId`
    /// identifies the registration.
    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
        self.handlers.solution.add(callback)
    }
1694
1695 pub async fn run(mut self) -> anyhow::Result<()> {
1697 if let Some(start_sender) = self.start_sender.take() {
1698 let _ = start_sender.send(());
1700 }
1701
1702 while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1703 result?;
1704 }
1705
1706 Ok(())
1707 }
1708
1709 pub fn wipe(directory: &Path) -> io::Result<()> {
1711 let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1712 match SingleDiskFarmInfo::load_from(directory) {
1713 Ok(Some(single_disk_farm_info)) => {
1714 info!("Found single disk farm {}", single_disk_farm_info.id());
1715 }
1716 Ok(None) => {
1717 return Err(io::Error::new(
1718 io::ErrorKind::NotFound,
1719 format!(
1720 "Single disk farm info not found at {}",
1721 single_disk_info_info_path.display()
1722 ),
1723 ));
1724 }
1725 Err(error) => {
1726 warn!("Found unknown single disk farm: {}", error);
1727 }
1728 }
1729
1730 {
1731 let plot = directory.join(Self::PLOT_FILE);
1732 if plot.exists() {
1733 info!("Deleting plot file at {}", plot.display());
1734 fs::remove_file(plot)?;
1735 }
1736 }
1737 {
1738 let metadata = directory.join(Self::METADATA_FILE);
1739 if metadata.exists() {
1740 info!("Deleting metadata file at {}", metadata.display());
1741 fs::remove_file(metadata)?;
1742 }
1743 }
1744 {
1747 let identity = directory.join("identity.bin");
1748 if identity.exists() {
1749 info!("Deleting identity file at {}", identity.display());
1750 fs::remove_file(identity)?;
1751 }
1752 }
1753
1754 DiskPieceCache::wipe(directory)?;
1755
1756 info!(
1757 "Deleting info file at {}",
1758 single_disk_info_info_path.display()
1759 );
1760 fs::remove_file(single_disk_info_info_path)
1761 }
1762
    /// Checks the farm's on-disk files for consistency and repairs what it can.
    ///
    /// `target` selects which parts to check (metadata, plot and/or cache). With
    /// `dry_run` set, problems are only reported and nothing is written to disk.
    /// Broken sector metadata or sector contents are replaced with dummy expired
    /// sector records (via `write_dummy_sector_metadata`) so those sectors get
    /// re-plotted later. Sector checks run in parallel on the rayon pool.
    pub fn scrub(
        directory: &Path,
        disable_farm_locking: bool,
        target: ScrubTarget,
        dry_run: bool,
    ) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        if dry_run {
            info!("Dry run is used, no changes will be written to disk");
        }

        if target.metadata() || target.plot() {
            // Farm info must exist and load before anything else is checked
            let info = {
                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
                info!(path = %file.display(), "Checking info file");

                match SingleDiskFarmInfo::load_from(directory) {
                    Ok(Some(info)) => info,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
                    }
                }
            };

            // Hold the farm lock (unless disabled) so a running farmer can't touch
            // the files while they are being scrubbed
            let _single_disk_farm_info_lock = if disable_farm_locking {
                None
            } else {
                Some(
                    SingleDiskFarmInfo::try_lock(directory)
                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
                )
            };

            let identity = {
                let file = directory.join(Identity::FILE_NAME);
                info!(path = %file.display(), "Checking identity file");

                match Identity::open(directory) {
                    Ok(Some(identity)) => identity,
                    Ok(None) => {
                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
                    }
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
                    }
                }
            };

            // Identity on disk must belong to the same farm the info file describes
            if &identity.public_key() != info.public_key() {
                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
                    identity: identity.public_key(),
                    info: *info.public_key(),
                });
            }

            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

            let metadata_file_path = directory.join(Self::METADATA_FILE);
            let (metadata_file, mut metadata_header) = {
                info!(path = %metadata_file_path.display(), "Checking metadata file");

                // Opened read-write unless this is a dry run
                let metadata_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&metadata_file_path)
                {
                    Ok(metadata_file) => metadata_file,
                    Err(error) => {
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: metadata_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: metadata_file_path,
                                error,
                            }
                        });
                    }
                };

                // Best-effort read-ahead hint; failure is intentionally ignored
                let _ = metadata_file.advise_sequential_access();

                let metadata_size = match metadata_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: metadata_file_path,
                            error,
                        });
                    }
                };

                if metadata_size < RESERVED_PLOT_METADATA {
                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
                        file: metadata_file_path,
                        reserved_size: RESERVED_PLOT_METADATA,
                        size: metadata_size,
                    });
                }

                let mut metadata_header = {
                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];

                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
                            file: metadata_file_path,
                            size: RESERVED_PLOT_METADATA,
                            offset: 0,
                            error,
                        });
                    }

                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
                };

                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
                        metadata_header.version,
                    ));
                }

                let plotted_sector_count = metadata_header.plotted_sector_count;

                let expected_metadata_size = RESERVED_PLOT_METADATA
                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);

                // A truncated metadata file means the header's sector count is too
                // high; clamp it to what actually fits on disk
                if metadata_size < expected_metadata_size {
                    warn!(
                        %metadata_size,
                        %expected_metadata_size,
                        "Metadata file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count =
                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
                            as u16;
                    let metadata_header_bytes = metadata_header.encode();

                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: metadata_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                (metadata_file, metadata_header)
            };

            let pieces_in_sector = info.pieces_in_sector();
            let sector_size = sector_size(pieces_in_sector) as u64;

            let plot_file_path = directory.join(Self::PLOT_FILE);
            let plot_file = {
                let plot_file_path = directory.join(Self::PLOT_FILE);
                info!(path = %plot_file_path.display(), "Checking plot file");

                let plot_file = match OpenOptions::new()
                    .read(true)
                    .write(!dry_run)
                    .open(&plot_file_path)
                {
                    Ok(plot_file) => plot_file,
                    Err(error) => {
                        // NOTE(review): reuses the `Metadata*` error variants even
                        // though the plot file is being opened — looks like
                        // copy-paste; confirm whether dedicated plot-file variants
                        // exist in `SingleDiskFarmScrubError`
                        return Err(if error.kind() == io::ErrorKind::NotFound {
                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
                                file: plot_file_path,
                            }
                        } else {
                            SingleDiskFarmScrubError::MetadataCantBeOpened {
                                file: plot_file_path,
                                error,
                            }
                        });
                    }
                };

                // Best-effort read-ahead hint; failure is intentionally ignored
                let _ = plot_file.advise_sequential_access();

                let plot_size = match plot_file.size() {
                    Ok(metadata_size) => metadata_size,
                    Err(error) => {
                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
                            file: plot_file_path,
                            error,
                        });
                    }
                };

                // If the plot file holds fewer sectors than the header claims,
                // clamp the plotted sector count accordingly
                let min_expected_plot_size =
                    u64::from(metadata_header.plotted_sector_count) * sector_size;
                if plot_size < min_expected_plot_size {
                    warn!(
                        %plot_size,
                        %min_expected_plot_size,
                        "Plot file size is smaller than expected, shrinking number of plotted \
                        sectors to correct value"
                    );

                    metadata_header.plotted_sector_count = (plot_size / sector_size) as u16;
                    let metadata_header_bytes = metadata_header.encode();

                    // NOTE(review): the write below goes to the metadata file, but
                    // the error reports `plot_file_path` — confirm intended
                    if !dry_run
                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: plot_file_path,
                            size: metadata_header_bytes.len() as u64,
                            offset: 0,
                            error,
                        });
                    }
                }

                plot_file
            };

            // Each sector ends with a Blake3 checksum; this range covers the
            // checksummed payload bytes within a sector
            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);

            info!("Checking sectors and corresponding metadata");
            (0..metadata_header.plotted_sector_count)
                .into_par_iter()
                .map(SectorIndex::new)
                .map_init(
                    // One scratch buffer per rayon worker, reused across sectors
                    || vec![0u8; Record::SIZE],
                    |scratch_buffer, sector_index| {
                        let _span_guard = span.enter();

                        let offset = RESERVED_PLOT_METADATA
                            + u64::from(sector_index) * sector_metadata_size as u64;
                        if let Err(error) = metadata_file
                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
                        {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %offset,
                                size = %sector_metadata_size,
                                %sector_index,
                                "Failed to read sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        let sector_metadata = match SectorMetadataChecksummed::decode(
                            &mut &scratch_buffer[..sector_metadata_size],
                        ) {
                            Ok(sector_metadata) => sector_metadata,
                            Err(error) => {
                                warn!(
                                    path = %metadata_file_path.display(),
                                    %error,
                                    %sector_index,
                                    "Failed to decode sector metadata, replacing with dummy \
                                    expired sector metadata"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }
                                return Ok(());
                            }
                        };

                        // Stored metadata must describe the slot it occupies
                        if sector_metadata.sector_index != sector_index {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                found_sector_index = %sector_metadata.sector_index,
                                "Sector index mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if sector_metadata.pieces_in_sector != pieces_in_sector {
                            warn!(
                                path = %metadata_file_path.display(),
                                %sector_index,
                                %pieces_in_sector,
                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
                                "Pieces in sector mismatch, replacing with dummy expired sector \
                                metadata"
                            );

                            if !dry_run {
                                write_dummy_sector_metadata(
                                    &metadata_file,
                                    &metadata_file_path,
                                    sector_index,
                                    pieces_in_sector,
                                )?;
                            }
                            return Ok(());
                        }

                        if target.plot() {
                            // Hash sector contents in scratch-buffer-sized chunks and
                            // compare against the checksum stored at the sector's end
                            let mut hasher = blake3::Hasher::new();
                            for offset_in_sector in
                                sector_bytes_range.clone().step_by(scratch_buffer.len())
                            {
                                let offset =
                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
                                    .min(sector_bytes_range.end)
                                    - offset_in_sector;

                                let bytes = &mut scratch_buffer[..bytes_to_read];

                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
                                    // Unreadable chunks are skipped; the checksum will
                                    // then mismatch and trigger the repair path below
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %bytes.len() as u64,
                                        "Failed to read sector bytes"
                                    );

                                    continue;
                                }

                                hasher.update(bytes);
                            }

                            let actual_checksum = *hasher.finalize().as_bytes();
                            let mut expected_checksum = [0; Blake3Hash::SIZE];
                            {
                                let offset = u64::from(sector_index) * sector_size
                                    + sector_bytes_range.end as u64;
                                if let Err(error) =
                                    plot_file.read_exact_at(&mut expected_checksum, offset)
                                {
                                    warn!(
                                        path = %plot_file_path.display(),
                                        %error,
                                        %sector_index,
                                        %offset,
                                        size = %expected_checksum.len() as u64,
                                        "Failed to read sector checksum bytes"
                                    );
                                }
                            }

                            if actual_checksum != expected_checksum {
                                warn!(
                                    path = %plot_file_path.display(),
                                    %sector_index,
                                    actual_checksum = %hex::encode(actual_checksum),
                                    expected_checksum = %hex::encode(expected_checksum),
                                    "Plotted sector checksum mismatch, replacing with dummy \
                                    expired sector"
                                );

                                if !dry_run {
                                    write_dummy_sector_metadata(
                                        &metadata_file,
                                        &metadata_file_path,
                                        sector_index,
                                        pieces_in_sector,
                                    )?;
                                }

                                // Zero out the sector contents and store the checksum
                                // of the zeroed bytes so the sector reads back as
                                // internally consistent
                                scratch_buffer.fill(0);

                                hasher.reset();
                                for offset_in_sector in
                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
                                {
                                    let offset = u64::from(sector_index) * sector_size
                                        + offset_in_sector as u64;
                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
                                        .min(sector_bytes_range.end)
                                        - offset_in_sector;
                                    let bytes = &mut scratch_buffer[..bytes_to_write];

                                    if !dry_run
                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: scratch_buffer.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }

                                    hasher.update(bytes);
                                }
                                {
                                    let checksum = *hasher.finalize().as_bytes();
                                    let offset = u64::from(sector_index) * sector_size
                                        + sector_bytes_range.end as u64;
                                    if !dry_run
                                        && let Err(error) =
                                            plot_file.write_all_at(&checksum, offset)
                                    {
                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                                            file: plot_file_path.clone(),
                                            size: checksum.len() as u64,
                                            offset,
                                            error,
                                        });
                                    }
                                }

                                return Ok(());
                            }
                        }

                        trace!(%sector_index, "Sector is in good shape");

                        Ok(())
                    },
                )
                .try_for_each({
                    let span = &span;
                    let checked_sectors = AtomicUsize::new(0);

                    move |result| {
                        let _span_guard = span.enter();

                        // Periodic progress reporting across rayon workers
                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
                        if checked_sectors > 1 && checked_sectors.is_multiple_of(10) {
                            info!(
                                "Checked {}/{} sectors",
                                checked_sectors, metadata_header.plotted_sector_count
                            );
                        }

                        result
                    }
                })?;
        }

        if target.cache() {
            Self::scrub_cache(directory, dry_run)?;
        }

        info!("Farm check completed");

        Ok(())
    }
2249
    /// Checks every element of the on-disk piece cache in parallel, replacing
    /// elements whose checksum doesn't match with zeroed dummy elements (unless
    /// `dry_run` is set). A missing cache file is not an error.
    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        let file = directory.join(DiskPieceCache::FILE_NAME);
        info!(path = %file.display(), "Checking cache file");

        // Opened read-write unless this is a dry run
        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
            Ok(plot_file) => plot_file,
            Err(error) => {
                return if error.kind() == io::ErrorKind::NotFound {
                    warn!(
                        file = %file.display(),
                        "Cache file does not exist, this is expected in farming cluster"
                    );
                    Ok(())
                } else {
                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
                };
            }
        };

        // Best-effort read-ahead hint; failure is intentionally ignored
        let _ = cache_file.advise_sequential_access();

        let cache_size = match cache_file.size() {
            Ok(cache_size) => cache_size,
            Err(error) => {
                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
            }
        };

        let element_size = DiskPieceCache::element_size();
        let number_of_cached_elements = cache_size / u64::from(element_size);
        // An all-zero element marks an empty/invalidated cache slot
        let dummy_element = vec![0; element_size as usize];
        (0..number_of_cached_elements)
            .into_par_iter()
            // One element-sized buffer per rayon worker, reused across offsets
            .map_with(vec![0; element_size as usize], |element, cache_offset| {
                let _span_guard = span.enter();

                let offset = cache_offset * u64::from(element_size);
                if let Err(error) = cache_file.read_exact_at(element, offset) {
                    warn!(
                        path = %file.display(),
                        %cache_offset,
                        size = %element.len() as u64,
                        %offset,
                        %error,
                        "Failed to read cached piece, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                // Element layout: index + piece bytes followed by a Blake3 checksum
                let (index_and_piece_bytes, expected_checksum) =
                    element.split_at(element_size as usize - Blake3Hash::SIZE);
                let actual_checksum = *blake3::hash(index_and_piece_bytes).as_bytes();
                // All-zero (dummy) elements are valid despite a mismatched checksum
                if actual_checksum != expected_checksum && element != &dummy_element {
                    warn!(
                        %cache_offset,
                        actual_checksum = %hex::encode(actual_checksum),
                        expected_checksum = %hex::encode(expected_checksum),
                        "Cached piece checksum mismatch, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                Ok(())
            })
            .try_for_each({
                let span = &span;
                let checked_elements = AtomicUsize::new(0);

                move |result| {
                    let _span_guard = span.enter();

                    // Periodic progress reporting across rayon workers
                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
                    if checked_elements > 1 && checked_elements.is_multiple_of(1000) {
                        info!(
                            "Checked {}/{} cache elements",
                            checked_elements, number_of_cached_elements
                        );
                    }

                    result
                }
            })?;

        Ok(())
    }
2360}
2361
2362fn write_dummy_sector_metadata(
2363 metadata_file: &File,
2364 metadata_file_path: &Path,
2365 sector_index: SectorIndex,
2366 pieces_in_sector: u16,
2367) -> Result<(), SingleDiskFarmScrubError> {
2368 let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2369 sector_index,
2370 pieces_in_sector,
2371 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2372 history_size: HistorySize::from(SegmentIndex::ZERO),
2373 })
2374 .encode();
2375 let sector_offset = RESERVED_PLOT_METADATA
2376 + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2377 metadata_file
2378 .write_all_at(&dummy_sector_bytes, sector_offset)
2379 .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2380 file: metadata_file_path.to_path_buf(),
2381 size: dummy_sector_bytes.len() as u64,
2382 offset: sector_offset,
2383 error,
2384 })
2385}