ab_farmer/
single_disk_farm.rs

1//! Primary [`Farm`] implementation that deals with hardware directly
2//!
3//! Single disk farm is an abstraction that contains an identity, associated plot with metadata and
4//! a small piece cache. It fully manages farming and plotting process, including listening to node
5//! notifications, producing solutions and sealing blocks.
6
7mod block_sealing;
8pub mod direct_io_file_wrapper;
9pub mod farming;
10pub mod identity;
11mod metrics;
12pub mod piece_cache;
13pub mod piece_reader;
14pub mod plot_cache;
15mod plotted_sectors;
16mod plotting;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20    Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21    PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::block_sealing::block_sealing;
26use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
27use crate::single_disk_farm::farming::rayon_files::RayonFiles;
28use crate::single_disk_farm::farming::{
29    FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
30};
31use crate::single_disk_farm::identity::{Identity, IdentityError};
32use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
33use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
34use crate::single_disk_farm::piece_reader::DiskPieceReader;
35use crate::single_disk_farm::plot_cache::DiskPlotCache;
36use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
37pub use crate::single_disk_farm::plotting::PlottingError;
38use crate::single_disk_farm::plotting::{
39    PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
40};
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use ab_core_primitives::address::Address;
44use ab_core_primitives::block::BlockRoot;
45use ab_core_primitives::ed25519::Ed25519PublicKey;
46use ab_core_primitives::hashes::Blake3Hash;
47use ab_core_primitives::pieces::Record;
48use ab_core_primitives::sectors::SectorIndex;
49use ab_core_primitives::segments::{HistorySize, SegmentIndex};
50use ab_erasure_coding::ErasureCoding;
51use ab_farmer_components::FarmerProtocolInfo;
52use ab_farmer_components::file_ext::FileExt;
53use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
54use ab_farmer_rpc_primitives::{FarmerAppInfo, SolutionResponse};
55use ab_networking::KnownPeersManager;
56use ab_proof_of_space::Table;
57use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
58use async_trait::async_trait;
59use bytesize::ByteSize;
60use event_listener_primitives::{Bag, HandlerId};
61use futures::channel::{mpsc, oneshot};
62use futures::stream::FuturesUnordered;
63use futures::{FutureExt, StreamExt, select};
64use parity_scale_codec::{Decode, Encode};
65use parking_lot::Mutex;
66use prometheus_client::registry::Registry;
67use rayon::prelude::*;
68use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
69use serde::{Deserialize, Serialize};
70use std::collections::HashSet;
71use std::fs::{File, OpenOptions};
72use std::future::Future;
73use std::io::Write;
74use std::num::{NonZeroU32, NonZeroUsize};
75use std::path::{Path, PathBuf};
76use std::pin::Pin;
77use std::str::FromStr;
78use std::sync::Arc;
79use std::sync::atomic::{AtomicUsize, Ordering};
80use std::time::Duration;
81use std::{fmt, fs, io, mem};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{Instrument, Span, error, info, trace, warn};
87
88// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
89// usize depending on chain parameters
90const _: () = {
91    assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());
92};
93
94/// Reserve 1M of space for plot metadata (for potential future expansion)
95const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
96/// Reserve 1M of space for farm info (for potential future expansion)
97const RESERVED_FARM_INFO: u64 = 1024 * 1024;
98const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
99
100/// Exclusive lock for single disk farm info file, ensuring no concurrent edits by cooperating processes is done
101#[derive(Debug)]
102#[must_use = "Lock file must be kept around or as long as farm is used"]
103pub struct SingleDiskFarmInfoLock {
104    _file: File,
105}
106
107/// Important information about the contents of the `SingleDiskFarm`
108#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub enum SingleDiskFarmInfo {
111    /// V0 of the info
112    #[serde(rename_all = "camelCase")]
113    V0 {
114        /// ID of the farm
115        id: FarmId,
116        /// Genesis root of the beacon chain used for farm creation
117        genesis_root: BlockRoot,
118        /// Public key of identity used for farm creation
119        public_key: Ed25519PublicKey,
120        /// How many pieces does one sector contain.
121        pieces_in_sector: u16,
122        /// How much space in bytes is allocated for this farm
123        allocated_space: u64,
124    },
125}
126
127impl SingleDiskFarmInfo {
128    const FILE_NAME: &'static str = "single_disk_farm.json";
129
130    /// Create a new instance
131    pub fn new(
132        id: FarmId,
133        genesis_root: BlockRoot,
134        public_key: Ed25519PublicKey,
135        pieces_in_sector: u16,
136        allocated_space: u64,
137    ) -> Self {
138        Self::V0 {
139            id,
140            genesis_root,
141            public_key,
142            pieces_in_sector,
143            allocated_space,
144        }
145    }
146
147    /// Load `SingleDiskFarm` from path is supposed to be stored, `None` means no info file was
148    /// found, happens during first start.
149    pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
150        let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
151            Ok(bytes) => bytes,
152            Err(error) => {
153                return if error.kind() == io::ErrorKind::NotFound {
154                    Ok(None)
155                } else {
156                    Err(error)
157                };
158            }
159        };
160
161        serde_json::from_slice(&bytes)
162            .map(Some)
163            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
164    }
165
166    /// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
167    ///
168    /// Can optionally return a lock.
169    pub fn store_to(
170        &self,
171        directory: &Path,
172        lock: bool,
173    ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
174        let mut file = OpenOptions::new()
175            .write(true)
176            .create(true)
177            .truncate(false)
178            .open(directory.join(Self::FILE_NAME))?;
179        if lock {
180            fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
181        }
182        file.set_len(0)?;
183        file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
184
185        Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
186    }
187
188    /// Try to acquire exclusive lock on the single disk farm info file, ensuring no concurrent edits by cooperating
189    /// processes is done
190    pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
191        let file = File::open(directory.join(Self::FILE_NAME))?;
192        fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
193
194        Ok(SingleDiskFarmInfoLock { _file: file })
195    }
196
197    /// ID of the farm
198    pub fn id(&self) -> &FarmId {
199        let Self::V0 { id, .. } = self;
200        id
201    }
202
203    /// Genesis hash of the chain used for farm creation
204    pub fn genesis_root(&self) -> &BlockRoot {
205        let Self::V0 { genesis_root, .. } = self;
206        genesis_root
207    }
208
209    /// Public key of identity used for farm creation
210    pub fn public_key(&self) -> &Ed25519PublicKey {
211        let Self::V0 { public_key, .. } = self;
212        public_key
213    }
214
215    /// How many pieces does one sector contain.
216    pub fn pieces_in_sector(&self) -> u16 {
217        match self {
218            SingleDiskFarmInfo::V0 {
219                pieces_in_sector, ..
220            } => *pieces_in_sector,
221        }
222    }
223
224    /// How much space in bytes is allocated for this farm
225    pub fn allocated_space(&self) -> u64 {
226        match self {
227            SingleDiskFarmInfo::V0 {
228                allocated_space, ..
229            } => *allocated_space,
230        }
231    }
232}
233
234/// Summary of single disk farm for presentational purposes
235#[derive(Debug)]
236pub enum SingleDiskFarmSummary {
237    /// Farm was found and read successfully
238    Found {
239        /// Farm info
240        info: SingleDiskFarmInfo,
241        /// Path to directory where farm is stored.
242        directory: PathBuf,
243    },
244    /// Farm was not found
245    NotFound {
246        /// Path to directory where farm is stored.
247        directory: PathBuf,
248    },
249    /// Failed to open farm
250    Error {
251        /// Path to directory where farm is stored.
252        directory: PathBuf,
253        /// Error itself
254        error: io::Error,
255    },
256}
257
258#[derive(Debug, Encode, Decode)]
259struct PlotMetadataHeader {
260    version: u8,
261    plotted_sector_count: u16,
262}
263
264impl PlotMetadataHeader {
265    #[inline]
266    fn encoded_size() -> usize {
267        let default = PlotMetadataHeader {
268            version: 0,
269            plotted_sector_count: 0,
270        };
271
272        default.encoded_size()
273    }
274}
275
276/// Options used to open single disk farm
277#[derive(Debug)]
278pub struct SingleDiskFarmOptions<'a, NC>
279where
280    NC: Clone,
281{
282    /// Path to directory where farm is stored.
283    pub directory: PathBuf,
284    /// Information necessary for farmer application
285    pub farmer_app_info: FarmerAppInfo,
286    /// The amount of space in bytes that was allocated
287    pub allocated_space: u64,
288    /// How many pieces one sector is supposed to contain (max)
289    pub max_pieces_in_sector: u16,
290    /// RPC client connected to the node
291    pub node_client: NC,
292    /// Address where farming rewards should go
293    pub reward_address: Address,
294    /// Plotter
295    pub plotter: Arc<dyn Plotter + Send + Sync>,
296    /// Erasure coding instance to use.
297    pub erasure_coding: ErasureCoding,
298    /// Percentage of allocated space dedicated for caching purposes
299    pub cache_percentage: u8,
300    /// Thread pool size used for farming (mostly for blocking I/O, but also for some
301    /// compute-intensive operations during proving)
302    pub farming_thread_pool_size: usize,
303    /// Notification for plotter to start, can be used to delay plotting until some initialization
304    /// has happened externally
305    pub plotting_delay: Option<oneshot::Receiver<()>>,
306    /// Global mutex that can restrict concurrency of resource-intensive operations and make sure
307    /// that those operations that are very sensitive (like proving) have all the resources
308    /// available to them for the highest probability of success
309    pub global_mutex: Arc<AsyncMutex<()>>,
310    /// How many sectors a will be plotted concurrently per farm
311    pub max_plotting_sectors_per_farm: NonZeroUsize,
312    /// Disable farm locking, for example if file system doesn't support it
313    pub disable_farm_locking: bool,
314    /// Prometheus registry
315    pub registry: Option<&'a Mutex<&'a mut Registry>>,
316    /// Whether to create a farm if it doesn't yet exist
317    pub create: bool,
318}
319
320/// Errors happening when trying to create/open single disk farm
321#[derive(Debug, Error)]
322pub enum SingleDiskFarmError {
323    /// Failed to open or create identity
324    #[error("Failed to open or create identity: {0}")]
325    FailedToOpenIdentity(#[from] IdentityError),
326    /// Farm is likely already in use, make sure no other farmer is using it
327    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
328    LikelyAlreadyInUse(io::Error),
329    /// I/O error occurred
330    #[error("Single disk farm I/O error: {0}")]
331    Io(#[from] io::Error),
332    /// Failed to spawn task for blocking thread
333    #[error("Failed to spawn task for blocking thread: {0}")]
334    TokioJoinError(#[from] task::JoinError),
335    /// Piece cache error
336    #[error("Piece cache error: {0}")]
337    PieceCacheError(#[from] DiskPieceCacheError),
338    /// Can't preallocate metadata file, probably not enough space on disk
339    #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
340    CantPreallocateMetadataFile(io::Error),
341    /// Can't preallocate plot file, probably not enough space on disk
342    #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
343    CantPreallocatePlotFile(io::Error),
344    /// Wrong chain (genesis hash)
345    #[error(
346        "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
347        created, it is not possible to use farm on a different chain"
348    )]
349    WrongChain {
350        /// Farm ID
351        id: FarmId,
352        /// Hex-encoded genesis hash during farm creation
353        // TODO: Wrapper type with `Display` impl for genesis hash
354        correct_chain: String,
355        /// Hex-encoded current genesis hash
356        wrong_chain: String,
357    },
358    /// Public key in identity doesn't match metadata
359    #[error(
360        "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
361        farm was created, something went wrong, likely due to manual edits"
362    )]
363    IdentityMismatch {
364        /// Farm ID
365        id: FarmId,
366        /// Public key used during farm creation
367        correct_public_key: Ed25519PublicKey,
368        /// Current public key
369        wrong_public_key: Ed25519PublicKey,
370    },
371    /// Invalid number pieces in sector
372    #[error(
373        "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
374        {initialized_with}"
375    )]
376    InvalidPiecesInSector {
377        /// Farm ID
378        id: FarmId,
379        /// Max supported pieces in sector
380        max_supported: u16,
381        /// Number of pieces in sector farm is initialized with
382        initialized_with: u16,
383    },
384    /// Failed to decode metadata header
385    #[error("Failed to decode metadata header: {0}")]
386    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
387    /// Unexpected metadata version
388    #[error("Unexpected metadata version {0}")]
389    UnexpectedMetadataVersion(u8),
390    /// Allocated space is not enough for one sector
391    #[error(
392        "Allocated space is not enough for one sector. \
393        The lowest acceptable value for allocated space is {min_space} bytes, \
394        provided {allocated_space} bytes."
395    )]
396    InsufficientAllocatedSpace {
397        /// Minimal allocated space
398        min_space: u64,
399        /// Current allocated space
400        allocated_space: u64,
401    },
402    /// Farm is too large
403    #[error(
404        "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
405        supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
406        instead."
407    )]
408    FarmTooLarge {
409        /// Allocated space
410        allocated_space: u64,
411        /// Allocated space in sectors
412        allocated_sectors: u64,
413        /// Max supported allocated space
414        max_space: u64,
415        /// Max supported allocated space in sectors
416        max_sectors: u16,
417    },
418    /// Failed to create thread pool
419    #[error("Failed to create thread pool: {0}")]
420    FailedToCreateThreadPool(ThreadPoolBuildError),
421}
422
423/// Errors happening during scrubbing
424#[derive(Debug, Error)]
425pub enum SingleDiskFarmScrubError {
426    /// Farm is likely already in use, make sure no other farmer is using it
427    #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
428    LikelyAlreadyInUse(io::Error),
429    /// Failed to determine file size
430    #[error("Failed to file size of {file}: {error}")]
431    FailedToDetermineFileSize {
432        /// Affected file
433        file: PathBuf,
434        /// Low-level error
435        error: io::Error,
436    },
437    /// Failed to read bytes from file
438    #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
439    FailedToReadBytes {
440        /// Affected file
441        file: PathBuf,
442        /// Number of bytes to read
443        size: u64,
444        /// Offset in the file
445        offset: u64,
446        /// Low-level error
447        error: io::Error,
448    },
449    /// Failed to write bytes from file
450    #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
451    FailedToWriteBytes {
452        /// Affected file
453        file: PathBuf,
454        /// Number of bytes to read
455        size: u64,
456        /// Offset in the file
457        offset: u64,
458        /// Low-level error
459        error: io::Error,
460    },
461    /// Farm info file does not exist
462    #[error("Farm info file does not exist at {file}")]
463    FarmInfoFileDoesNotExist {
464        /// Info file
465        file: PathBuf,
466    },
467    /// Farm info can't be opened
468    #[error("Farm info at {file} can't be opened: {error}")]
469    FarmInfoCantBeOpened {
470        /// Info file
471        file: PathBuf,
472        /// Low-level error
473        error: io::Error,
474    },
475    /// Identity file does not exist
476    #[error("Identity file does not exist at {file}")]
477    IdentityFileDoesNotExist {
478        /// Identity file
479        file: PathBuf,
480    },
481    /// Identity can't be opened
482    #[error("Identity at {file} can't be opened: {error}")]
483    IdentityCantBeOpened {
484        /// Identity file
485        file: PathBuf,
486        /// Low-level error
487        error: IdentityError,
488    },
489    /// Identity public key doesn't match public key in the disk farm info
490    #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
491    PublicKeyMismatch {
492        /// Identity public key
493        identity: Ed25519PublicKey,
494        /// Disk farm info public key
495        info: Ed25519PublicKey,
496    },
497    /// Metadata file does not exist
498    #[error("Metadata file does not exist at {file}")]
499    MetadataFileDoesNotExist {
500        /// Metadata file
501        file: PathBuf,
502    },
503    /// Metadata can't be opened
504    #[error("Metadata at {file} can't be opened: {error}")]
505    MetadataCantBeOpened {
506        /// Metadata file
507        file: PathBuf,
508        /// Low-level error
509        error: io::Error,
510    },
511    /// Metadata file too small
512    #[error(
513        "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
514        is {size}"
515    )]
516    MetadataFileTooSmall {
517        /// Metadata file
518        file: PathBuf,
519        /// Reserved size
520        reserved_size: u64,
521        /// File size
522        size: u64,
523    },
524    /// Failed to decode metadata header
525    #[error("Failed to decode metadata header: {0}")]
526    FailedToDecodeMetadataHeader(parity_scale_codec::Error),
527    /// Unexpected metadata version
528    #[error("Unexpected metadata version {0}")]
529    UnexpectedMetadataVersion(u8),
530    /// Cache can't be opened
531    #[error("Cache at {file} can't be opened: {error}")]
532    CacheCantBeOpened {
533        /// Cache file
534        file: PathBuf,
535        /// Low-level error
536        error: io::Error,
537    },
538}
539
540/// Errors that happen in background tasks
541#[derive(Debug, Error)]
542pub enum BackgroundTaskError {
543    /// Plotting error
544    #[error(transparent)]
545    Plotting(#[from] PlottingError),
546    /// Farming error
547    #[error(transparent)]
548    Farming(#[from] FarmingError),
549    /// Block sealing
550    #[error(transparent)]
551    BlockSealing(#[from] anyhow::Error),
552    /// Background task panicked
553    #[error("Background task {task} panicked")]
554    BackgroundTaskPanicked {
555        /// Name of the task
556        task: String,
557    },
558}
559
560type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
561
562/// Scrub target
563#[derive(Debug, Copy, Clone)]
564pub enum ScrubTarget {
565    /// Scrub everything
566    All,
567    /// Scrub just metadata
568    Metadata,
569    /// Scrub metadata and corresponding plot
570    Plot,
571    /// Only scrub cache
572    Cache,
573}
574
575impl fmt::Display for ScrubTarget {
576    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577        match self {
578            Self::All => f.write_str("all"),
579            Self::Metadata => f.write_str("metadata"),
580            Self::Plot => f.write_str("plot"),
581            Self::Cache => f.write_str("cache"),
582        }
583    }
584}
585
586impl FromStr for ScrubTarget {
587    type Err = String;
588
589    fn from_str(s: &str) -> Result<Self, Self::Err> {
590        match s {
591            "all" => Ok(Self::All),
592            "metadata" => Ok(Self::Metadata),
593            "plot" => Ok(Self::Plot),
594            "cache" => Ok(Self::Cache),
595            s => Err(format!("Can't parse {s} as `ScrubTarget`")),
596        }
597    }
598}
599
600impl ScrubTarget {
601    fn metadata(&self) -> bool {
602        match self {
603            Self::All | Self::Metadata | Self::Plot => true,
604            Self::Cache => false,
605        }
606    }
607
608    fn plot(&self) -> bool {
609        match self {
610            Self::All | Self::Plot => true,
611            Self::Metadata | Self::Cache => false,
612        }
613    }
614
615    fn cache(&self) -> bool {
616        match self {
617            Self::All | Self::Cache => true,
618            Self::Metadata | Self::Plot => false,
619        }
620    }
621}
622
623struct AllocatedSpaceDistribution {
624    piece_cache_file_size: u64,
625    piece_cache_capacity: u32,
626    plot_file_size: u64,
627    target_sector_count: u16,
628    metadata_file_size: u64,
629}
630
631impl AllocatedSpaceDistribution {
632    fn new(
633        allocated_space: u64,
634        sector_size: u64,
635        cache_percentage: u8,
636        sector_metadata_size: u64,
637    ) -> Result<Self, SingleDiskFarmError> {
638        let single_sector_overhead = sector_size + sector_metadata_size;
639        // Fixed space usage regardless of plot size
640        let fixed_space_usage = RESERVED_PLOT_METADATA
641            + RESERVED_FARM_INFO
642            + Identity::file_size() as u64
643            + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
644        // Calculate how many sectors can fit
645        let target_sector_count = {
646            let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
647                / 100
648                * (100 - u64::from(cache_percentage));
649            // Do the rounding to make sure we have exactly as much space as fits whole number of
650            // sectors, account for disk sector size just in case
651            (potentially_plottable_space - DISK_PAGE_SIZE as u64) / single_sector_overhead
652        };
653
654        if target_sector_count == 0 {
655            let mut single_plot_with_cache_space =
656                single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
657            // Cache must not be empty, ensure it contains at least one element even if
658            // percentage-wise it will use more space
659            if single_plot_with_cache_space - single_sector_overhead
660                < DiskPieceCache::element_size() as u64
661            {
662                single_plot_with_cache_space =
663                    single_sector_overhead + DiskPieceCache::element_size() as u64;
664            }
665
666            return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
667                min_space: fixed_space_usage + single_plot_with_cache_space,
668                allocated_space,
669            });
670        }
671        let plot_file_size = target_sector_count * sector_size;
672        // Align plot file size for disk sector size
673        let plot_file_size = plot_file_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
674
675        // Remaining space will be used for caching purposes
676        let piece_cache_capacity = if cache_percentage > 0 {
677            let cache_space = allocated_space
678                - fixed_space_usage
679                - plot_file_size
680                - (sector_metadata_size * target_sector_count);
681            (cache_space / u64::from(DiskPieceCache::element_size())) as u32
682        } else {
683            0
684        };
685        let target_sector_count = match u16::try_from(target_sector_count).map(SectorIndex::from) {
686            Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
687                u16::from(target_sector_count)
688            }
689            _ => {
690                // We use `u16` for both count and index, hence index must not reach actual `MAX`
691                // (consensus doesn't care about this, just farmer implementation detail)
692                let max_sectors = u16::from(SectorIndex::MAX) - 1;
693                return Err(SingleDiskFarmError::FarmTooLarge {
694                    allocated_space: target_sector_count * sector_size,
695                    allocated_sectors: target_sector_count,
696                    max_space: max_sectors as u64 * sector_size,
697                    max_sectors,
698                });
699            }
700        };
701
702        Ok(Self {
703            piece_cache_file_size: u64::from(piece_cache_capacity)
704                * u64::from(DiskPieceCache::element_size()),
705            piece_cache_capacity,
706            plot_file_size,
707            target_sector_count,
708            metadata_file_size: RESERVED_PLOT_METADATA
709                + sector_metadata_size * u64::from(target_sector_count),
710        })
711    }
712}
713
714type Handler<A> = Bag<HandlerFn<A>, A>;
715
716#[derive(Default, Debug)]
717struct Handlers {
718    sector_update: Handler<(SectorIndex, SectorUpdate)>,
719    farming_notification: Handler<FarmingNotification>,
720    solution: Handler<SolutionResponse>,
721}
722
723struct SingleDiskFarmInit {
724    identity: Identity,
725    single_disk_farm_info: SingleDiskFarmInfo,
726    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
727    plot_file: Arc<DirectIoFileWrapper>,
728    metadata_file: DirectIoFileWrapper,
729    metadata_header: PlotMetadataHeader,
730    target_sector_count: u16,
731    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
732    piece_cache_capacity: u32,
733    plot_cache: DiskPlotCache,
734}
735
736/// Single disk farm abstraction is a container for everything necessary to plot/farm with a single
737/// disk.
738///
739/// Farm starts operating during creation and doesn't stop until dropped (or error happens).
740#[derive(Debug)]
741#[must_use = "Plot does not function properly unless run() method is called"]
742pub struct SingleDiskFarm {
743    farmer_protocol_info: FarmerProtocolInfo,
744    single_disk_farm_info: SingleDiskFarmInfo,
745    /// Metadata of all sectors plotted so far
746    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
747    pieces_in_sector: u16,
748    total_sectors_count: u16,
749    span: Span,
750    tasks: FuturesUnordered<BackgroundTask>,
751    handlers: Arc<Handlers>,
752    piece_cache: SingleDiskPieceCache,
753    plot_cache: DiskPlotCache,
754    piece_reader: DiskPieceReader,
755    /// Sender that will be used to signal to background threads that they should start
756    start_sender: Option<broadcast::Sender<()>>,
757    /// Sender that will be used to signal to background threads that they must stop
758    stop_sender: Option<broadcast::Sender<()>>,
759    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
760}
761
762impl Drop for SingleDiskFarm {
763    #[inline]
764    fn drop(&mut self) {
765        self.piece_reader.close_all_readers();
766        // Make background threads that are waiting to do something exit immediately
767        self.start_sender.take();
768        // Notify background tasks that they must stop
769        self.stop_sender.take();
770    }
771}
772
773#[async_trait(?Send)]
774impl Farm for SingleDiskFarm {
775    fn id(&self) -> &FarmId {
776        self.id()
777    }
778
779    fn total_sectors_count(&self) -> u16 {
780        self.total_sectors_count
781    }
782
783    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
784        Arc::new(self.plotted_sectors())
785    }
786
787    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
788        Arc::new(self.piece_reader())
789    }
790
791    fn on_sector_update(
792        &self,
793        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
794    ) -> Box<dyn farm::HandlerId> {
795        Box::new(self.on_sector_update(callback))
796    }
797
798    fn on_farming_notification(
799        &self,
800        callback: HandlerFn<FarmingNotification>,
801    ) -> Box<dyn farm::HandlerId> {
802        Box::new(self.on_farming_notification(callback))
803    }
804
805    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
806        Box::new(self.on_solution(callback))
807    }
808
809    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
810        Box::pin((*self).run())
811    }
812}
813
814impl SingleDiskFarm {
815    /// Name of the plot file
816    pub const PLOT_FILE: &'static str = "plot.bin";
817    /// Name of the metadata file
818    pub const METADATA_FILE: &'static str = "metadata.bin";
819    const SUPPORTED_PLOT_VERSION: u8 = 0;
820
821    /// Create new single disk farm instance
822    pub async fn new<NC, PosTable>(
823        options: SingleDiskFarmOptions<'_, NC>,
824        farm_index: usize,
825    ) -> Result<Self, SingleDiskFarmError>
826    where
827        NC: NodeClient + Clone,
828        PosTable: Table,
829    {
830        let span = Span::current();
831
832        let SingleDiskFarmOptions {
833            directory,
834            farmer_app_info,
835            allocated_space,
836            max_pieces_in_sector,
837            node_client,
838            reward_address,
839            plotter,
840            erasure_coding,
841            cache_percentage,
842            farming_thread_pool_size,
843            plotting_delay,
844            global_mutex,
845            max_plotting_sectors_per_farm,
846            disable_farm_locking,
847            registry,
848            create,
849        } = options;
850
851        let single_disk_farm_init_fut = task::spawn_blocking({
852            let directory = directory.clone();
853            let farmer_app_info = farmer_app_info.clone();
854            let span = span.clone();
855
856            move || {
857                let _span_guard = span.enter();
858                Self::init(
859                    &directory,
860                    &farmer_app_info,
861                    allocated_space,
862                    max_pieces_in_sector,
863                    cache_percentage,
864                    disable_farm_locking,
865                    create,
866                )
867            }
868        });
869
870        let single_disk_farm_init =
871            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;
872
873        let SingleDiskFarmInit {
874            identity,
875            single_disk_farm_info,
876            single_disk_farm_info_lock,
877            plot_file,
878            metadata_file,
879            metadata_header,
880            target_sector_count,
881            sectors_metadata,
882            piece_cache_capacity,
883            plot_cache,
884        } = single_disk_farm_init;
885
886        let piece_cache = {
887            // Convert farm ID into cache ID for single disk farm
888            let FarmId::Ulid(id) = *single_disk_farm_info.id();
889            let id = PieceCacheId::Ulid(id);
890
891            SingleDiskPieceCache::new(
892                id,
893                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
894                    Some(task::block_in_place(|| {
895                        if let Some(registry) = registry {
896                            DiskPieceCache::open(
897                                &directory,
898                                piece_cache_capacity,
899                                Some(id),
900                                Some(*registry.lock()),
901                            )
902                        } else {
903                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
904                        }
905                    })?)
906                } else {
907                    None
908                },
909            )
910        };
911
912        let public_key = *single_disk_farm_info.public_key();
913        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
914        let sector_size = sector_size(pieces_in_sector);
915
916        let metrics = registry.map(|registry| {
917            Arc::new(SingleDiskFarmMetrics::new(
918                *registry.lock(),
919                single_disk_farm_info.id(),
920                target_sector_count,
921                sectors_metadata.read_blocking().len() as u16,
922            ))
923        });
924
925        let (error_sender, error_receiver) = oneshot::channel();
926        let error_sender = Arc::new(Mutex::new(Some(error_sender)));
927
928        let tasks = FuturesUnordered::<BackgroundTask>::new();
929
930        tasks.push(Box::pin(async move {
931            if let Ok(error) = error_receiver.await {
932                return Err(error);
933            }
934
935            Ok(())
936        }));
937
938        let handlers = Arc::<Handlers>::default();
939        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
940        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
941        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
942        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
943        // Some sectors may already be plotted, skip them
944        let sectors_indices_left_to_plot = SectorIndex::new(metadata_header.plotted_sector_count)
945            ..SectorIndex::new(target_sector_count);
946
947        let farming_thread_pool = ThreadPoolBuilder::new()
948            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
949            .num_threads(farming_thread_pool_size)
950            .spawn_handler(tokio_rayon_spawn_handler())
951            .build()
952            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
953        let farming_plot_fut = task::spawn_blocking(|| {
954            farming_thread_pool
955                .install(move || {
956                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
957                        DirectIoFileWrapper::open(path)
958                    })
959                })
960                .map(|farming_plot| (farming_plot, farming_thread_pool))
961        });
962
963        let (farming_plot, farming_thread_pool) =
964            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;
965
966        let plotting_join_handle = task::spawn_blocking({
967            let sectors_metadata = Arc::clone(&sectors_metadata);
968            let handlers = Arc::clone(&handlers);
969            let sectors_being_modified = Arc::clone(&sectors_being_modified);
970            let node_client = node_client.clone();
971            let plot_file = Arc::clone(&plot_file);
972            let error_sender = Arc::clone(&error_sender);
973            let span = span.clone();
974            let global_mutex = Arc::clone(&global_mutex);
975            let metrics = metrics.clone();
976
977            move || {
978                let _span_guard = span.enter();
979
980                let plotting_options = PlottingOptions {
981                    metadata_header,
982                    sectors_metadata: &sectors_metadata,
983                    sectors_being_modified: &sectors_being_modified,
984                    sectors_to_plot_receiver,
985                    sector_plotting_options: SectorPlottingOptions {
986                        public_key,
987                        node_client: &node_client,
988                        pieces_in_sector,
989                        sector_size,
990                        plot_file,
991                        metadata_file: Arc::new(metadata_file),
992                        handlers: &handlers,
993                        global_mutex: &global_mutex,
994                        plotter,
995                        metrics,
996                    },
997                    max_plotting_sectors_per_farm,
998                };
999
1000                let plotting_fut = async {
1001                    if start_receiver.recv().await.is_err() {
1002                        // Dropped before starting
1003                        return Ok(());
1004                    }
1005
1006                    if let Some(plotting_delay) = plotting_delay
1007                        && plotting_delay.await.is_err()
1008                    {
1009                        // Dropped before resolving
1010                        return Ok(());
1011                    }
1012
1013                    plotting(plotting_options).await
1014                };
1015
1016                Handle::current().block_on(async {
1017                    select! {
1018                        plotting_result = plotting_fut.fuse() => {
1019                            if let Err(error) = plotting_result
1020                                && let Some(error_sender) = error_sender.lock().take()
1021                                && let Err(error) = error_sender.send(error.into())
1022                            {
1023                                error!(
1024                                    %error,
1025                                    "Plotting failed to send error to background task"
1026                                );
1027                            }
1028                        }
1029                        _ = stop_receiver.recv().fuse() => {
1030                            // Nothing, just exit
1031                        }
1032                    }
1033                });
1034            }
1035        });
1036        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);
1037
1038        tasks.push(Box::pin(async move {
1039            // Panic will already be printed by now
1040            plotting_join_handle.await.map_err(|_error| {
1041                BackgroundTaskError::BackgroundTaskPanicked {
1042                    task: format!("plotting-{farm_index}"),
1043                }
1044            })
1045        }));
1046
1047        let public_key_hash = public_key.hash();
1048
1049        let plotting_scheduler_options = PlottingSchedulerOptions {
1050            public_key_hash,
1051            sectors_indices_left_to_plot,
1052            target_sector_count,
1053            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
1054            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
1055            node_client: node_client.clone(),
1056            handlers: Arc::clone(&handlers),
1057            sectors_metadata: Arc::clone(&sectors_metadata),
1058            sectors_to_plot_sender,
1059            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
1060            metrics: metrics.clone(),
1061        };
1062        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));
1063
1064        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);
1065
1066        tasks.push(Box::pin({
1067            let node_client = node_client.clone();
1068            let metrics = metrics.clone();
1069
1070            async move {
1071                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
1072                    .await
1073                    .map_err(BackgroundTaskError::Farming)
1074            }
1075        }));
1076
1077        let farming_join_handle = task::spawn_blocking({
1078            let erasure_coding = erasure_coding.clone();
1079            let handlers = Arc::clone(&handlers);
1080            let sectors_being_modified = Arc::clone(&sectors_being_modified);
1081            let sectors_metadata = Arc::clone(&sectors_metadata);
1082            let mut start_receiver = start_sender.subscribe();
1083            let mut stop_receiver = stop_sender.subscribe();
1084            let node_client = node_client.clone();
1085            let span = span.clone();
1086            let global_mutex = Arc::clone(&global_mutex);
1087
1088            move || {
1089                let _span_guard = span.enter();
1090
1091                let farming_fut = async move {
1092                    if start_receiver.recv().await.is_err() {
1093                        // Dropped before starting
1094                        return Ok(());
1095                    }
1096
1097                    let plot_audit = PlotAudit::new(&farming_plot);
1098
1099                    let farming_options = FarmingOptions {
1100                        public_key_hash,
1101                        reward_address,
1102                        node_client,
1103                        plot_audit,
1104                        sectors_metadata,
1105                        erasure_coding,
1106                        handlers,
1107                        sectors_being_modified,
1108                        slot_info_notifications: slot_info_forwarder_receiver,
1109                        thread_pool: farming_thread_pool,
1110                        global_mutex,
1111                        metrics,
1112                    };
1113                    farming::<PosTable, _, _>(farming_options).await
1114                };
1115
1116                Handle::current().block_on(async {
1117                    select! {
1118                        farming_result = farming_fut.fuse() => {
1119                            if let Err(error) = farming_result
1120                                && let Some(error_sender) = error_sender.lock().take()
1121                                && let Err(error) = error_sender.send(error.into())
1122                            {
1123                                error!(
1124                                    %error,
1125                                    "Farming failed to send error to background task",
1126                                );
1127                            }
1128                        }
1129                        _ = stop_receiver.recv().fuse() => {
1130                            // Nothing, just exit
1131                        }
1132                    }
1133                });
1134            }
1135        });
1136        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);
1137
1138        tasks.push(Box::pin(async move {
1139            // Panic will already be printed by now
1140            farming_join_handle.await.map_err(|_error| {
1141                BackgroundTaskError::BackgroundTaskPanicked {
1142                    task: format!("farming-{farm_index}"),
1143                }
1144            })
1145        }));
1146
1147        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
1148            public_key_hash,
1149            pieces_in_sector,
1150            plot_file,
1151            Arc::clone(&sectors_metadata),
1152            erasure_coding,
1153            sectors_being_modified,
1154            global_mutex,
1155        );
1156
1157        let reading_join_handle = task::spawn_blocking({
1158            let mut stop_receiver = stop_sender.subscribe();
1159            let reading_fut = reading_fut.instrument(span.clone());
1160
1161            move || {
1162                Handle::current().block_on(async {
1163                    select! {
1164                        _ = reading_fut.fuse() => {
1165                            // Nothing, just exit
1166                        }
1167                        _ = stop_receiver.recv().fuse() => {
1168                            // Nothing, just exit
1169                        }
1170                    }
1171                });
1172            }
1173        });
1174
1175        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);
1176
1177        tasks.push(Box::pin(async move {
1178            // Panic will already be printed by now
1179            reading_join_handle.await.map_err(|_error| {
1180                BackgroundTaskError::BackgroundTaskPanicked {
1181                    task: format!("reading-{farm_index}"),
1182                }
1183            })
1184        }));
1185
1186        tasks.push(Box::pin(async move {
1187            match block_sealing(node_client, identity).await {
1188                Ok(block_sealing_fut) => {
1189                    block_sealing_fut.await;
1190                }
1191                Err(error) => {
1192                    return Err(BackgroundTaskError::BlockSealing(anyhow::anyhow!(
1193                        "Failed to subscribe to block sealing notifications: {error}"
1194                    )));
1195                }
1196            }
1197
1198            Ok(())
1199        }));
1200
1201        let farm = Self {
1202            farmer_protocol_info: farmer_app_info.protocol_info,
1203            single_disk_farm_info,
1204            sectors_metadata,
1205            pieces_in_sector,
1206            total_sectors_count: target_sector_count,
1207            span,
1208            tasks,
1209            handlers,
1210            piece_cache,
1211            plot_cache,
1212            piece_reader,
1213            start_sender: Some(start_sender),
1214            stop_sender: Some(stop_sender),
1215            _single_disk_farm_info_lock: single_disk_farm_info_lock,
1216        };
1217        Ok(farm)
1218    }
1219
1220    fn init(
1221        directory: &PathBuf,
1222        farmer_app_info: &FarmerAppInfo,
1223        allocated_space: u64,
1224        max_pieces_in_sector: u16,
1225        cache_percentage: u8,
1226        disable_farm_locking: bool,
1227        create: bool,
1228    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
1229        fs::create_dir_all(directory)?;
1230
1231        let identity = if create {
1232            Identity::open_or_create(directory)?
1233        } else {
1234            Identity::open(directory)?.ok_or_else(|| {
1235                IdentityError::Io(io::Error::new(
1236                    io::ErrorKind::NotFound,
1237                    "Farm does not exist and creation was explicitly disabled",
1238                ))
1239            })?
1240        };
1241        let public_key = identity.public_key();
1242
1243        let (single_disk_farm_info, single_disk_farm_info_lock) =
1244            match SingleDiskFarmInfo::load_from(directory)? {
1245                Some(mut single_disk_farm_info) => {
1246                    if &farmer_app_info.genesis_root != single_disk_farm_info.genesis_root() {
1247                        return Err(SingleDiskFarmError::WrongChain {
1248                            id: *single_disk_farm_info.id(),
1249                            correct_chain: hex::encode(single_disk_farm_info.genesis_root()),
1250                            wrong_chain: hex::encode(farmer_app_info.genesis_root),
1251                        });
1252                    }
1253
1254                    if &public_key != single_disk_farm_info.public_key() {
1255                        return Err(SingleDiskFarmError::IdentityMismatch {
1256                            id: *single_disk_farm_info.id(),
1257                            correct_public_key: *single_disk_farm_info.public_key(),
1258                            wrong_public_key: public_key,
1259                        });
1260                    }
1261
1262                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1263
1264                    if max_pieces_in_sector < pieces_in_sector {
1265                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
1266                            id: *single_disk_farm_info.id(),
1267                            max_supported: max_pieces_in_sector,
1268                            initialized_with: pieces_in_sector,
1269                        });
1270                    }
1271
1272                    if max_pieces_in_sector > pieces_in_sector {
1273                        info!(
1274                            pieces_in_sector,
1275                            max_pieces_in_sector,
1276                            "Farm initialized with smaller number of pieces in sector, farm needs \
1277                            to be re-created for increase"
1278                        );
1279                    }
1280
1281                    let mut single_disk_farm_info_lock = None;
1282
1283                    if allocated_space != single_disk_farm_info.allocated_space() {
1284                        info!(
1285                            old_space = %ByteSize::b(single_disk_farm_info.allocated_space()).display().iec(),
1286                            new_space = %ByteSize::b(allocated_space).display().iec(),
1287                            "Farm size has changed"
1288                        );
1289
1290                        let new_allocated_space = allocated_space;
1291                        match &mut single_disk_farm_info {
1292                            SingleDiskFarmInfo::V0 {
1293                                allocated_space, ..
1294                            } => {
1295                                *allocated_space = new_allocated_space;
1296                            }
1297                        }
1298
1299                        single_disk_farm_info_lock =
1300                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1301                    } else if !disable_farm_locking {
1302                        single_disk_farm_info_lock = Some(
1303                            SingleDiskFarmInfo::try_lock(directory)
1304                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
1305                        );
1306                    }
1307
1308                    (single_disk_farm_info, single_disk_farm_info_lock)
1309                }
1310                None => {
1311                    let single_disk_farm_info = SingleDiskFarmInfo::new(
1312                        FarmId::new(),
1313                        farmer_app_info.genesis_root,
1314                        public_key,
1315                        max_pieces_in_sector,
1316                        allocated_space,
1317                    );
1318
1319                    let single_disk_farm_info_lock =
1320                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
1321
1322                    (single_disk_farm_info, single_disk_farm_info_lock)
1323                }
1324            };
1325
1326        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
1327        let sector_size = sector_size(pieces_in_sector) as u64;
1328        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1329        let allocated_space_distribution = AllocatedSpaceDistribution::new(
1330            allocated_space,
1331            sector_size,
1332            cache_percentage,
1333            sector_metadata_size as u64,
1334        )?;
1335        let target_sector_count = allocated_space_distribution.target_sector_count;
1336
1337        let metadata_file_path = directory.join(Self::METADATA_FILE);
1338        let metadata_file = DirectIoFileWrapper::open(&metadata_file_path)?;
1339
1340        let metadata_size = metadata_file.size()?;
1341        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
1342        // Align plot file size for disk sector size
1343        let expected_metadata_size =
1344            expected_metadata_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
1345        let metadata_header = if metadata_size == 0 {
1346            let metadata_header = PlotMetadataHeader {
1347                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
1348                plotted_sector_count: 0,
1349            };
1350
1351            metadata_file
1352                .preallocate(expected_metadata_size)
1353                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1354            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;
1355
1356            metadata_header
1357        } else {
1358            if metadata_size != expected_metadata_size {
1359                // Allocating the whole file (`set_len` below can create a sparse file, which will
1360                // cause writes to fail later)
1361                metadata_file
1362                    .preallocate(expected_metadata_size)
1363                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
1364                // Truncating file (if necessary)
1365                metadata_file.set_len(expected_metadata_size)?;
1366            }
1367
1368            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1369            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1370
1371            let mut metadata_header =
1372                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1373                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;
1374
1375            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1376                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
1377                    metadata_header.version,
1378                ));
1379            }
1380
1381            if metadata_header.plotted_sector_count > target_sector_count {
1382                metadata_header.plotted_sector_count = target_sector_count;
1383                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
1384            }
1385
1386            metadata_header
1387        };
1388
1389        let sectors_metadata = {
1390            let mut sectors_metadata =
1391                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));
1392
1393            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1394            for sector_index in
1395                SectorIndex::ZERO..SectorIndex::new(metadata_header.plotted_sector_count)
1396            {
1397                let sector_offset =
1398                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
1399                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;
1400
1401                let sector_metadata =
1402                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
1403                        Ok(sector_metadata) => sector_metadata,
1404                        Err(error) => {
1405                            warn!(
1406                                path = %metadata_file_path.display(),
1407                                %error,
1408                                %sector_index,
1409                                "Failed to decode sector metadata, replacing with dummy expired \
1410                                sector metadata"
1411                            );
1412
1413                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
1414                                sector_index,
1415                                pieces_in_sector,
1416                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
1417                                history_size: HistorySize::from(SegmentIndex::ZERO),
1418                            });
1419                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;
1420
1421                            dummy_sector
1422                        }
1423                    };
1424                sectors_metadata.push(sector_metadata);
1425            }
1426
1427            Arc::new(AsyncRwLock::new(sectors_metadata))
1428        };
1429
1430        let plot_file = DirectIoFileWrapper::open(directory.join(Self::PLOT_FILE))?;
1431
1432        if plot_file.size()? != allocated_space_distribution.plot_file_size {
1433            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
1434            // writes to fail later)
1435            plot_file
1436                .preallocate(allocated_space_distribution.plot_file_size)
1437                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
1438            // Truncating file (if necessary)
1439            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
1440        }
1441
1442        let plot_file = Arc::new(plot_file);
1443
1444        let plot_cache = DiskPlotCache::new(
1445            &plot_file,
1446            &sectors_metadata,
1447            target_sector_count,
1448            sector_size,
1449        );
1450
1451        Ok(SingleDiskFarmInit {
1452            identity,
1453            single_disk_farm_info,
1454            single_disk_farm_info_lock,
1455            plot_file,
1456            metadata_file,
1457            metadata_header,
1458            target_sector_count,
1459            sectors_metadata,
1460            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
1461            plot_cache,
1462        })
1463    }
1464
1465    /// Collect summary of single disk farm for presentational purposes
1466    pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1467        let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1468            Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1469            Ok(None) => {
1470                return SingleDiskFarmSummary::NotFound { directory };
1471            }
1472            Err(error) => {
1473                return SingleDiskFarmSummary::Error { directory, error };
1474            }
1475        };
1476
1477        SingleDiskFarmSummary::Found {
1478            info: single_disk_farm_info,
1479            directory,
1480        }
1481    }
1482
1483    /// Effective on-disk allocation of the files related to the farm (takes some buffer space
1484    /// into consideration).
1485    ///
1486    /// This is a helpful number in case some files were not allocated properly or were removed and
1487    /// do not correspond to allocated space in the farm info accurately.
1488    pub fn effective_disk_usage(
1489        directory: &Path,
1490        cache_percentage: u8,
1491    ) -> Result<u64, SingleDiskFarmError> {
1492        let mut effective_disk_usage;
1493        match SingleDiskFarmInfo::load_from(directory)? {
1494            Some(single_disk_farm_info) => {
1495                let allocated_space_distribution = AllocatedSpaceDistribution::new(
1496                    single_disk_farm_info.allocated_space(),
1497                    sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1498                    cache_percentage,
1499                    SectorMetadataChecksummed::encoded_size() as u64,
1500                )?;
1501
1502                effective_disk_usage = single_disk_farm_info.allocated_space();
1503                effective_disk_usage -= Identity::file_size() as u64;
1504                effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1505                effective_disk_usage -= allocated_space_distribution.plot_file_size;
1506                effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1507            }
1508            None => {
1509                // No farm info, try to collect actual file sizes is any
1510                effective_disk_usage = 0;
1511            }
1512        };
1513
1514        if Identity::open(directory)?.is_some() {
1515            effective_disk_usage += Identity::file_size() as u64;
1516        }
1517
1518        match OpenOptions::new()
1519            .read(true)
1520            .open(directory.join(Self::METADATA_FILE))
1521        {
1522            Ok(metadata_file) => {
1523                effective_disk_usage += metadata_file.size()?;
1524            }
1525            Err(error) => {
1526                if error.kind() == io::ErrorKind::NotFound {
1527                    // File is not stored on disk
1528                } else {
1529                    return Err(error.into());
1530                }
1531            }
1532        };
1533
1534        match OpenOptions::new()
1535            .read(true)
1536            .open(directory.join(Self::PLOT_FILE))
1537        {
1538            Ok(plot_file) => {
1539                effective_disk_usage += plot_file.size()?;
1540            }
1541            Err(error) => {
1542                if error.kind() == io::ErrorKind::NotFound {
1543                    // File is not stored on disk
1544                } else {
1545                    return Err(error.into());
1546                }
1547            }
1548        };
1549
1550        match OpenOptions::new()
1551            .read(true)
1552            .open(directory.join(DiskPieceCache::FILE_NAME))
1553        {
1554            Ok(piece_cache) => {
1555                effective_disk_usage += piece_cache.size()?;
1556            }
1557            Err(error) => {
1558                if error.kind() == io::ErrorKind::NotFound {
1559                    // File is not stored on disk
1560                } else {
1561                    return Err(error.into());
1562                }
1563            }
1564        };
1565
1566        Ok(effective_disk_usage)
1567    }
1568
1569    /// Read all sectors metadata
1570    pub fn read_all_sectors_metadata(
1571        directory: &Path,
1572    ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1573        let metadata_file = DirectIoFileWrapper::open(directory.join(Self::METADATA_FILE))?;
1574
1575        let metadata_size = metadata_file.size()?;
1576        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1577
1578        let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1579        metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1580
1581        let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1582            .map_err(|error| {
1583                io::Error::other(format!("Failed to decode metadata header: {error}"))
1584            })?;
1585
1586        if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1587            return Err(io::Error::other(format!(
1588                "Unsupported metadata version {}",
1589                metadata_header.version
1590            )));
1591        }
1592
1593        let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1594            ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1595        );
1596
1597        let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1598        for sector_index in 0..metadata_header.plotted_sector_count {
1599            metadata_file.read_exact_at(
1600                &mut sector_metadata_bytes,
1601                RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1602            )?;
1603            sectors_metadata.push(
1604                SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1605                    |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1606                )?,
1607            );
1608        }
1609
1610        Ok(sectors_metadata)
1611    }
1612
1613    /// ID of this farm
1614    pub fn id(&self) -> &FarmId {
1615        self.single_disk_farm_info.id()
1616    }
1617
1618    /// Info of this farm
1619    pub fn info(&self) -> &SingleDiskFarmInfo {
1620        &self.single_disk_farm_info
1621    }
1622
1623    /// Number of sectors in this farm
1624    pub fn total_sectors_count(&self) -> u16 {
1625        self.total_sectors_count
1626    }
1627
1628    /// Read information about sectors plotted so far
1629    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
1630        SingleDiskPlottedSectors {
1631            public_key_hash: self.single_disk_farm_info.public_key().hash(),
1632            pieces_in_sector: self.pieces_in_sector,
1633            farmer_protocol_info: self.farmer_protocol_info,
1634            sectors_metadata: Arc::clone(&self.sectors_metadata),
1635        }
1636    }
1637
1638    /// Get piece cache instance
1639    pub fn piece_cache(&self) -> SingleDiskPieceCache {
1640        self.piece_cache.clone()
1641    }
1642
1643    /// Get plot cache instance
1644    pub fn plot_cache(&self) -> DiskPlotCache {
1645        self.plot_cache.clone()
1646    }
1647
1648    /// Get piece reader to read plotted pieces later
1649    pub fn piece_reader(&self) -> DiskPieceReader {
1650        self.piece_reader.clone()
1651    }
1652
1653    /// Subscribe to sector updates
1654    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
1655        self.handlers.sector_update.add(callback)
1656    }
1657
1658    /// Subscribe to farming notifications
1659    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
1660        self.handlers.farming_notification.add(callback)
1661    }
1662
1663    /// Subscribe to new solution notification
1664    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
1665        self.handlers.solution.add(callback)
1666    }
1667
1668    /// Run and wait for background threads to exit or return an error
1669    pub async fn run(mut self) -> anyhow::Result<()> {
1670        if let Some(start_sender) = self.start_sender.take() {
1671            // Do not care if anyone is listening on the other side
1672            let _ = start_sender.send(());
1673        }
1674
1675        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
1676            result?;
1677        }
1678
1679        Ok(())
1680    }
1681
1682    /// Wipe everything that belongs to this single disk farm
1683    pub fn wipe(directory: &Path) -> io::Result<()> {
1684        let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1685        match SingleDiskFarmInfo::load_from(directory) {
1686            Ok(Some(single_disk_farm_info)) => {
1687                info!("Found single disk farm {}", single_disk_farm_info.id());
1688            }
1689            Ok(None) => {
1690                return Err(io::Error::new(
1691                    io::ErrorKind::NotFound,
1692                    format!(
1693                        "Single disk farm info not found at {}",
1694                        single_disk_info_info_path.display()
1695                    ),
1696                ));
1697            }
1698            Err(error) => {
1699                warn!("Found unknown single disk farm: {}", error);
1700            }
1701        }
1702
1703        {
1704            let plot = directory.join(Self::PLOT_FILE);
1705            if plot.exists() {
1706                info!("Deleting plot file at {}", plot.display());
1707                fs::remove_file(plot)?;
1708            }
1709        }
1710        {
1711            let metadata = directory.join(Self::METADATA_FILE);
1712            if metadata.exists() {
1713                info!("Deleting metadata file at {}", metadata.display());
1714                fs::remove_file(metadata)?;
1715            }
1716        }
1717        // TODO: Identity should be able to wipe itself instead of assuming a specific file name
1718        //  here
1719        {
1720            let identity = directory.join("identity.bin");
1721            if identity.exists() {
1722                info!("Deleting identity file at {}", identity.display());
1723                fs::remove_file(identity)?;
1724            }
1725        }
1726
1727        DiskPieceCache::wipe(directory)?;
1728
1729        info!(
1730            "Deleting info file at {}",
1731            single_disk_info_info_path.display()
1732        );
1733        fs::remove_file(single_disk_info_info_path)
1734    }
1735
1736    /// Check the farm for corruption and repair errors (caused by disk errors or something else),
1737    /// returns an error when irrecoverable errors occur.
1738    pub fn scrub(
1739        directory: &Path,
1740        disable_farm_locking: bool,
1741        target: ScrubTarget,
1742        dry_run: bool,
1743    ) -> Result<(), SingleDiskFarmScrubError> {
1744        let span = Span::current();
1745
1746        if dry_run {
1747            info!("Dry run is used, no changes will be written to disk");
1748        }
1749
1750        if target.metadata() || target.plot() {
1751            let info = {
1752                let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1753                info!(path = %file.display(), "Checking info file");
1754
1755                match SingleDiskFarmInfo::load_from(directory) {
1756                    Ok(Some(info)) => info,
1757                    Ok(None) => {
1758                        return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1759                    }
1760                    Err(error) => {
1761                        return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1762                    }
1763                }
1764            };
1765
1766            let _single_disk_farm_info_lock = if disable_farm_locking {
1767                None
1768            } else {
1769                Some(
1770                    SingleDiskFarmInfo::try_lock(directory)
1771                        .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1772                )
1773            };
1774
1775            let identity = {
1776                let file = directory.join(Identity::FILE_NAME);
1777                info!(path = %file.display(), "Checking identity file");
1778
1779                match Identity::open(directory) {
1780                    Ok(Some(identity)) => identity,
1781                    Ok(None) => {
1782                        return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1783                    }
1784                    Err(error) => {
1785                        return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1786                    }
1787                }
1788            };
1789
1790            if &identity.public_key() != info.public_key() {
1791                return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1792                    identity: identity.public_key(),
1793                    info: *info.public_key(),
1794                });
1795            }
1796
1797            let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1798
1799            let metadata_file_path = directory.join(Self::METADATA_FILE);
1800            let (metadata_file, mut metadata_header) = {
1801                info!(path = %metadata_file_path.display(), "Checking metadata file");
1802
1803                let metadata_file = match OpenOptions::new()
1804                    .read(true)
1805                    .write(!dry_run)
1806                    .open(&metadata_file_path)
1807                {
1808                    Ok(metadata_file) => metadata_file,
1809                    Err(error) => {
1810                        return Err(if error.kind() == io::ErrorKind::NotFound {
1811                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1812                                file: metadata_file_path,
1813                            }
1814                        } else {
1815                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1816                                file: metadata_file_path,
1817                                error,
1818                            }
1819                        });
1820                    }
1821                };
1822
1823                // Error doesn't matter here
1824                let _ = metadata_file.advise_sequential_access();
1825
1826                let metadata_size = match metadata_file.size() {
1827                    Ok(metadata_size) => metadata_size,
1828                    Err(error) => {
1829                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1830                            file: metadata_file_path,
1831                            error,
1832                        });
1833                    }
1834                };
1835
1836                if metadata_size < RESERVED_PLOT_METADATA {
1837                    return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1838                        file: metadata_file_path,
1839                        reserved_size: RESERVED_PLOT_METADATA,
1840                        size: metadata_size,
1841                    });
1842                }
1843
1844                let mut metadata_header = {
1845                    let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1846
1847                    if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1848                        return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1849                            file: metadata_file_path,
1850                            size: RESERVED_PLOT_METADATA,
1851                            offset: 0,
1852                            error,
1853                        });
1854                    }
1855
1856                    PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1857                        .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1858                };
1859
1860                if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1861                    return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1862                        metadata_header.version,
1863                    ));
1864                }
1865
1866                let plotted_sector_count = metadata_header.plotted_sector_count;
1867
1868                let expected_metadata_size = RESERVED_PLOT_METADATA
1869                    + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1870
1871                if metadata_size < expected_metadata_size {
1872                    warn!(
1873                        %metadata_size,
1874                        %expected_metadata_size,
1875                        "Metadata file size is smaller than expected, shrinking number of plotted \
1876                        sectors to correct value"
1877                    );
1878
1879                    metadata_header.plotted_sector_count =
1880                        ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1881                            as u16;
1882                    let metadata_header_bytes = metadata_header.encode();
1883
1884                    if !dry_run
1885                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1886                    {
1887                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1888                            file: metadata_file_path,
1889                            size: metadata_header_bytes.len() as u64,
1890                            offset: 0,
1891                            error,
1892                        });
1893                    }
1894                }
1895
1896                (metadata_file, metadata_header)
1897            };
1898
1899            let pieces_in_sector = info.pieces_in_sector();
1900            let sector_size = sector_size(pieces_in_sector) as u64;
1901
1902            let plot_file_path = directory.join(Self::PLOT_FILE);
1903            let plot_file = {
1904                let plot_file_path = directory.join(Self::PLOT_FILE);
1905                info!(path = %plot_file_path.display(), "Checking plot file");
1906
1907                let plot_file = match OpenOptions::new()
1908                    .read(true)
1909                    .write(!dry_run)
1910                    .open(&plot_file_path)
1911                {
1912                    Ok(plot_file) => plot_file,
1913                    Err(error) => {
1914                        return Err(if error.kind() == io::ErrorKind::NotFound {
1915                            SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1916                                file: plot_file_path,
1917                            }
1918                        } else {
1919                            SingleDiskFarmScrubError::MetadataCantBeOpened {
1920                                file: plot_file_path,
1921                                error,
1922                            }
1923                        });
1924                    }
1925                };
1926
1927                // Error doesn't matter here
1928                let _ = plot_file.advise_sequential_access();
1929
1930                let plot_size = match plot_file.size() {
1931                    Ok(metadata_size) => metadata_size,
1932                    Err(error) => {
1933                        return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1934                            file: plot_file_path,
1935                            error,
1936                        });
1937                    }
1938                };
1939
1940                let min_expected_plot_size =
1941                    u64::from(metadata_header.plotted_sector_count) * sector_size;
1942                if plot_size < min_expected_plot_size {
1943                    warn!(
1944                        %plot_size,
1945                        %min_expected_plot_size,
1946                        "Plot file size is smaller than expected, shrinking number of plotted \
1947                        sectors to correct value"
1948                    );
1949
1950                    metadata_header.plotted_sector_count = (plot_size / sector_size) as u16;
1951                    let metadata_header_bytes = metadata_header.encode();
1952
1953                    if !dry_run
1954                        && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1955                    {
1956                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1957                            file: plot_file_path,
1958                            size: metadata_header_bytes.len() as u64,
1959                            offset: 0,
1960                            error,
1961                        });
1962                    }
1963                }
1964
1965                plot_file
1966            };
1967
1968            let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1969
1970            info!("Checking sectors and corresponding metadata");
1971            (0..metadata_header.plotted_sector_count)
1972                .into_par_iter()
1973                .map(SectorIndex::new)
1974                .map_init(
1975                    || vec![0u8; Record::SIZE],
1976                    |scratch_buffer, sector_index| {
1977                        let _span_guard = span.enter();
1978
1979                        let offset = RESERVED_PLOT_METADATA
1980                            + u64::from(sector_index) * sector_metadata_size as u64;
1981                        if let Err(error) = metadata_file
1982                            .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1983                        {
1984                            warn!(
1985                                path = %metadata_file_path.display(),
1986                                %error,
1987                                %offset,
1988                                size = %sector_metadata_size,
1989                                %sector_index,
1990                                "Failed to read sector metadata, replacing with dummy expired \
1991                                sector metadata"
1992                            );
1993
1994                            if !dry_run {
1995                                write_dummy_sector_metadata(
1996                                    &metadata_file,
1997                                    &metadata_file_path,
1998                                    sector_index,
1999                                    pieces_in_sector,
2000                                )?;
2001                            }
2002                            return Ok(());
2003                        }
2004
2005                        let sector_metadata = match SectorMetadataChecksummed::decode(
2006                            &mut &scratch_buffer[..sector_metadata_size],
2007                        ) {
2008                            Ok(sector_metadata) => sector_metadata,
2009                            Err(error) => {
2010                                warn!(
2011                                    path = %metadata_file_path.display(),
2012                                    %error,
2013                                    %sector_index,
2014                                    "Failed to decode sector metadata, replacing with dummy \
2015                                    expired sector metadata"
2016                                );
2017
2018                                if !dry_run {
2019                                    write_dummy_sector_metadata(
2020                                        &metadata_file,
2021                                        &metadata_file_path,
2022                                        sector_index,
2023                                        pieces_in_sector,
2024                                    )?;
2025                                }
2026                                return Ok(());
2027                            }
2028                        };
2029
2030                        if sector_metadata.sector_index != sector_index {
2031                            warn!(
2032                                path = %metadata_file_path.display(),
2033                                %sector_index,
2034                                found_sector_index = %sector_metadata.sector_index,
2035                                "Sector index mismatch, replacing with dummy expired sector \
2036                                metadata"
2037                            );
2038
2039                            if !dry_run {
2040                                write_dummy_sector_metadata(
2041                                    &metadata_file,
2042                                    &metadata_file_path,
2043                                    sector_index,
2044                                    pieces_in_sector,
2045                                )?;
2046                            }
2047                            return Ok(());
2048                        }
2049
2050                        if sector_metadata.pieces_in_sector != pieces_in_sector {
2051                            warn!(
2052                                path = %metadata_file_path.display(),
2053                                %sector_index,
2054                                %pieces_in_sector,
2055                                found_pieces_in_sector = sector_metadata.pieces_in_sector,
2056                                "Pieces in sector mismatch, replacing with dummy expired sector \
2057                                metadata"
2058                            );
2059
2060                            if !dry_run {
2061                                write_dummy_sector_metadata(
2062                                    &metadata_file,
2063                                    &metadata_file_path,
2064                                    sector_index,
2065                                    pieces_in_sector,
2066                                )?;
2067                            }
2068                            return Ok(());
2069                        }
2070
2071                        if target.plot() {
2072                            let mut hasher = blake3::Hasher::new();
2073                            // Read sector bytes and compute checksum
2074                            for offset_in_sector in
2075                                sector_bytes_range.clone().step_by(scratch_buffer.len())
2076                            {
2077                                let offset =
2078                                    u64::from(sector_index) * sector_size + offset_in_sector as u64;
2079                                let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2080                                    .min(sector_bytes_range.end)
2081                                    - offset_in_sector;
2082
2083                                let bytes = &mut scratch_buffer[..bytes_to_read];
2084
2085                                if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2086                                    warn!(
2087                                        path = %plot_file_path.display(),
2088                                        %error,
2089                                        %sector_index,
2090                                        %offset,
2091                                        size = %bytes.len() as u64,
2092                                        "Failed to read sector bytes"
2093                                    );
2094
2095                                    continue;
2096                                }
2097
2098                                hasher.update(bytes);
2099                            }
2100
2101                            let actual_checksum = *hasher.finalize().as_bytes();
2102                            let mut expected_checksum = [0; Blake3Hash::SIZE];
2103                            {
2104                                let offset = u64::from(sector_index) * sector_size
2105                                    + sector_bytes_range.end as u64;
2106                                if let Err(error) =
2107                                    plot_file.read_exact_at(&mut expected_checksum, offset)
2108                                {
2109                                    warn!(
2110                                        path = %plot_file_path.display(),
2111                                        %error,
2112                                        %sector_index,
2113                                        %offset,
2114                                        size = %expected_checksum.len() as u64,
2115                                        "Failed to read sector checksum bytes"
2116                                    );
2117                                }
2118                            }
2119
2120                            // Verify checksum
2121                            if actual_checksum != expected_checksum {
2122                                warn!(
2123                                    path = %plot_file_path.display(),
2124                                    %sector_index,
2125                                    actual_checksum = %hex::encode(actual_checksum),
2126                                    expected_checksum = %hex::encode(expected_checksum),
2127                                    "Plotted sector checksum mismatch, replacing with dummy \
2128                                    expired sector"
2129                                );
2130
2131                                if !dry_run {
2132                                    write_dummy_sector_metadata(
2133                                        &metadata_file,
2134                                        &metadata_file_path,
2135                                        sector_index,
2136                                        pieces_in_sector,
2137                                    )?;
2138                                }
2139
2140                                scratch_buffer.fill(0);
2141
2142                                hasher.reset();
2143                                // Fill sector with zeroes and compute checksum
2144                                for offset_in_sector in
2145                                    sector_bytes_range.clone().step_by(scratch_buffer.len())
2146                                {
2147                                    let offset = u64::from(sector_index) * sector_size
2148                                        + offset_in_sector as u64;
2149                                    let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2150                                        .min(sector_bytes_range.end)
2151                                        - offset_in_sector;
2152                                    let bytes = &mut scratch_buffer[..bytes_to_write];
2153
2154                                    if !dry_run
2155                                        && let Err(error) = plot_file.write_all_at(bytes, offset)
2156                                    {
2157                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2158                                            file: plot_file_path.clone(),
2159                                            size: scratch_buffer.len() as u64,
2160                                            offset,
2161                                            error,
2162                                        });
2163                                    }
2164
2165                                    hasher.update(bytes);
2166                                }
2167                                // Write checksum
2168                                {
2169                                    let checksum = *hasher.finalize().as_bytes();
2170                                    let offset = u64::from(sector_index) * sector_size
2171                                        + sector_bytes_range.end as u64;
2172                                    if !dry_run
2173                                        && let Err(error) =
2174                                            plot_file.write_all_at(&checksum, offset)
2175                                    {
2176                                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2177                                            file: plot_file_path.clone(),
2178                                            size: checksum.len() as u64,
2179                                            offset,
2180                                            error,
2181                                        });
2182                                    }
2183                                }
2184
2185                                return Ok(());
2186                            }
2187                        }
2188
2189                        trace!(%sector_index, "Sector is in good shape");
2190
2191                        Ok(())
2192                    },
2193                )
2194                .try_for_each({
2195                    let span = &span;
2196                    let checked_sectors = AtomicUsize::new(0);
2197
2198                    move |result| {
2199                        let _span_guard = span.enter();
2200
2201                        let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2202                        if checked_sectors > 1 && checked_sectors.is_multiple_of(10) {
2203                            info!(
2204                                "Checked {}/{} sectors",
2205                                checked_sectors, metadata_header.plotted_sector_count
2206                            );
2207                        }
2208
2209                        result
2210                    }
2211                })?;
2212        }
2213
2214        if target.cache() {
2215            Self::scrub_cache(directory, dry_run)?;
2216        }
2217
2218        info!("Farm check completed");
2219
2220        Ok(())
2221    }
2222
2223    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
2224        let span = Span::current();
2225
2226        let file = directory.join(DiskPieceCache::FILE_NAME);
2227        info!(path = %file.display(), "Checking cache file");
2228
2229        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
2230            Ok(plot_file) => plot_file,
2231            Err(error) => {
2232                return if error.kind() == io::ErrorKind::NotFound {
2233                    warn!(
2234                        file = %file.display(),
2235                        "Cache file does not exist, this is expected in farming cluster"
2236                    );
2237                    Ok(())
2238                } else {
2239                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
2240                };
2241            }
2242        };
2243
2244        // Error doesn't matter here
2245        let _ = cache_file.advise_sequential_access();
2246
2247        let cache_size = match cache_file.size() {
2248            Ok(cache_size) => cache_size,
2249            Err(error) => {
2250                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
2251            }
2252        };
2253
2254        let element_size = DiskPieceCache::element_size();
2255        let number_of_cached_elements = cache_size / u64::from(element_size);
2256        let dummy_element = vec![0; element_size as usize];
2257        (0..number_of_cached_elements)
2258            .into_par_iter()
2259            .map_with(vec![0; element_size as usize], |element, cache_offset| {
2260                let _span_guard = span.enter();
2261
2262                let offset = cache_offset * u64::from(element_size);
2263                if let Err(error) = cache_file.read_exact_at(element, offset) {
2264                    warn!(
2265                        path = %file.display(),
2266                        %cache_offset,
2267                        size = %element.len() as u64,
2268                        %offset,
2269                        %error,
2270                        "Failed to read cached piece, replacing with dummy element"
2271                    );
2272
2273                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2274                    {
2275                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2276                            file: file.clone(),
2277                            size: u64::from(element_size),
2278                            offset,
2279                            error,
2280                        });
2281                    }
2282
2283                    return Ok(());
2284                }
2285
2286                let (index_and_piece_bytes, expected_checksum) =
2287                    element.split_at(element_size as usize - Blake3Hash::SIZE);
2288                let actual_checksum = *blake3::hash(index_and_piece_bytes).as_bytes();
2289                if actual_checksum != expected_checksum && element != &dummy_element {
2290                    warn!(
2291                        %cache_offset,
2292                        actual_checksum = %hex::encode(actual_checksum),
2293                        expected_checksum = %hex::encode(expected_checksum),
2294                        "Cached piece checksum mismatch, replacing with dummy element"
2295                    );
2296
2297                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
2298                    {
2299                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2300                            file: file.clone(),
2301                            size: u64::from(element_size),
2302                            offset,
2303                            error,
2304                        });
2305                    }
2306
2307                    return Ok(());
2308                }
2309
2310                Ok(())
2311            })
2312            .try_for_each({
2313                let span = &span;
2314                let checked_elements = AtomicUsize::new(0);
2315
2316                move |result| {
2317                    let _span_guard = span.enter();
2318
2319                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
2320                    if checked_elements > 1 && checked_elements.is_multiple_of(1000) {
2321                        info!(
2322                            "Checked {}/{} cache elements",
2323                            checked_elements, number_of_cached_elements
2324                        );
2325                    }
2326
2327                    result
2328                }
2329            })?;
2330
2331        Ok(())
2332    }
2333}
2334
2335fn write_dummy_sector_metadata(
2336    metadata_file: &File,
2337    metadata_file_path: &Path,
2338    sector_index: SectorIndex,
2339    pieces_in_sector: u16,
2340) -> Result<(), SingleDiskFarmScrubError> {
2341    let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2342        sector_index,
2343        pieces_in_sector,
2344        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2345        history_size: HistorySize::from(SegmentIndex::ZERO),
2346    })
2347    .encode();
2348    let sector_offset = RESERVED_PLOT_METADATA
2349        + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2350    metadata_file
2351        .write_all_at(&dummy_sector_bytes, sector_offset)
2352        .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2353            file: metadata_file_path.to_path_buf(),
2354            size: dummy_sector_bytes.len() as u64,
2355            offset: sector_offset,
2356            error,
2357        })
2358}