ab_farmer/
single_disk_farm.rs

1//! Primary [`Farm`] implementation that deals with hardware directly
2//!
3//! Single disk farm is an abstraction that contains an identity, associated plot with metadata and
4//! a small piece cache. It fully manages farming and plotting process, including listening to node
5//! notifications, producing solutions and sealing blocks.
6
7mod block_sealing;
8pub mod direct_io_file_wrapper;
9pub mod farming;
10pub mod identity;
11mod metrics;
12pub mod piece_cache;
13pub mod piece_reader;
14pub mod plot_cache;
15mod plotted_sectors;
16mod plotting;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20    Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21    PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::block_sealing::block_sealing;
26use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
27use crate::single_disk_farm::farming::rayon_files::RayonFiles;
28use crate::single_disk_farm::farming::{
29    FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
30};
31use crate::single_disk_farm::identity::{Identity, IdentityError};
32use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
33use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
34use crate::single_disk_farm::piece_reader::DiskPieceReader;
35use crate::single_disk_farm::plot_cache::DiskPlotCache;
36use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
37pub use crate::single_disk_farm::plotting::PlottingError;
38use crate::single_disk_farm::plotting::{
39    PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
40};
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use ab_core_primitives::address::Address;
44use ab_core_primitives::block::BlockRoot;
45use ab_core_primitives::ed25519::Ed25519PublicKey;
46use ab_core_primitives::hashes::Blake3Hash;
47use ab_core_primitives::pieces::Record;
48use ab_core_primitives::sectors::SectorIndex;
49use ab_core_primitives::segments::{HistorySize, SegmentIndex};
50use ab_erasure_coding::ErasureCoding;
51use ab_farmer_components::FarmerProtocolInfo;
52use ab_farmer_components::file_ext::FileExt;
53use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
54use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
55use ab_farmer_rpc_primitives::{FarmerAppInfo, SolutionResponse};
56use ab_networking::KnownPeersManager;
57use ab_proof_of_space::Table;
58use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
59use async_trait::async_trait;
60use bytesize::ByteSize;
61use event_listener_primitives::{Bag, HandlerId};
62use futures::channel::{mpsc, oneshot};
63use futures::stream::FuturesUnordered;
64use futures::{FutureExt, StreamExt, select};
65use parity_scale_codec::{Decode, Encode};
66use parking_lot::Mutex;
67use prometheus_client::registry::Registry;
68use rayon::prelude::*;
69use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
70use serde::{Deserialize, Serialize};
71use std::collections::HashSet;
72use std::fs::{File, OpenOptions};
73use std::future::Future;
74use std::io::Write;
75use std::num::{NonZeroU32, NonZeroUsize};
76use std::path::{Path, PathBuf};
77use std::pin::Pin;
78use std::str::FromStr;
79use std::sync::Arc;
80use std::sync::atomic::{AtomicUsize, Ordering};
81use std::time::Duration;
82use std::{fmt, fs, io, mem};
83use thiserror::Error;
84use tokio::runtime::Handle;
85use tokio::sync::broadcast;
86use tokio::task;
87use tracing::{Instrument, Span, error, info, trace, warn};
88
89// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
90// usize depending on chain parameters
91const _: () = {
92    assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());
93};
94
95/// Reserve 1M of space for plot metadata (for potential future expansion)
96const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
97/// Reserve 1M of space for farm info (for potential future expansion)
98const RESERVED_FARM_INFO: u64 = 1024 * 1024;
99const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
100
101/// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating
102/// processes is done
103#[derive(Debug)]
104#[must_use = "Lock file must be kept around or as long as farm is used"]
105pub struct SingleDiskFarmInfoLock {
106    _file: File,
107}
108
109/// Important information about the contents of the `SingleDiskFarm`
110#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
111#[serde(rename_all = "camelCase")]
112pub enum SingleDiskFarmInfo {
113    /// V0 of the info
114    #[serde(rename_all = "camelCase")]
115    V0 {
116        /// ID of the farm
117        id: FarmId,
118        /// Genesis root of the beacon chain used for farm creation
119        genesis_root: BlockRoot,
120        /// Public key of identity used for farm creation
121        public_key: Ed25519PublicKey,
122        /// Seed used for deriving shard commitments
123        shard_commitments_seed: Blake3Hash,
124        /// How many pieces does one sector contain.
125        pieces_in_sector: u16,
126        /// How much space in bytes is allocated for this farm
127        allocated_space: u64,
128    },
129}
130
131impl SingleDiskFarmInfo {
132    const FILE_NAME: &'static str = "single_disk_farm.json";
133
134    /// Create a new instance
135    pub fn new(
136        id: FarmId,
137        genesis_root: BlockRoot,
138        public_key: Ed25519PublicKey,
139        shard_commitments_seed: Blake3Hash,
140        pieces_in_sector: u16,
141        allocated_space: u64,
142    ) -> Self {
143        Self::V0 {
144            id,
145            genesis_root,
146            public_key,
147            shard_commitments_seed,
148            pieces_in_sector,
149            allocated_space,
150        }
151    }
152
153    /// Load `SingleDiskFarm` from path is supposed to be stored, `None` means no info file was
154    /// found, happens during first start.
155    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
156        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
157            Ok(bytes) => bytes,
158            Err(error) => {
159                return if error.kind() == io::ErrorKind::NotFound {
160                    Ok(None)
161                } else {
162                    Err(error)
163                };
164            }
165        };
166
167        serde_json::from_slice(&bytes)
168            .map(Some)
169            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
170    }
171
172    /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
173    ///
174    /// Can optionally return a lock.
175    pub fn store_to(
176        &self,
177        directory: &Path,
178        lock: bool,
179    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
180        let mut file = OpenOptions::new()
181            .write(true)
182            .create(true)
183            .truncate(false)
184            .open(directory.join(Self::FILE_NAME))?;
185        if lock {
186            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
187        }
188        file.set_len(0)?;
189        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
190
191        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
192    }
193
194    /// Try to acquire exclusive lock on the single disk farm info file, ensuring no concurrent
195    /// edits by cooperating processes is done
196    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
197        let file = File::open(directory.join(Self::FILE_NAME))?;
198        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
199
200        Ok(SingleDiskFarmInfoLock { _file: file })
201    }
202
203    /// ID of the farm
204    pub fn id(&self) -> &FarmId {
205        let Self::V0 { id, .. } = self;
206        id
207    }
208
209    /// Genesis hash of the chain used for farm creation
210    pub fn genesis_root(&self) -> &BlockRoot {
211        let Self::V0 { genesis_root, .. } = self;
212        genesis_root
213    }
214
215    /// Public key of identity used for farm creation
216    pub fn public_key(&self) -> &Ed25519PublicKey {
217        let Self::V0 { public_key, .. } = self;
218        public_key
219    }
220
221    /// Seed used for deriving shard commitments
222    pub fn shard_commitments_seed(&self) -> &Blake3Hash {
223        let Self::V0 {
224            shard_commitments_seed,
225            ..
226        } = self;
227        shard_commitments_seed
228    }
229
230    /// How many pieces does one sector contain.
231    pub fn pieces_in_sector(&self) -> u16 {
232        match self {
233            SingleDiskFarmInfo::V0 {
234                pieces_in_sector, ..
235            } => *pieces_in_sector,
236        }
237    }
238
239    /// How much space in bytes is allocated for this farm
240    pub fn allocated_space(&self) -> u64 {
241        match self {
242            SingleDiskFarmInfo::V0 {
243                allocated_space, ..
244            } => *allocated_space,
245        }
246    }
247}
248
249/// Summary of single disk farm for presentational purposes
250#[derive(Debug)]
251pub enum SingleDiskFarmSummary {
252    /// Farm was found and read successfully
253    Found {
254        /// Farm info
255        info: SingleDiskFarmInfo,
256        /// Path to directory where farm is stored.
257        directory: PathBuf,
258    },
259    /// Farm was not found
260    NotFound {
261        /// Path to directory where farm is stored.
262        directory: PathBuf,
263    },
264    /// Failed to open farm
265    Error {
266        /// Path to directory where farm is stored.
267        directory: PathBuf,
268        /// Error itself
269        error: io::Error,
270    },
271}
272
273#[derive(Debug, Encode, Decode)]
274struct PlotMetadataHeader {
275    version: u8,
276    plotted_sector_count: u16,
277}
278
279impl PlotMetadataHeader {
280    #[inline]
281    fn encoded_size() -> usize {
282        let default = PlotMetadataHeader {
283            version: 0,
284            plotted_sector_count: 0,
285        };
286
287        default.encoded_size()
288    }
289}
290
291/// Options used to open single disk farm
292#[derive(Debug)]
293pub struct SingleDiskFarmOptions<'a, NC>
294where
295    NC: Clone,
296{
297    /// Path to directory where farm is stored.
298    pub directory: PathBuf,
299    /// Information necessary for farmer application
300    pub farmer_app_info: FarmerAppInfo,
301    /// The amount of space in bytes that was allocated
302    pub allocated_space: u64,
303    /// How many pieces one sector is supposed to contain (max)
304    pub max_pieces_in_sector: u16,
305    /// RPC client connected to the node
306    pub node_client: NC,
307    /// Address where farming rewards should go
308    pub reward_address: Address,
309    /// Plotter
310    pub plotter: Arc<dyn Plotter + Send + Sync>,
311    /// Erasure coding instance to use.
312    pub erasure_coding: ErasureCoding,
313    /// Percentage of allocated space dedicated for caching purposes
314    pub cache_percentage: u8,
315    /// Thread pool size used for farming (mostly for blocking I/O, but also for some
316    /// compute-intensive operations during proving)
317    pub farming_thread_pool_size: usize,
318    /// Notification for plotter to start, can be used to delay plotting until some initialization
319    /// has happened externally
320    pub plotting_delay: Option<oneshot::Receiver<()>>,
321    /// Global mutex that can restrict concurrency of resource-intensive operations and make sure
322    /// that those operations that are very sensitive (like proving) have all the resources
323    /// available to them for the highest probability of success
324    pub global_mutex: Arc<AsyncMutex<()>>,
325    /// How many sectors a will be plotted concurrently per farm
326    pub max_plotting_sectors_per_farm: NonZeroUsize,
327    /// Disable farm locking, for example if file system doesn't support it
328    pub disable_farm_locking: bool,
329    /// Prometheus registry
330    pub registry: Option<&'a Mutex<&'a mut Registry>>,
331    /// Whether to create a farm if it doesn't yet exist
332    pub create: bool,
333}
334
335/// Errors happening when trying to create/open single disk farm
336#[derive(Debug, Error)]
337pub enum SingleDiskFarmError {
338    /// Failed to open or create identity
339    #[error("Failed to open or create identity: {0}")]
340    FailedToOpenIdentity(#[from] IdentityError),
341    /// Farm is likely already in use, make sure no other farmer is using it
342    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
343    LikelyAlreadyInUse(io::Error),
344    /// I/O error occurred
345    #[error("Single disk farm I/O error: {0}")]
346    Io(#[from] io::Error),
347    /// Failed to spawn task for blocking thread
348    #[error("Failed to spawn task for blocking thread: {0}")]
349    TokioJoinError(#[from] task::JoinError),
350    /// Piece cache error
351    #[error("Piece cache error: {0}")]
352    PieceCacheError(#[from] DiskPieceCacheError),
353    /// Can't preallocate metadata file, probably not enough space on disk
354    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
355    CantPreallocateMetadataFile(io::Error),
356    /// Can't preallocate plot file, probably not enough space on disk
357    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
358    CantPreallocatePlotFile(io::Error),
359    /// Wrong chain (genesis hash)
360    #[error(
361        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
362        created, it is not possible to use farm on a different chain"
363    )]
364    WrongChain {
365        /// Farm ID
366        id: FarmId,
367        /// Hex-encoded genesis hash during farm creation
368        // TODO: Wrapper type with `Display` impl for genesis hash
369        correct_chain: String,
370        /// Hex-encoded current genesis hash
371        wrong_chain: String,
372    },
373    /// Public key in identity doesn't match metadata
374    #[error(
375        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
376        farm was created, something went wrong, likely due to manual edits"
377    )]
378    IdentityMismatch {
379        /// Farm ID
380        id: FarmId,
381        /// Public key used during farm creation
382        correct_public_key: Ed25519PublicKey,
383        /// Current public key
384        wrong_public_key: Ed25519PublicKey,
385    },
386    /// Invalid number pieces in sector
387    #[error(
388        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
389        {initialized_with}"
390    )]
391    InvalidPiecesInSector {
392        /// Farm ID
393        id: FarmId,
394        /// Max supported pieces in sector
395        max_supported: u16,
396        /// Number of pieces in sector farm is initialized with
397        initialized_with: u16,
398    },
399    /// Failed to decode metadata header
400    #[error("Failed to decode metadata header: {0}")]
401    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
402    /// Unexpected metadata version
403    #[error("Unexpected metadata version {0}")]
404    UnexpectedMetadataVersion(u8),
405    /// Allocated space is not enough for one sector
406    #[error(
407        "Allocated space is not enough for one sector. \
408        The lowest acceptable value for allocated space is {min_space} bytes, \
409        provided {allocated_space} bytes."
410    )]
411    InsufficientAllocatedSpace {
412        /// Minimal allocated space
413        min_space: u64,
414        /// Current allocated space
415        allocated_space: u64,
416    },
417    /// Farm is too large
418    #[error(
419        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
420        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
421        instead."
422    )]
423    FarmTooLarge {
424        /// Allocated space
425        allocated_space: u64,
426        /// Allocated space in sectors
427        allocated_sectors: u64,
428        /// Max supported allocated space
429        max_space: u64,
430        /// Max supported allocated space in sectors
431        max_sectors: u16,
432    },
433    /// Failed to create thread pool
434    #[error("Failed to create thread pool: {0}")]
435    FailedToCreateThreadPool(ThreadPoolBuildError),
436}
437
438/// Errors happening during scrubbing
439#[derive(Debug, Error)]
440pub enum SingleDiskFarmScrubError {
441    /// Farm is likely already in use, make sure no other farmer is using it
442    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
443    LikelyAlreadyInUse(io::Error),
444    /// Failed to determine file size
445    #[error("Failed to file size of {file}: {error}")]
446    FailedToDetermineFileSize {
447        /// Affected file
448        file: PathBuf,
449        /// Low-level error
450        error: io::Error,
451    },
452    /// Failed to read bytes from file
453    #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
454    FailedToReadBytes {
455        /// Affected file
456        file: PathBuf,
457        /// Number of bytes to read
458        size: u64,
459        /// Offset in the file
460        offset: u64,
461        /// Low-level error
462        error: io::Error,
463    },
464    /// Failed to write bytes from file
465    #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
466    FailedToWriteBytes {
467        /// Affected file
468        file: PathBuf,
469        /// Number of bytes to read
470        size: u64,
471        /// Offset in the file
472        offset: u64,
473        /// Low-level error
474        error: io::Error,
475    },
476    /// Farm info file does not exist
477    #[error("Farm info file does not exist at {file}")]
478    FarmInfoFileDoesNotExist {
479        /// Info file
480        file: PathBuf,
481    },
482    /// Farm info can't be opened
483    #[error("Farm info at {file} can't be opened: {error}")]
484    FarmInfoCantBeOpened {
485        /// Info file
486        file: PathBuf,
487        /// Low-level error
488        error: io::Error,
489    },
490    /// Identity file does not exist
491    #[error("Identity file does not exist at {file}")]
492    IdentityFileDoesNotExist {
493        /// Identity file
494        file: PathBuf,
495    },
496    /// Identity can't be opened
497    #[error("Identity at {file} can't be opened: {error}")]
498    IdentityCantBeOpened {
499        /// Identity file
500        file: PathBuf,
501        /// Low-level error
502        error: IdentityError,
503    },
504    /// Identity public key doesn't match public key in the disk farm info
505    #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
506    PublicKeyMismatch {
507        /// Identity public key
508        identity: Ed25519PublicKey,
509        /// Disk farm info public key
510        info: Ed25519PublicKey,
511    },
512    /// Metadata file does not exist
513    #[error("Metadata file does not exist at {file}")]
514    MetadataFileDoesNotExist {
515        /// Metadata file
516        file: PathBuf,
517    },
518    /// Metadata can't be opened
519    #[error("Metadata at {file} can't be opened: {error}")]
520    MetadataCantBeOpened {
521        /// Metadata file
522        file: PathBuf,
523        /// Low-level error
524        error: io::Error,
525    },
526    /// Metadata file too small
527    #[error(
528        "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
529        is {size}"
530    )]
531    MetadataFileTooSmall {
532        /// Metadata file
533        file: PathBuf,
534        /// Reserved size
535        reserved_size: u64,
536        /// File size
537        size: u64,
538    },
539    /// Failed to decode metadata header
540    #[error("Failed to decode metadata header: {0}")]
541    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
542    /// Unexpected metadata version
543    #[error("Unexpected metadata version {0}")]
544    UnexpectedMetadataVersion(u8),
545    /// Cache can't be opened
546    #[error("Cache at {file} can't be opened: {error}")]
547    CacheCantBeOpened {
548        /// Cache file
549        file: PathBuf,
550        /// Low-level error
551        error: io::Error,
552    },
553}
554
555/// Errors that happen in background tasks
556#[derive(Debug, Error)]
557pub enum BackgroundTaskError {
558    /// Plotting error
559    #[error(transparent)]
560    Plotting(#[from] PlottingError),
561    /// Farming error
562    #[error(transparent)]
563    Farming(#[from] FarmingError),
564    /// Block sealing
565    #[error(transparent)]
566    BlockSealing(#[from] anyhow::Error),
567    /// Background task panicked
568    #[error("Background task {task} panicked")]
569    BackgroundTaskPanicked {
570        /// Name of the task
571        task: String,
572    },
573}
574
575type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
576
577/// Scrub target
578#[derive(Debug, Copy, Clone)]
579pub enum ScrubTarget {
580    /// Scrub everything
581    All,
582    /// Scrub just metadata
583    Metadata,
584    /// Scrub metadata and corresponding plot
585    Plot,
586    /// Only scrub cache
587    Cache,
588}
589
590impl fmt::Display for ScrubTarget {
591    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
592        match self {
593            Self::All => f.write_str("all"),
594            Self::Metadata => f.write_str("metadata"),
595            Self::Plot => f.write_str("plot"),
596            Self::Cache => f.write_str("cache"),
597        }
598    }
599}
600
601impl FromStr for ScrubTarget {
602    type Err = String;
603
604    fn from_str(s: &str) -> Result<Self, Self::Err> {
605        match s {
606            "all" => Ok(Self::All),
607            "metadata" => Ok(Self::Metadata),
608            "plot" => Ok(Self::Plot),
609            "cache" => Ok(Self::Cache),
610            s => Err(format!("Can't parse {s} as `ScrubTarget`")),
611        }
612    }
613}
614
615impl ScrubTarget {
616    fn metadata(&self) -> bool {
617        match self {
618            Self::All | Self::Metadata | Self::Plot => true,
619            Self::Cache => false,
620        }
621    }
622
623    fn plot(&self) -> bool {
624        match self {
625            Self::All | Self::Plot => true,
626            Self::Metadata | Self::Cache => false,
627        }
628    }
629
630    fn cache(&self) -> bool {
631        match self {
632            Self::All | Self::Cache => true,
633            Self::Metadata | Self::Plot => false,
634        }
635    }
636}
637
638struct AllocatedSpaceDistribution {
639    piece_cache_file_size: u64,
640    piece_cache_capacity: u32,
641    plot_file_size: u64,
642    target_sector_count: u16,
643    metadata_file_size: u64,
644}
645
646impl AllocatedSpaceDistribution {
647    fn new(
648        allocated_space: u64,
649        sector_size: u64,
650        cache_percentage: u8,
651        sector_metadata_size: u64,
652    ) -> Result<Self, SingleDiskFarmError> {
653        let single_sector_overhead = sector_size + sector_metadata_size;
654        // Fixed space usage regardless of plot size
655        let fixed_space_usage = RESERVED_PLOT_METADATA
656            + RESERVED_FARM_INFO
657            + Identity::file_size() as u64
658            + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
659        // Calculate how many sectors can fit
660        let target_sector_count = {
661            let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
662                / 100
663                * (100 - u64::from(cache_percentage));
664            // Do the rounding to make sure we have exactly as much space as fits whole number of
665            // sectors, account for disk sector size just in case
666            (potentially_plottable_space - DISK_PAGE_SIZE as u64) / single_sector_overhead
667        };
668
669        if target_sector_count == 0 {
670            let mut single_plot_with_cache_space =
671                single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
672            // Cache must not be empty, ensure it contains at least one element even if
673            // percentage-wise it will use more space
674            if single_plot_with_cache_space - single_sector_overhead
675                < DiskPieceCache::element_size() as u64
676            {
677                single_plot_with_cache_space =
678                    single_sector_overhead + DiskPieceCache::element_size() as u64;
679            }
680
681            return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
682                min_space: fixed_space_usage + single_plot_with_cache_space,
683                allocated_space,
684            });
685        }
686        let plot_file_size = target_sector_count * sector_size;
687        // Align plot file size for disk sector size
688        let plot_file_size = plot_file_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
689
690        // Remaining space will be used for caching purposes
691        let piece_cache_capacity = if cache_percentage > 0 {
692            let cache_space = allocated_space
693                - fixed_space_usage
694                - plot_file_size
695                - (sector_metadata_size * target_sector_count);
696            (cache_space / u64::from(DiskPieceCache::element_size())) as u32
697        } else {
698            0
699        };
700        let target_sector_count = match u16::try_from(target_sector_count).map(SectorIndex::from) {
701            Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
702                u16::from(target_sector_count)
703            }
704            _ => {
705                // We use `u16` for both count and index, hence index must not reach actual `MAX`
706                // (consensus doesn't care about this, just farmer implementation detail)
707                let max_sectors = u16::from(SectorIndex::MAX) - 1;
708                return Err(SingleDiskFarmError::FarmTooLarge {
709                    allocated_space: target_sector_count * sector_size,
710                    allocated_sectors: target_sector_count,
711                    max_space: max_sectors as u64 * sector_size,
712                    max_sectors,
713                });
714            }
715        };
716
717        Ok(Self {
718            piece_cache_file_size: u64::from(piece_cache_capacity)
719                * u64::from(DiskPieceCache::element_size()),
720            piece_cache_capacity,
721            plot_file_size,
722            target_sector_count,
723            metadata_file_size: RESERVED_PLOT_METADATA
724                + sector_metadata_size * u64::from(target_sector_count),
725        })
726    }
727}
728
729type Handler<A> = Bag<HandlerFn<A>, A>;
730
731#[derive(Default, Debug)]
732struct Handlers {
733    sector_update: Handler<(SectorIndex, SectorUpdate)>,
734    farming_notification: Handler<FarmingNotification>,
735    solution: Handler<SolutionResponse>,
736}
737
738struct SingleDiskFarmInit {
739    identity: Identity,
740    single_disk_farm_info: SingleDiskFarmInfo,
741    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
742    plot_file: Arc<DirectIoFileWrapper>,
743    metadata_file: DirectIoFileWrapper,
744    metadata_header: PlotMetadataHeader,
745    target_sector_count: u16,
746    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
747    piece_cache_capacity: u32,
748    plot_cache: DiskPlotCache,
749}
750
751/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
752/// disk.
753///
754/// Farm starts operating during creation and doesn't stop until dropped (or error happens).
755#[derive(Debug)]
756#[must_use = "Plot does not function properly unless run() method is called"]
757pub struct SingleDiskFarm {
758    farmer_protocol_info: FarmerProtocolInfo,
759    single_disk_farm_info: SingleDiskFarmInfo,
760    shard_commitments_roots_cache: ShardCommitmentsRootsCache,
761    /// Metadata of all sectors plotted so far
762    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
763    pieces_in_sector: u16,
764    total_sectors_count: u16,
765    span: Span,
766    tasks: FuturesUnordered<BackgroundTask>,
767    handlers: Arc<Handlers>,
768    piece_cache: SingleDiskPieceCache,
769    plot_cache: DiskPlotCache,
770    piece_reader: DiskPieceReader,
771    /// Sender that will be used to signal to background threads that they should start
772    start_sender: Option<broadcast::Sender<()>>,
773    /// Sender that will be used to signal to background threads that they must stop
774    stop_sender: Option<broadcast::Sender<()>>,
775    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
776}
777
778impl Drop for SingleDiskFarm {
779    #[inline]
780    fn drop(&mut self) {
781        self.piece_reader.close_all_readers();
782        // Make background threads that are waiting to do something exit immediately
783        self.start_sender.take();
784        // Notify background tasks that they must stop
785        self.stop_sender.take();
786    }
787}
788
789#[async_trait(?Send)]
790impl Farm for SingleDiskFarm {
791    fn id(&self) -> &FarmId {
792        self.id()
793    }
794
795    fn total_sectors_count(&self) -> u16 {
796        self.total_sectors_count
797    }
798
799    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
800        Arc::new(self.plotted_sectors())
801    }
802
803    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
804        Arc::new(self.piece_reader())
805    }
806
807    fn on_sector_update(
808        &self,
809        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
810    ) -> Box<dyn farm::HandlerId> {
811        Box::new(self.on_sector_update(callback))
812    }
813
814    fn on_farming_notification(
815        &self,
816        callback: HandlerFn<FarmingNotification>,
817    ) -> Box<dyn farm::HandlerId> {
818        Box::new(self.on_farming_notification(callback))
819    }
820
821    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
822        Box::new(self.on_solution(callback))
823    }
824
825    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
826        Box::pin((*self).run())
827    }
828}
829
830impl SingleDiskFarm {
831    /// Name of the plot file
832    pub const PLOT_FILE: &'static str = "plot.bin";
833    /// Name of the metadata file
834    pub const METADATA_FILE: &'static str = "metadata.bin";
835    const SUPPORTED_PLOT_VERSION: u8 = 0;
836
837    /// Create new single disk farm instance
838    pub async fn new<NC, PosTable>(
839        options: SingleDiskFarmOptions<'_, NC>,
840        farm_index: usize,
841    ) -> Result<Self, SingleDiskFarmError>
842    where
843        NC: NodeClient + Clone,
844        PosTable: Table,
845    {
846        let span = Span::current();
847
848        let SingleDiskFarmOptions {
849            directory,
850            farmer_app_info,
851            allocated_space,
852            max_pieces_in_sector,
853            node_client,
854            reward_address,
855            plotter,
856            erasure_coding,
857            cache_percentage,
858            farming_thread_pool_size,
859            plotting_delay,
860            global_mutex,
861            max_plotting_sectors_per_farm,
862            disable_farm_locking,
863            registry,
864            create,
865        } = options;
866
867        let single_disk_farm_init_fut = task::spawn_blocking({
868            let directory = directory.clone();
869            let farmer_app_info = farmer_app_info.clone();
870            let span = span.clone();
871
872            move || {
873                let _span_guard = span.enter();
874                Self::init(
875                    &directory,
876                    &farmer_app_info,
877                    allocated_space,
878                    max_pieces_in_sector,
879                    cache_percentage,
880                    disable_farm_locking,
881                    create,
882                )
883            }
884        });
885
886        let single_disk_farm_init =
887            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;
888
889        let SingleDiskFarmInit {
890            identity,
891            single_disk_farm_info,
892            single_disk_farm_info_lock,
893            plot_file,
894            metadata_file,
895            metadata_header,
896            target_sector_count,
897            sectors_metadata,
898            piece_cache_capacity,
899            plot_cache,
900        } = single_disk_farm_init;
901
902        let piece_cache = {
903            // Convert farm ID into cache ID for single disk farm
904            let FarmId::Ulid(id) = *single_disk_farm_info.id();
905            let id = PieceCacheId::Ulid(id);
906
907            SingleDiskPieceCache::new(
908                id,
909                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
910                    Some(task::block_in_place(|| {
911                        if let Some(registry) = registry {
912                            DiskPieceCache::open(
913                                &directory,
914                                piece_cache_capacity,
915                                Some(id),
916                                Some(*registry.lock()),
917                            )
918                        } else {
919                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
920                        }
921                    })?)
922                } else {
923                    None
924                },
925            )
926        };
927
928        let public_key = *single_disk_farm_info.public_key();
929        let shard_commitments_roots_cache =
930            ShardCommitmentsRootsCache::new(*single_disk_farm_info.shard_commitments_seed());
931        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
932        let sector_size = sector_size(pieces_in_sector);
933
934        let metrics = registry.map(|registry| {
935            Arc::new(SingleDiskFarmMetrics::new(
936                *registry.lock(),
937                single_disk_farm_info.id(),
938                target_sector_count,
939                sectors_metadata.read_blocking().len() as u16,
940            ))
941        });
942
943        let (error_sender, error_receiver) = oneshot::channel();
944        let error_sender = Arc::new(Mutex::new(Some(error_sender)));
945
946        let tasks = FuturesUnordered::<BackgroundTask>::new();
947
948        tasks.push(Box::pin(async move {
949            if let Ok(error) = error_receiver.await {
950                return Err(error);
951            }
952
953            Ok(())
954        }));
955
956        let handlers = Arc::<Handlers>::default();
957        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
958        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
959        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
960        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
961        // Some sectors may already be plotted, skip them
962        let sectors_indices_left_to_plot = SectorIndex::new(metadata_header.plotted_sector_count)
963            ..SectorIndex::new(target_sector_count);
964
965        let farming_thread_pool = ThreadPoolBuilder::new()
966            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
967            .num_threads(farming_thread_pool_size)
968            .spawn_handler(tokio_rayon_spawn_handler())
969            .build()
970            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
971        let farming_plot_fut = task::spawn_blocking(|| {
972            farming_thread_pool
973                .install(move || {
974                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
975                        DirectIoFileWrapper::open(path)
976                    })
977                })
978                .map(|farming_plot| (farming_plot, farming_thread_pool))
979        });
980
981        let (farming_plot, farming_thread_pool) =
982            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;
983
984        let plotting_join_handle = task::spawn_blocking({
985            let shard_commitments_roots_cache = shard_commitments_roots_cache.clone();
986            let sectors_metadata = Arc::clone(&sectors_metadata);
987            let handlers = Arc::clone(&handlers);
988            let sectors_being_modified = Arc::clone(&sectors_being_modified);
989            let node_client = node_client.clone();
990            let plot_file = Arc::clone(&plot_file);
991            let error_sender = Arc::clone(&error_sender);
992            let span = span.clone();
993            let global_mutex = Arc::clone(&global_mutex);
994            let metrics = metrics.clone();
995
996            move || {
997                let _span_guard = span.enter();
998
999                let plotting_options = PlottingOptions {
1000                    metadata_header,
1001                    sectors_metadata: &sectors_metadata,
1002                    sectors_being_modified: &sectors_being_modified,
1003                    sectors_to_plot_receiver,
1004                    sector_plotting_options: SectorPlottingOptions {
1005                        public_key,
1006                        shard_commitments_roots_cache,
1007                        node_client: &node_client,
1008                        pieces_in_sector,
1009                        sector_size,
1010                        plot_file,
1011                        metadata_file: Arc::new(metadata_file),
1012                        handlers: &handlers,
1013                        global_mutex: &global_mutex,
1014                        plotter,
1015                        metrics,
1016                    },
1017                    max_plotting_sectors_per_farm,
1018                };
1019
1020                let plotting_fut = async {
1021                    if start_receiver.recv().await.is_err() {
1022                        // Dropped before starting
1023                        return Ok(());
1024                    }
1025
1026                    if let Some(plotting_delay) = plotting_delay
1027                        && plotting_delay.await.is_err()
1028                    {
1029                        // Dropped before resolving
1030                        return Ok(());
1031                    }
1032
1033                    plotting(plotting_options).await
1034                };
1035
1036                Handle::current().block_on(async {
1037                    select! {
1038                        plotting_result = plotting_fut.fuse() => {
1039                            if let Err(error) = plotting_result
1040                                && let Some(error_sender) = error_sender.lock().take()
1041                                && let Err(error) = error_sender.send(error.into())
1042                            {
1043                                error!(
1044                                    %error,
1045                                    "Plotting failed to send error to background task"
1046                                );
1047                            }
1048                        }
1049                        _ = stop_receiver.recv().fuse() => {
1050                            // Nothing, just exit
1051                        }
1052                    }
1053                });
1054            }
1055        });
1056        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);
1057
1058        tasks.push(Box::pin(async move {
1059            // Panic will already be printed by now
1060            plotting_join_handle.await.map_err(|_error| {
1061                BackgroundTaskError::BackgroundTaskPanicked {
1062                    task: format!("plotting-{farm_index}"),
1063                }
1064            })
1065        }));
1066
1067        let public_key_hash = public_key.hash();
1068
1069        let plotting_scheduler_options = PlottingSchedulerOptions {
1070            public_key_hash,
1071            shard_commitments_roots_cache: shard_commitments_roots_cache.clone(),
1072            sectors_indices_left_to_plot,
1073            target_sector_count,
1074            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
1075            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
1076            node_client: node_client.clone(),
1077            handlers: Arc::clone(&handlers),
1078            sectors_metadata: Arc::clone(&sectors_metadata),
1079            sectors_to_plot_sender,
1080            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
1081            metrics: metrics.clone(),
1082        };
1083        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));
1084
1085        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);
1086
1087        tasks.push(Box::pin({
1088            let node_client = node_client.clone();
1089            let metrics = metrics.clone();
1090
1091            async move {
1092                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
1093                    .await
1094                    .map_err(BackgroundTaskError::Farming)
1095            }
1096        }));
1097
1098        let farming_join_handle = task::spawn_blocking({
1099            let shard_commitments_roots_cache = shard_commitments_roots_cache.clone();
1100            let erasure_coding = erasure_coding.clone();
1101            let handlers = Arc::clone(&handlers);
1102            let sectors_being_modified = Arc::clone(&sectors_being_modified);
1103            let sectors_metadata = Arc::clone(&sectors_metadata);
1104            let mut start_receiver = start_sender.subscribe();
1105            let mut stop_receiver = stop_sender.subscribe();
1106            let node_client = node_client.clone();
1107            let span = span.clone();
1108            let global_mutex = Arc::clone(&global_mutex);
1109
1110            move || {
1111                let _span_guard = span.enter();
1112
1113                let farming_fut = async move {
1114                    if start_receiver.recv().await.is_err() {
1115                        // Dropped before starting
1116                        return Ok(());
1117                    }
1118
1119                    let plot_audit = PlotAudit::new(&farming_plot);
1120
1121                    let farming_options = FarmingOptions {
1122                        public_key_hash,
1123                        shard_commitments_roots_cache,
1124                        reward_address,
1125                        node_client,
1126                        plot_audit,
1127                        sectors_metadata,
1128                        erasure_coding,
1129                        handlers,
1130                        sectors_being_modified,
1131                        slot_info_notifications: slot_info_forwarder_receiver,
1132                        thread_pool: farming_thread_pool,
1133                        global_mutex,
1134                        metrics,
1135                    };
1136                    farming::<PosTable, _, _>(farming_options).await
1137                };
1138
1139                Handle::current().block_on(async {
1140                    select! {
1141                        farming_result = farming_fut.fuse() => {
1142                            if let Err(error) = farming_result
1143                                && let Some(error_sender) = error_sender.lock().take()
1144                                && let Err(error) = error_sender.send(error.into())
1145                            {
1146                                error!(
1147                                    %error,
1148                                    "Farming failed to send error to background task",
1149                                );
1150                            }
1151                        }
1152                        _ = stop_receiver.recv().fuse() => {
1153                            // Nothing, just exit
1154                        }
1155                    }
1156                });
1157            }
1158        });
1159        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);
1160
1161        tasks.push(Box::pin(async move {
1162            // Panic will already be printed by now
1163            farming_join_handle.await.map_err(|_error| {
1164                BackgroundTaskError::BackgroundTaskPanicked {
1165                    task: format!("farming-{farm_index}"),
1166                }
1167            })
1168        }));
1169
1170        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
1171            public_key_hash,
1172            shard_commitments_roots_cache.clone(),
1173            pieces_in_sector,
1174            plot_file,
1175            Arc::clone(&sectors_metadata),
1176            erasure_coding,
1177            sectors_being_modified,
1178            global_mutex,
1179        );
1180
1181        let reading_join_handle = task::spawn_blocking({
1182            let mut stop_receiver = stop_sender.subscribe();
1183            let reading_fut = reading_fut.instrument(span.clone());
1184
1185            move || {
1186                Handle::current().block_on(async {
1187                    select! {
1188                        _ = reading_fut.fuse() => {
1189                            // Nothing, just exit
1190                        }
1191                        _ = stop_receiver.recv().fuse() => {
1192                            // Nothing, just exit
1193                        }
1194                    }
1195                });
1196            }
1197        });
1198
1199        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);
1200
1201        tasks.push(Box::pin(async move {
1202            // Panic will already be printed by now
1203            reading_join_handle.await.map_err(|_error| {
1204                BackgroundTaskError::BackgroundTaskPanicked {
1205                    task: format!("reading-{farm_index}"),
1206                }
1207            })
1208        }));
1209
1210        tasks.push(Box::pin(async move {
1211            match block_sealing(node_client, identity).await {
1212                Ok(block_sealing_fut) => {
1213                    block_sealing_fut.await;
1214                }
1215                Err(error) => {
1216                    return Err(BackgroundTaskError::BlockSealing(anyhow::anyhow!(
1217                        "Failed to subscribe to block sealing notifications: {error}"
1218                    )));
1219                }
1220            }
1221
1222            Ok(())
1223        }));
1224
1225        let farm = Self {
1226            farmer_protocol_info: farmer_app_info.protocol_info,
1227            single_disk_farm_info,
1228            shard_commitments_roots_cache,
1229            sectors_metadata,
1230            pieces_in_sector,
1231            total_sectors_count: target_sector_count,
1232            span,
1233            tasks,
1234            handlers,
1235            piece_cache,
1236            plot_cache,
1237            piece_reader,
1238            start_sender: Some(start_sender),
1239            stop_sender: Some(stop_sender),
1240            _single_disk_farm_info_lock: single_disk_farm_info_lock,
1241        };
1242        Ok(farm)
1243    }
1244
1245    fn init(
1246        directory: &PathBuf,
1247        farmer_app_info: &FarmerAppInfo,
1248        allocated_space: u64,
1249        max_pieces_in_sector: u16,
1250        cache_percentage: u8,
1251        disable_farm_locking: bool,
1252        create: bool,
1253    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1254        fs::create_dir_all(directory)?;
1255
1256        let identity = if create {
1257            Identity::open_or_create(directory)?
1258        } else {
1259            Identity::open(directory)?.ok_or_else(|| {
1260                IdentityError::Io(io::Error::new(
1261                    io::ErrorKind::NotFound,
1262                    "Farm does not exist and creation was explicitly disabled",
1263                ))
1264            })?
1265        };
1266        let public_key = identity.public_key();
1267
1268        let (single_disk_farm_info, single_disk_farm_info_lock) =
1269            match SingleDiskFarmInfo::load_from(directory)? {
1270                Some(mut single_disk_farm_info) => {
1271                    if &farmer_app_info.genesis_root != single_disk_farm_info.genesis_root() {
1272                        return Err(SingleDiskFarmError::WrongChain {
1273                            id: *single_disk_farm_info.id(),
1274                            correct_chain: hex::encode(single_disk_farm_info.genesis_root()),
1275                            wrong_chain: hex::encode(farmer_app_info.genesis_root),
1276                        });
1277                    }
1278
1279                    if &public_key != single_disk_farm_info.public_key() {
1280                        return Err(SingleDiskFarmError::IdentityMismatch {
1281                            id: *single_disk_farm_info.id(),
1282                            correct_public_key: *single_disk_farm_info.public_key(),
1283                            wrong_public_key: public_key,
1284                        });
1285                    }
1286
1287                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1288
1289                    if max_pieces_in_sector < pieces_in_sector {
1290                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
1291                            id: *single_disk_farm_info.id(),
1292                            max_supported: max_pieces_in_sector,
1293                            initialized_with: pieces_in_sector,
1294                        });
1295                    }
1296
1297                    if max_pieces_in_sector > pieces_in_sector {
1298                        info!(
1299                            pieces_in_sector,
1300                            max_pieces_in_sector,
1301                            "Farm initialized with smaller number of pieces in sector, farm needs \
1302                            to be re-created for increase"
1303                        );
1304                    }
1305
1306                    let mut single_disk_farm_info_lock = None;
1307
1308                    if allocated_space != single_disk_farm_info.allocated_space() {
1309                        info!(
1310                            old_space = %ByteSize::b(single_disk_farm_info.allocated_space()).display().iec(),
1311                            new_space = %ByteSize::b(allocated_space).display().iec(),
1312                            "Farm size has changed"
1313                        );
1314
1315                        let new_allocated_space = allocated_space;
1316                        match &mut single_disk_farm_info {
1317                            SingleDiskFarmInfo::V0 {
1318                                allocated_space, ..
1319                            } => {
1320                                *allocated_space = new_allocated_space;
1321                            }
1322                        }
1323
1324                        single_disk_farm_info_lock =
1325                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1326                    } else if !disable_farm_locking {
1327                        single_disk_farm_info_lock = Some(
1328                            SingleDiskFarmInfo::try_lock(directory)
1329                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1330                        );
1331                    }
1332
1333                    (single_disk_farm_info, single_disk_farm_info_lock)
1334                }
1335                None => {
1336                    let single_disk_farm_info = SingleDiskFarmInfo::new(
1337                        FarmId::new(),
1338                        farmer_app_info.genesis_root,
1339                        public_key,
1340                        identity.shard_commitments_seed(),
1341                        max_pieces_in_sector,
1342                        allocated_space,
1343                    );
1344
1345                    let single_disk_farm_info_lock =
1346                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1347
1348                    (single_disk_farm_info, single_disk_farm_info_lock)
1349                }
1350            };
1351
1352        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1353        let sector_size = sector_size(pieces_in_sector) as u64;
1354        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1355        let allocated_space_distribution = AllocatedSpaceDistribution::new(
1356            allocated_space,
1357            sector_size,
1358            cache_percentage,
1359            sector_metadata_size as u64,
1360        )?;
1361        let target_sector_count = allocated_space_distribution.target_sector_count;
1362
1363        let metadata_file_path = directory.join(Self::METADATA_FILE);
1364        let metadata_file = DirectIoFileWrapper::open(&metadata_file_path)?;
1365
1366        let metadata_size = metadata_file.size()?;
1367        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1368        // Align plot file size for disk sector size
1369        let expected_metadata_size =
1370            expected_metadata_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
1371        let metadata_header = if metadata_size == 0 {
1372            let metadata_header = PlotMetadataHeader {
1373                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1374                plotted_sector_count: 0,
1375            };
1376
1377            metadata_file
1378                .preallocate(expected_metadata_size)
1379                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1380            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1381
1382            metadata_header
1383        } else {
1384            if metadata_size != expected_metadata_size {
1385                // Allocating the whole file (`set_len` below can create a sparse file, which will
1386                // cause writes to fail later)
1387                metadata_file
1388                    .preallocate(expected_metadata_size)
1389                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1390                // Truncating file (if necessary)
1391                metadata_file.set_len(expected_metadata_size)?;
1392            }
1393
1394            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1395            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1396
1397            let mut metadata_header =
1398                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1399                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1400
1401            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1402                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1403                    metadata_header.version,
1404                ));
1405            }
1406
1407            if metadata_header.plotted_sector_count > target_sector_count {
1408                metadata_header.plotted_sector_count = target_sector_count;
1409                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1410            }
1411
1412            metadata_header
1413        };
1414
1415        let sectors_metadata = {
1416            let mut sectors_metadata =
1417                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1418
1419            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1420            for sector_index in
1421                SectorIndex::ZERO..SectorIndex::new(metadata_header.plotted_sector_count)
1422            {
1423                let sector_offset =
1424                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1425                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1426
1427                let sector_metadata =
1428                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1429                        Ok(sector_metadata) => sector_metadata,
1430                        Err(error) => {
1431                            warn!(
1432                                path = %metadata_file_path.display(),
1433                                %error,
1434                                %sector_index,
1435                                "Failed to decode sector metadata, replacing with dummy expired \
1436                                sector metadata"
1437                            );
1438
1439                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1440                                sector_index,
1441                                pieces_in_sector,
1442                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1443                                history_size: HistorySize::from(SegmentIndex::ZERO),
1444                            });
1445                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1446
1447                            dummy_sector
1448                        }
1449                    };
1450                sectors_metadata.push(sector_metadata);
1451            }
1452
1453            Arc::new(AsyncRwLock::new(sectors_metadata))
1454        };
1455
1456        let plot_file = DirectIoFileWrapper::open(directory.join(Self::PLOT_FILE))?;
1457
1458        if plot_file.size()? != allocated_space_distribution.plot_file_size {
1459            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
1460            // writes to fail later)
1461            plot_file
1462                .preallocate(allocated_space_distribution.plot_file_size)
1463                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1464            // Truncating file (if necessary)
1465            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1466        }
1467
1468        let plot_file = Arc::new(plot_file);
1469
1470        let plot_cache = DiskPlotCache::new(
1471            &plot_file,
1472            &sectors_metadata,
1473            target_sector_count,
1474            sector_size,
1475        );
1476
1477        Ok(SingleDiskFarmInit {
1478            identity,
1479            single_disk_farm_info,
1480            single_disk_farm_info_lock,
1481            plot_file,
1482            metadata_file,
1483            metadata_header,
1484            target_sector_count,
1485            sectors_metadata,
1486            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1487            plot_cache,
1488        })
1489    }
1490
1491    /// Collect summary of single disk farm for presentational purposes
1492    pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1493        let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1494            Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1495            Ok(None) => {
1496                return SingleDiskFarmSummary::NotFound { directory };
1497            }
1498            Err(error) => {
1499                return SingleDiskFarmSummary::Error { directory, error };
1500            }
1501        };
1502
1503        SingleDiskFarmSummary::Found {
1504            info: single_disk_farm_info,
1505            directory,
1506        }
1507    }
1508
1509    /// Effective on-disk allocation of the files related to the farm (takes some buffer space
1510    /// into consideration).
1511    ///
1512    /// This is a helpful number in case some files were not allocated properly or were removed and
1513    /// do not correspond to allocated space in the farm info accurately.
1514    pub fn effective_disk_usage(
1515        directory: &Path,
1516        cache_percentage: u8,
1517    ) -> Result<u64, SingleDiskFarmError> {
1518        let mut effective_disk_usage;
1519        match SingleDiskFarmInfo::load_from(directory)? {
1520            Some(single_disk_farm_info) => {
1521                let allocated_space_distribution = AllocatedSpaceDistribution::new(
1522                    single_disk_farm_info.allocated_space(),
1523                    sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1524                    cache_percentage,
1525                    SectorMetadataChecksummed::encoded_size() as u64,
1526                )?;
1527
1528                effective_disk_usage = single_disk_farm_info.allocated_space();
1529                effective_disk_usage -= Identity::file_size() as u64;
1530                effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1531                effective_disk_usage -= allocated_space_distribution.plot_file_size;
1532                effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1533            }
1534            None => {
1535                // No farm info, try to collect actual file sizes is any
1536                effective_disk_usage = 0;
1537            }
1538        };
1539
1540        if Identity::open(directory)?.is_some() {
1541            effective_disk_usage += Identity::file_size() as u64;
1542        }
1543
1544        match OpenOptions::new()
1545            .read(true)
1546            .open(directory.join(Self::METADATA_FILE))
1547        {
1548            Ok(metadata_file) => {
1549                effective_disk_usage += metadata_file.size()?;
1550            }
1551            Err(error) => {
1552                if error.kind() == io::ErrorKind::NotFound {
1553                    // File is not stored on disk
1554                } else {
1555                    return Err(error.into());
1556                }
1557            }
1558        };
1559
1560        match OpenOptions::new()
1561            .read(true)
1562            .open(directory.join(Self::PLOT_FILE))
1563        {
1564            Ok(plot_file) => {
1565                effective_disk_usage += plot_file.size()?;
1566            }
1567            Err(error) => {
1568                if error.kind() == io::ErrorKind::NotFound {
1569                    // File is not stored on disk
1570                } else {
1571                    return Err(error.into());
1572                }
1573            }
1574        };
1575
1576        match OpenOptions::new()
1577            .read(true)
1578            .open(directory.join(DiskPieceCache::FILE_NAME))
1579        {
1580            Ok(piece_cache) => {
1581                effective_disk_usage += piece_cache.size()?;
1582            }
1583            Err(error) => {
1584                if error.kind() == io::ErrorKind::NotFound {
1585                    // File is not stored on disk
1586                } else {
1587                    return Err(error.into());
1588                }
1589            }
1590        };
1591
1592        Ok(effective_disk_usage)
1593    }
1594
1595    /// Read all sectors metadata
1596    pub fn read_all_sectors_metadata(
1597        directory: &Path,
1598    ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1599        let metadata_file = DirectIoFileWrapper::open(directory.join(Self::METADATA_FILE))?;
1600
1601        let metadata_size = metadata_file.size()?;
1602        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1603
1604        let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1605        metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1606
1607        let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1608            .map_err(|error| {
1609                io::Error::other(format!("Failed to decode metadata header: {error}"))
1610            })?;
1611
1612        if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1613            return Err(io::Error::other(format!(
1614                "Unsupported metadata version {}",
1615                metadata_header.version
1616            )));
1617        }
1618
1619        let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1620            ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1621        );
1622
1623        let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1624        for sector_index in 0..metadata_header.plotted_sector_count {
1625            metadata_file.read_exact_at(
1626                &mut sector_metadata_bytes,
1627                RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1628            )?;
1629            sectors_metadata.push(
1630                SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1631                    |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1632                )?,
1633            );
1634        }
1635
1636        Ok(sectors_metadata)
1637    }
1638
1639    /// ID of this farm
1640    pub fn id(&self) -> &FarmId {
1641        self.single_disk_farm_info.id()
1642    }
1643
1644    /// Info of this farm
1645    pub fn info(&self) -> &SingleDiskFarmInfo {
1646        &self.single_disk_farm_info
1647    }
1648
1649    /// Number of sectors in this farm
1650    pub fn total_sectors_count(&self) -> u16 {
1651        self.total_sectors_count
1652    }
1653
1654    /// Read information about sectors plotted so far
1655    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1656        SingleDiskPlottedSectors {
1657            public_key_hash: self.single_disk_farm_info.public_key().hash(),
1658            shard_commitments_roots_cache: self.shard_commitments_roots_cache.clone(),
1659            pieces_in_sector: self.pieces_in_sector,
1660            farmer_protocol_info: self.farmer_protocol_info,
1661            sectors_metadata: Arc::clone(&self.sectors_metadata),
1662        }
1663    }
1664
1665    /// Get piece cache instance
1666    pub fn piece_cache(&self) -> SingleDiskPieceCache {
1667        self.piece_cache.clone()
1668    }
1669
1670    /// Get plot cache instance
1671    pub fn plot_cache(&self) -> DiskPlotCache {
1672        self.plot_cache.clone()
1673    }
1674
1675    /// Get piece reader to read plotted pieces later
1676    pub fn piece_reader(&self) -> DiskPieceReader {
1677        self.piece_reader.clone()
1678    }
1679
1680    /// Subscribe to sector updates
1681    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
1682        self.handlers.sector_update.add(callback)
1683    }
1684
1685    /// Subscribe to farming notifications
1686    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
1687        self.handlers.farming_notification.add(callback)
1688    }
1689
1690    /// Subscribe to new solution notification
1691    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
1692        self.handlers.solution.add(callback)
1693    }
1694
1695    /// Run and wait for background threads to exit or return an error
1696    pub async fn run(mut self) -> anyhow::Result<()> {
1697        if let Some(start_sender) = self.start_sender.take() {
1698            // Do not care if anyone is listening on the other side
1699            let _ = start_sender.send(());
1700        }
1701
1702        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1703            result?;
1704        }
1705
1706        Ok(())
1707    }
1708
1709    /// Wipe everything that belongs to this single disk farm
1710    pub fn wipe(directory: &Path) -> io::Result<()> {
1711        let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1712        match SingleDiskFarmInfo::load_from(directory) {
1713            Ok(Some(single_disk_farm_info)) => {
1714                info!("Found single disk farm {}", single_disk_farm_info.id());
1715            }
1716            Ok(None) => {
1717                return Err(io::Error::new(
1718                    io::ErrorKind::NotFound,
1719                    format!(
1720                        "Single disk farm info not found at {}",
1721                        single_disk_info_info_path.display()
1722                    ),
1723                ));
1724            }
1725            Err(error) => {
1726                warn!("Found unknown single disk farm: {}", error);
1727            }
1728        }
1729
1730        {
1731            let plot = directory.join(Self::PLOT_FILE);
1732            if plot.exists() {
1733                info!("Deleting plot file at {}", plot.display());
1734                fs::remove_file(plot)?;
1735            }
1736        }
1737        {
1738            let metadata = directory.join(Self::METADATA_FILE);
1739            if metadata.exists() {
1740                info!("Deleting metadata file at {}", metadata.display());
1741                fs::remove_file(metadata)?;
1742            }
1743        }
1744        // TODO: Identity should be able to wipe itself instead of assuming a specific file name
1745        //  here
1746        {
1747            let identity = directory.join("identity.bin");
1748            if identity.exists() {
1749                info!("Deleting identity file at {}", identity.display());
1750                fs::remove_file(identity)?;
1751            }
1752        }
1753
1754        DiskPieceCache::wipe(directory)?;
1755
1756        info!(
1757            "Deleting info file at {}",
1758            single_disk_info_info_path.display()
1759        );
1760        fs::remove_file(single_disk_info_info_path)
1761    }
1762
1763    /// Check the farm for corruption and repair errors (caused by disk errors or something else),
1764    /// returns an error when irrecoverable errors occur.
1765    pub fn scrub(
1766        directory: &Path,
1767        disable_farm_locking: bool,
1768        target: ScrubTarget,
1769        dry_run: bool,
1770    ) -> Result<(), SingleDiskFarmScrubError> {
1771        let span = Span::current();
1772
1773        if dry_run {
1774            info!("Dry run is used, no changes will be written to disk");
1775        }
1776
1777        if target.metadata() || target.plot() {
1778            let info = {
1779                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1780                info!(path = %file.display(), "Checking info file");
1781
1782                match SingleDiskFarmInfo::load_from(directory) {
1783                    Ok(Some(info)) => info,
1784                    Ok(None) => {
1785                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1786                    }
1787                    Err(error) => {
1788                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1789                    }
1790                }
1791            };
1792
1793            let _single_disk_farm_info_lock = if disable_farm_locking {
1794                None
1795            } else {
1796                Some(
1797                    SingleDiskFarmInfo::try_lock(directory)
1798                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1799                )
1800            };
1801
1802            let identity = {
1803                let file = directory.join(Identity::FILE_NAME);
1804                info!(path = %file.display(), "Checking identity file");
1805
1806                match Identity::open(directory) {
1807                    Ok(Some(identity)) => identity,
1808                    Ok(None) => {
1809                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1810                    }
1811                    Err(error) => {
1812                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1813                    }
1814                }
1815            };
1816
1817            if &identity.public_key() != info.public_key() {
1818                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1819                    identity: identity.public_key(),
1820                    info: *info.public_key(),
1821                });
1822            }
1823
1824            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1825
1826            let metadata_file_path = directory.join(Self::METADATA_FILE);
1827            let (metadata_file, mut metadata_header) = {
1828                info!(path = %metadata_file_path.display(), "Checking metadata file");
1829
1830                let metadata_file = match OpenOptions::new()
1831                    .read(true)
1832                    .write(!dry_run)
1833                    .open(&metadata_file_path)
1834                {
1835                    Ok(metadata_file) => metadata_file,
1836                    Err(error) => {
1837                        return Err(if error.kind() == io::ErrorKind::NotFound {
1838                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1839                                file: metadata_file_path,
1840                            }
1841                        } else {
1842                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1843                                file: metadata_file_path,
1844                                error,
1845                            }
1846                        });
1847                    }
1848                };
1849
1850                // Error doesn't matter here
1851                let _ = metadata_file.advise_sequential_access();
1852
1853                let metadata_size = match metadata_file.size() {
1854                    Ok(metadata_size) => metadata_size,
1855                    Err(error) => {
1856                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1857                            file: metadata_file_path,
1858                            error,
1859                        });
1860                    }
1861                };
1862
1863                if metadata_size < RESERVED_PLOT_METADATA {
1864                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1865                        file: metadata_file_path,
1866                        reserved_size: RESERVED_PLOT_METADATA,
1867                        size: metadata_size,
1868                    });
1869                }
1870
1871                let mut metadata_header = {
1872                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1873
1874                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1875                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1876                            file: metadata_file_path,
1877                            size: RESERVED_PLOT_METADATA,
1878                            offset: 0,
1879                            error,
1880                        });
1881                    }
1882
1883                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1884                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1885                };
1886
1887                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1888                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1889                        metadata_header.version,
1890                    ));
1891                }
1892
1893                let plotted_sector_count = metadata_header.plotted_sector_count;
1894
1895                let expected_metadata_size = RESERVED_PLOT_METADATA
1896                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1897
1898                if metadata_size < expected_metadata_size {
1899                    warn!(
1900                        %metadata_size,
1901                        %expected_metadata_size,
1902                        "Metadata file size is smaller than expected, shrinking number of plotted \
1903                        sectors to correct value"
1904                    );
1905
1906                    metadata_header.plotted_sector_count =
1907                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1908                            as u16;
1909                    let metadata_header_bytes = metadata_header.encode();
1910
1911                    if !dry_run
1912                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1913                    {
1914                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1915                            file: metadata_file_path,
1916                            size: metadata_header_bytes.len() as u64,
1917                            offset: 0,
1918                            error,
1919                        });
1920                    }
1921                }
1922
1923                (metadata_file, metadata_header)
1924            };
1925
1926            let pieces_in_sector = info.pieces_in_sector();
1927            let sector_size = sector_size(pieces_in_sector) as u64;
1928
1929            let plot_file_path = directory.join(Self::PLOT_FILE);
1930            let plot_file = {
1931                let plot_file_path = directory.join(Self::PLOT_FILE);
1932                info!(path = %plot_file_path.display(), "Checking plot file");
1933
1934                let plot_file = match OpenOptions::new()
1935                    .read(true)
1936                    .write(!dry_run)
1937                    .open(&plot_file_path)
1938                {
1939                    Ok(plot_file) => plot_file,
1940                    Err(error) => {
1941                        return Err(if error.kind() == io::ErrorKind::NotFound {
1942                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1943                                file: plot_file_path,
1944                            }
1945                        } else {
1946                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1947                                file: plot_file_path,
1948                                error,
1949                            }
1950                        });
1951                    }
1952                };
1953
1954                // Error doesn't matter here
1955                let _ = plot_file.advise_sequential_access();
1956
1957                let plot_size = match plot_file.size() {
1958                    Ok(metadata_size) => metadata_size,
1959                    Err(error) => {
1960                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1961                            file: plot_file_path,
1962                            error,
1963                        });
1964                    }
1965                };
1966
1967                let min_expected_plot_size =
1968                    u64::from(metadata_header.plotted_sector_count) * sector_size;
1969                if plot_size < min_expected_plot_size {
1970                    warn!(
1971                        %plot_size,
1972                        %min_expected_plot_size,
1973                        "Plot file size is smaller than expected, shrinking number of plotted \
1974                        sectors to correct value"
1975                    );
1976
1977                    metadata_header.plotted_sector_count = (plot_size / sector_size) as u16;
1978                    let metadata_header_bytes = metadata_header.encode();
1979
1980                    if !dry_run
1981                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1982                    {
1983                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1984                            file: plot_file_path,
1985                            size: metadata_header_bytes.len() as u64,
1986                            offset: 0,
1987                            error,
1988                        });
1989                    }
1990                }
1991
1992                plot_file
1993            };
1994
1995            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1996
1997            info!("Checking sectors and corresponding metadata");
1998            (0..metadata_header.plotted_sector_count)
1999                .into_par_iter()
2000                .map(SectorIndex::new)
2001                .map_init(
2002                    || vec![0u8; Record::SIZE],
2003                    |scratch_buffer, sector_index| {
2004                        let _span_guard = span.enter();
2005
2006                        let offset = RESERVED_PLOT_METADATA
2007                            + u64::from(sector_index) * sector_metadata_size as u64;
2008                        if let Err(error) = metadata_file
2009                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
2010                        {
2011                            warn!(
2012                                path = %metadata_file_path.display(),
2013                                %error,
2014                                %offset,
2015                                size = %sector_metadata_size,
2016                                %sector_index,
2017                                "Failed to read sector metadata, replacing with dummy expired \
2018                                sector metadata"
2019                            );
2020
2021                            if !dry_run {
2022                                write_dummy_sector_metadata(
2023                                    &metadata_file,
2024                                    &metadata_file_path,
2025                                    sector_index,
2026                                    pieces_in_sector,
2027                                )?;
2028                            }
2029                            return Ok(());
2030                        }
2031
2032                        let sector_metadata = match SectorMetadataChecksummed::decode(
2033                            &mut &scratch_buffer[..sector_metadata_size],
2034                        ) {
2035                            Ok(sector_metadata) => sector_metadata,
2036                            Err(error) => {
2037                                warn!(
2038                                    path = %metadata_file_path.display(),
2039                                    %error,
2040                                    %sector_index,
2041                                    "Failed to decode sector metadata, replacing with dummy \
2042                                    expired sector metadata"
2043                                );
2044
2045                                if !dry_run {
2046                                    write_dummy_sector_metadata(
2047                                        &metadata_file,
2048                                        &metadata_file_path,
2049                                        sector_index,
2050                                        pieces_in_sector,
2051                                    )?;
2052                                }
2053                                return Ok(());
2054                            }
2055                        };
2056
2057                        if sector_metadata.sector_index != sector_index {
2058                            warn!(
2059                                path = %metadata_file_path.display(),
2060                                %sector_index,
2061                                found_sector_index = %sector_metadata.sector_index,
2062                                "Sector index mismatch, replacing with dummy expired sector \
2063                                metadata"
2064                            );
2065
2066                            if !dry_run {
2067                                write_dummy_sector_metadata(
2068                                    &metadata_file,
2069                                    &metadata_file_path,
2070                                    sector_index,
2071                                    pieces_in_sector,
2072                                )?;
2073                            }
2074                            return Ok(());
2075                        }
2076
2077                        if sector_metadata.pieces_in_sector != pieces_in_sector {
2078                            warn!(
2079                                path = %metadata_file_path.display(),
2080                                %sector_index,
2081                                %pieces_in_sector,
2082                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
2083                                "Pieces in sector mismatch, replacing with dummy expired sector \
2084                                metadata"
2085                            );
2086
2087                            if !dry_run {
2088                                write_dummy_sector_metadata(
2089                                    &metadata_file,
2090                                    &metadata_file_path,
2091                                    sector_index,
2092                                    pieces_in_sector,
2093                                )?;
2094                            }
2095                            return Ok(());
2096                        }
2097
2098                        if target.plot() {
2099                            let mut hasher = blake3::Hasher::new();
2100                            // Read sector bytes and compute checksum
2101                            for offset_in_sector in
2102                                sector_bytes_range.clone().step_by(scratch_buffer.len())
2103                            {
2104                                let offset =
2105                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
2106                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2107                                    .min(sector_bytes_range.end)
2108                                    - offset_in_sector;
2109
2110                                let bytes = &mut scratch_buffer[..bytes_to_read];
2111
2112                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2113                                    warn!(
2114                                        path = %plot_file_path.display(),
2115                                        %error,
2116                                        %sector_index,
2117                                        %offset,
2118                                        size = %bytes.len() as u64,
2119                                        "Failed to read sector bytes"
2120                                    );
2121
2122                                    continue;
2123                                }
2124
2125                                hasher.update(bytes);
2126                            }
2127
2128                            let actual_checksum = *hasher.finalize().as_bytes();
2129                            let mut expected_checksum = [0; Blake3Hash::SIZE];
2130                            {
2131                                let offset = u64::from(sector_index) * sector_size
2132                                    + sector_bytes_range.end as u64;
2133                                if let Err(error) =
2134                                    plot_file.read_exact_at(&mut expected_checksum, offset)
2135                                {
2136                                    warn!(
2137                                        path = %plot_file_path.display(),
2138                                        %error,
2139                                        %sector_index,
2140                                        %offset,
2141                                        size = %expected_checksum.len() as u64,
2142                                        "Failed to read sector checksum bytes"
2143                                    );
2144                                }
2145                            }
2146
2147                            // Verify checksum
2148                            if actual_checksum != expected_checksum {
2149                                warn!(
2150                                    path = %plot_file_path.display(),
2151                                    %sector_index,
2152                                    actual_checksum = %hex::encode(actual_checksum),
2153                                    expected_checksum = %hex::encode(expected_checksum),
2154                                    "Plotted sector checksum mismatch, replacing with dummy \
2155                                    expired sector"
2156                                );
2157
2158                                if !dry_run {
2159                                    write_dummy_sector_metadata(
2160                                        &metadata_file,
2161                                        &metadata_file_path,
2162                                        sector_index,
2163                                        pieces_in_sector,
2164                                    )?;
2165                                }
2166
2167                                scratch_buffer.fill(0);
2168
2169                                hasher.reset();
2170                                // Fill sector with zeroes and compute checksum
2171                                for offset_in_sector in
2172                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
2173                                {
2174                                    let offset = u64::from(sector_index) * sector_size
2175                                        + offset_in_sector as u64;
2176                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2177                                        .min(sector_bytes_range.end)
2178                                        - offset_in_sector;
2179                                    let bytes = &mut scratch_buffer[..bytes_to_write];
2180
2181                                    if !dry_run
2182                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
2183                                    {
2184                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2185                                            file: plot_file_path.clone(),
2186                                            size: scratch_buffer.len() as u64,
2187                                            offset,
2188                                            error,
2189                                        });
2190                                    }
2191
2192                                    hasher.update(bytes);
2193                                }
2194                                // Write checksum
2195                                {
2196                                    let checksum = *hasher.finalize().as_bytes();
2197                                    let offset = u64::from(sector_index) * sector_size
2198                                        + sector_bytes_range.end as u64;
2199                                    if !dry_run
2200                                        && let Err(error) =
2201                                            plot_file.write_all_at(&checksum, offset)
2202                                    {
2203                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2204                                            file: plot_file_path.clone(),
2205                                            size: checksum.len() as u64,
2206                                            offset,
2207                                            error,
2208                                        });
2209                                    }
2210                                }
2211
2212                                return Ok(());
2213                            }
2214                        }
2215
2216                        trace!(%sector_index, "Sector is in good shape");
2217
2218                        Ok(())
2219                    },
2220                )
2221                .try_for_each({
2222                    let span = &span;
2223                    let checked_sectors = AtomicUsize::new(0);
2224
2225                    move |result| {
2226                        let _span_guard = span.enter();
2227
2228                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2229                        if checked_sectors > 1 && checked_sectors.is_multiple_of(10) {
2230                            info!(
2231                                "Checked {}/{} sectors",
2232                                checked_sectors, metadata_header.plotted_sector_count
2233                            );
2234                        }
2235
2236                        result
2237                    }
2238                })?;
2239        }
2240
2241        if target.cache() {
2242            Self::scrub_cache(directory, dry_run)?;
2243        }
2244
2245        info!("Farm check completed");
2246
2247        Ok(())
2248    }
2249
2250    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
2251        let span = Span::current();
2252
2253        let file = directory.join(DiskPieceCache::FILE_NAME);
2254        info!(path = %file.display(), "Checking cache file");
2255
2256        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
2257            Ok(plot_file) => plot_file,
2258            Err(error) => {
2259                return if error.kind() == io::ErrorKind::NotFound {
2260                    warn!(
2261                        file = %file.display(),
2262                        "Cache file does not exist, this is expected in farming cluster"
2263                    );
2264                    Ok(())
2265                } else {
2266                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
2267                };
2268            }
2269        };
2270
2271        // Error doesn't matter here
2272        let _ = cache_file.advise_sequential_access();
2273
2274        let cache_size = match cache_file.size() {
2275            Ok(cache_size) => cache_size,
2276            Err(error) => {
2277                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
2278            }
2279        };
2280
2281        let element_size = DiskPieceCache::element_size();
2282        let number_of_cached_elements = cache_size / u64::from(element_size);
2283        let dummy_element = vec![0; element_size as usize];
2284        (0..number_of_cached_elements)
2285            .into_par_iter()
2286            .map_with(vec![0; element_size as usize], |element, cache_offset| {
2287                let _span_guard = span.enter();
2288
2289                let offset = cache_offset * u64::from(element_size);
2290                if let Err(error) = cache_file.read_exact_at(element, offset) {
2291                    warn!(
2292                        path = %file.display(),
2293                        %cache_offset,
2294                        size = %element.len() as u64,
2295                        %offset,
2296                        %error,
2297                        "Failed to read cached piece, replacing with dummy element"
2298                    );
2299
2300                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2301                    {
2302                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2303                            file: file.clone(),
2304                            size: u64::from(element_size),
2305                            offset,
2306                            error,
2307                        });
2308                    }
2309
2310                    return Ok(());
2311                }
2312
2313                let (index_and_piece_bytes, expected_checksum) =
2314                    element.split_at(element_size as usize - Blake3Hash::SIZE);
2315                let actual_checksum = *blake3::hash(index_and_piece_bytes).as_bytes();
2316                if actual_checksum != expected_checksum && element != &dummy_element {
2317                    warn!(
2318                        %cache_offset,
2319                        actual_checksum = %hex::encode(actual_checksum),
2320                        expected_checksum = %hex::encode(expected_checksum),
2321                        "Cached piece checksum mismatch, replacing with dummy element"
2322                    );
2323
2324                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2325                    {
2326                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2327                            file: file.clone(),
2328                            size: u64::from(element_size),
2329                            offset,
2330                            error,
2331                        });
2332                    }
2333
2334                    return Ok(());
2335                }
2336
2337                Ok(())
2338            })
2339            .try_for_each({
2340                let span = &span;
2341                let checked_elements = AtomicUsize::new(0);
2342
2343                move |result| {
2344                    let _span_guard = span.enter();
2345
2346                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
2347                    if checked_elements > 1 && checked_elements.is_multiple_of(1000) {
2348                        info!(
2349                            "Checked {}/{} cache elements",
2350                            checked_elements, number_of_cached_elements
2351                        );
2352                    }
2353
2354                    result
2355                }
2356            })?;
2357
2358        Ok(())
2359    }
2360}
2361
2362fn write_dummy_sector_metadata(
2363    metadata_file: &File,
2364    metadata_file_path: &Path,
2365    sector_index: SectorIndex,
2366    pieces_in_sector: u16,
2367) -> Result<(), SingleDiskFarmScrubError> {
2368    let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2369        sector_index,
2370        pieces_in_sector,
2371        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2372        history_size: HistorySize::from(SegmentIndex::ZERO),
2373    })
2374    .encode();
2375    let sector_offset = RESERVED_PLOT_METADATA
2376        + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2377    metadata_file
2378        .write_all_at(&dummy_sector_bytes, sector_offset)
2379        .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2380            file: metadata_file_path.to_path_buf(),
2381            size: dummy_sector_bytes.len() as u64,
2382            offset: sector_offset,
2383            error,
2384        })
2385}