1mod block_sealing;
8pub mod direct_io_file_wrapper;
9pub mod farming;
10pub mod identity;
11mod metrics;
12pub mod piece_cache;
13pub mod piece_reader;
14pub mod plot_cache;
15mod plotted_sectors;
16mod plotting;
17
18use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError};
19use crate::farm::{
20 Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceCacheId, PieceReader,
21 PlottedSectors, SectorUpdate,
22};
23use crate::node_client::NodeClient;
24use crate::plotter::Plotter;
25use crate::single_disk_farm::block_sealing::block_sealing;
26use crate::single_disk_farm::direct_io_file_wrapper::{DISK_PAGE_SIZE, DirectIoFileWrapper};
27use crate::single_disk_farm::farming::rayon_files::RayonFiles;
28use crate::single_disk_farm::farming::{
29 FarmingOptions, PlotAudit, farming, slot_notification_forwarder,
30};
31use crate::single_disk_farm::identity::{Identity, IdentityError};
32use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
33use crate::single_disk_farm::piece_cache::SingleDiskPieceCache;
34use crate::single_disk_farm::piece_reader::DiskPieceReader;
35use crate::single_disk_farm::plot_cache::DiskPlotCache;
36use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
37pub use crate::single_disk_farm::plotting::PlottingError;
38use crate::single_disk_farm::plotting::{
39 PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions, plotting, plotting_scheduler,
40};
41use crate::utils::{AsyncJoinOnDrop, tokio_rayon_spawn_handler};
42use crate::{KNOWN_PEERS_CACHE_SIZE, farm};
43use ab_core_primitives::address::Address;
44use ab_core_primitives::block::BlockRoot;
45use ab_core_primitives::ed25519::Ed25519PublicKey;
46use ab_core_primitives::hashes::Blake3Hash;
47use ab_core_primitives::pieces::Record;
48use ab_core_primitives::sectors::SectorIndex;
49use ab_core_primitives::segments::{HistorySize, SegmentIndex};
50use ab_erasure_coding::ErasureCoding;
51use ab_farmer_components::FarmerProtocolInfo;
52use ab_farmer_components::file_ext::FileExt;
53use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed, sector_size};
54use ab_farmer_rpc_primitives::{FarmerAppInfo, SolutionResponse};
55use ab_networking::KnownPeersManager;
56use ab_proof_of_space::Table;
57use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
58use async_trait::async_trait;
59use bytesize::ByteSize;
60use event_listener_primitives::{Bag, HandlerId};
61use futures::channel::{mpsc, oneshot};
62use futures::stream::FuturesUnordered;
63use futures::{FutureExt, StreamExt, select};
64use parity_scale_codec::{Decode, Encode};
65use parking_lot::Mutex;
66use prometheus_client::registry::Registry;
67use rayon::prelude::*;
68use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
69use serde::{Deserialize, Serialize};
70use std::collections::HashSet;
71use std::fs::{File, OpenOptions};
72use std::future::Future;
73use std::io::Write;
74use std::num::{NonZeroU32, NonZeroUsize};
75use std::path::{Path, PathBuf};
76use std::pin::Pin;
77use std::str::FromStr;
78use std::sync::Arc;
79use std::sync::atomic::{AtomicUsize, Ordering};
80use std::time::Duration;
81use std::{fmt, fs, io, mem};
82use thiserror::Error;
83use tokio::runtime::Handle;
84use tokio::sync::broadcast;
85use tokio::task;
86use tracing::{Instrument, Span, error, info, trace, warn};
87
// Compile-time guarantee that `usize` is at least as wide as `u64`, so the `u64` sizes and
// offsets used throughout this module can be converted to `usize` without loss.
const _: () = {
    assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());
};
93
/// Space reserved at the beginning of the plot metadata file (header area), in bytes
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Space reserved for the farm info file, in bytes
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
/// Delay applied before processing a newly archived segment; presumably gives the network time
/// to distribute the segment first — TODO confirm rationale
const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_mins(10);
99
/// Exclusive advisory lock on the farm info file.
///
/// Holds the locked [`File`] handle; the lock is released when this value is dropped.
#[derive(Debug)]
#[must_use = "Lock file must be kept around or as long as farm is used"]
pub struct SingleDiskFarmInfoLock {
    // Keeps the file (and thus the advisory lock) alive; never read
    _file: File,
}
106
/// Important information about the contents of the single disk farm, persisted as JSON in the
/// farm directory (see [`SingleDiskFarmInfo::FILE_NAME`] usage below).
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SingleDiskFarmInfo {
    /// Initial version of the farm info format
    #[serde(rename_all = "camelCase")]
    V0 {
        /// Unique identifier of this farm
        id: FarmId,
        /// Genesis block root of the chain the farm was created for
        genesis_root: BlockRoot,
        /// Public key of the identity associated with the farm
        public_key: Ed25519PublicKey,
        /// How many pieces each sector contains
        pieces_in_sector: u16,
        /// Total space allocated to the farm, in bytes
        allocated_space: u64,
    },
}
126
127impl SingleDiskFarmInfo {
128 const FILE_NAME: &'static str = "single_disk_farm.json";
129
130 pub fn new(
132 id: FarmId,
133 genesis_root: BlockRoot,
134 public_key: Ed25519PublicKey,
135 pieces_in_sector: u16,
136 allocated_space: u64,
137 ) -> Self {
138 Self::V0 {
139 id,
140 genesis_root,
141 public_key,
142 pieces_in_sector,
143 allocated_space,
144 }
145 }
146
147 pub fn load_from(directory: &Path) -> io::Result<Option<Self>> {
150 let bytes = match fs::read(directory.join(Self::FILE_NAME)) {
151 Ok(bytes) => bytes,
152 Err(error) => {
153 return if error.kind() == io::ErrorKind::NotFound {
154 Ok(None)
155 } else {
156 Err(error)
157 };
158 }
159 };
160
161 serde_json::from_slice(&bytes)
162 .map(Some)
163 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
164 }
165
166 pub fn store_to(
170 &self,
171 directory: &Path,
172 lock: bool,
173 ) -> io::Result<Option<SingleDiskFarmInfoLock>> {
174 let mut file = OpenOptions::new()
175 .write(true)
176 .create(true)
177 .truncate(false)
178 .open(directory.join(Self::FILE_NAME))?;
179 if lock {
180 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
181 }
182 file.set_len(0)?;
183 file.write_all(&serde_json::to_vec(self).expect("Info serialization never fails; qed"))?;
184
185 Ok(lock.then_some(SingleDiskFarmInfoLock { _file: file }))
186 }
187
188 pub fn try_lock(directory: &Path) -> io::Result<SingleDiskFarmInfoLock> {
191 let file = File::open(directory.join(Self::FILE_NAME))?;
192 fs4::fs_std::FileExt::try_lock_exclusive(&file)?;
193
194 Ok(SingleDiskFarmInfoLock { _file: file })
195 }
196
197 pub fn id(&self) -> &FarmId {
199 let Self::V0 { id, .. } = self;
200 id
201 }
202
203 pub fn genesis_root(&self) -> &BlockRoot {
205 let Self::V0 { genesis_root, .. } = self;
206 genesis_root
207 }
208
209 pub fn public_key(&self) -> &Ed25519PublicKey {
211 let Self::V0 { public_key, .. } = self;
212 public_key
213 }
214
215 pub fn pieces_in_sector(&self) -> u16 {
217 match self {
218 SingleDiskFarmInfo::V0 {
219 pieces_in_sector, ..
220 } => *pieces_in_sector,
221 }
222 }
223
224 pub fn allocated_space(&self) -> u64 {
226 match self {
227 SingleDiskFarmInfo::V0 {
228 allocated_space, ..
229 } => *allocated_space,
230 }
231 }
232}
233
/// Result of checking a directory for a single disk farm (see how callers populate
/// `directory` in each variant)
#[derive(Debug)]
pub enum SingleDiskFarmSummary {
    /// Farm info was found and parsed successfully
    Found {
        /// Parsed farm info
        info: SingleDiskFarmInfo,
        /// Directory that was checked
        directory: PathBuf,
    },
    /// No farm info file was present in the directory
    NotFound {
        /// Directory that was checked
        directory: PathBuf,
    },
    /// Reading or parsing the farm info failed
    Error {
        /// Directory that was checked
        directory: PathBuf,
        /// The I/O error encountered
        error: io::Error,
    },
}
257
/// Header stored at the beginning of the plot metadata file (SCALE-encoded)
#[derive(Debug, Encode, Decode)]
struct PlotMetadataHeader {
    // Format version; only `SUPPORTED_PLOT_VERSION` is accepted elsewhere in this module
    version: u8,
    // Number of sectors already plotted; used to resume plotting after restart
    plotted_sector_count: u16,
}
263
264impl PlotMetadataHeader {
265 #[inline]
266 fn encoded_size() -> usize {
267 let default = PlotMetadataHeader {
268 version: 0,
269 plotted_sector_count: 0,
270 };
271
272 default.encoded_size()
273 }
274}
275
/// Options for creating a [`SingleDiskFarm`] (consumed by `SingleDiskFarm::new`)
#[derive(Debug)]
pub struct SingleDiskFarmOptions<'a, NC>
where
    NC: Clone,
{
    /// Directory where the farm's files are stored
    pub directory: PathBuf,
    /// Farmer application info obtained from the node
    pub farmer_app_info: FarmerAppInfo,
    /// Total space allocated to the farm, in bytes
    pub allocated_space: u64,
    /// Maximum number of pieces per sector supported by this build
    pub max_pieces_in_sector: u16,
    /// Client used to communicate with the node
    pub node_client: NC,
    /// Address that farming rewards should be attributed to
    pub reward_address: Address,
    /// Plotter implementation used to create sectors
    pub plotter: Arc<dyn Plotter + Send + Sync>,
    /// Erasure coding instance used for farming and piece reading
    pub erasure_coding: ErasureCoding,
    /// Percentage of allocated space dedicated to the piece cache
    pub cache_percentage: u8,
    /// Number of threads in the farming thread pool
    pub farming_thread_pool_size: usize,
    /// Optional signal to delay the start of plotting; plotting is skipped entirely if the
    /// sender side is dropped
    pub plotting_delay: Option<oneshot::Receiver<()>>,
    /// Mutex shared across farms to serialize globally exclusive operations
    pub global_mutex: Arc<AsyncMutex<()>>,
    /// Upper bound on sectors being plotted concurrently for this farm
    pub max_plotting_sectors_per_farm: NonZeroUsize,
    /// Disable taking an exclusive lock on the farm info file
    pub disable_farm_locking: bool,
    /// Optional Prometheus registry for farm metrics
    pub registry: Option<&'a Mutex<&'a mut Registry>>,
    /// Whether to create the farm (identity) if it does not exist yet
    pub create: bool,
}
319
320#[derive(Debug, Error)]
322pub enum SingleDiskFarmError {
323 #[error("Failed to open or create identity: {0}")]
325 FailedToOpenIdentity(#[from] IdentityError),
326 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
328 LikelyAlreadyInUse(io::Error),
329 #[error("Single disk farm I/O error: {0}")]
331 Io(#[from] io::Error),
332 #[error("Failed to spawn task for blocking thread: {0}")]
334 TokioJoinError(#[from] task::JoinError),
335 #[error("Piece cache error: {0}")]
337 PieceCacheError(#[from] DiskPieceCacheError),
338 #[error("Can't preallocate metadata file, probably not enough space on disk: {0}")]
340 CantPreallocateMetadataFile(io::Error),
341 #[error("Can't preallocate plot file, probably not enough space on disk: {0}")]
343 CantPreallocatePlotFile(io::Error),
344 #[error(
346 "Genesis hash of farm {id} {wrong_chain} is different from {correct_chain} when farm was \
347 created, it is not possible to use farm on a different chain"
348 )]
349 WrongChain {
350 id: FarmId,
352 correct_chain: String,
355 wrong_chain: String,
357 },
358 #[error(
360 "Public key of farm {id} {wrong_public_key} is different from {correct_public_key} when \
361 farm was created, something went wrong, likely due to manual edits"
362 )]
363 IdentityMismatch {
364 id: FarmId,
366 correct_public_key: Ed25519PublicKey,
368 wrong_public_key: Ed25519PublicKey,
370 },
371 #[error(
373 "Invalid number pieces in sector: max supported {max_supported}, farm initialized with \
374 {initialized_with}"
375 )]
376 InvalidPiecesInSector {
377 id: FarmId,
379 max_supported: u16,
381 initialized_with: u16,
383 },
384 #[error("Failed to decode metadata header: {0}")]
386 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
387 #[error("Unexpected metadata version {0}")]
389 UnexpectedMetadataVersion(u8),
390 #[error(
392 "Allocated space is not enough for one sector. \
393 The lowest acceptable value for allocated space is {min_space} bytes, \
394 provided {allocated_space} bytes."
395 )]
396 InsufficientAllocatedSpace {
397 min_space: u64,
399 allocated_space: u64,
401 },
402 #[error(
404 "Farm is too large: allocated {allocated_sectors} sectors ({allocated_space} bytes), max \
405 supported is {max_sectors} ({max_space} bytes). Consider creating multiple smaller farms \
406 instead."
407 )]
408 FarmTooLarge {
409 allocated_space: u64,
411 allocated_sectors: u64,
413 max_space: u64,
415 max_sectors: u16,
417 },
418 #[error("Failed to create thread pool: {0}")]
420 FailedToCreateThreadPool(ThreadPoolBuildError),
421}
422
423#[derive(Debug, Error)]
425pub enum SingleDiskFarmScrubError {
426 #[error("Farm is likely already in use, make sure no other farmer is using it: {0}")]
428 LikelyAlreadyInUse(io::Error),
429 #[error("Failed to file size of {file}: {error}")]
431 FailedToDetermineFileSize {
432 file: PathBuf,
434 error: io::Error,
436 },
437 #[error("Failed to read {size} bytes from {file} at offset {offset}: {error}")]
439 FailedToReadBytes {
440 file: PathBuf,
442 size: u64,
444 offset: u64,
446 error: io::Error,
448 },
449 #[error("Failed to write {size} bytes from {file} at offset {offset}: {error}")]
451 FailedToWriteBytes {
452 file: PathBuf,
454 size: u64,
456 offset: u64,
458 error: io::Error,
460 },
461 #[error("Farm info file does not exist at {file}")]
463 FarmInfoFileDoesNotExist {
464 file: PathBuf,
466 },
467 #[error("Farm info at {file} can't be opened: {error}")]
469 FarmInfoCantBeOpened {
470 file: PathBuf,
472 error: io::Error,
474 },
475 #[error("Identity file does not exist at {file}")]
477 IdentityFileDoesNotExist {
478 file: PathBuf,
480 },
481 #[error("Identity at {file} can't be opened: {error}")]
483 IdentityCantBeOpened {
484 file: PathBuf,
486 error: IdentityError,
488 },
489 #[error("Identity public key {identity} doesn't match public key in the disk farm info {info}")]
491 PublicKeyMismatch {
492 identity: Ed25519PublicKey,
494 info: Ed25519PublicKey,
496 },
497 #[error("Metadata file does not exist at {file}")]
499 MetadataFileDoesNotExist {
500 file: PathBuf,
502 },
503 #[error("Metadata at {file} can't be opened: {error}")]
505 MetadataCantBeOpened {
506 file: PathBuf,
508 error: io::Error,
510 },
511 #[error(
513 "Metadata file at {file} is too small: reserved size is {reserved_size} bytes, file size \
514 is {size}"
515 )]
516 MetadataFileTooSmall {
517 file: PathBuf,
519 reserved_size: u64,
521 size: u64,
523 },
524 #[error("Failed to decode metadata header: {0}")]
526 FailedToDecodeMetadataHeader(parity_scale_codec::Error),
527 #[error("Unexpected metadata version {0}")]
529 UnexpectedMetadataVersion(u8),
530 #[error("Cache at {file} can't be opened: {error}")]
532 CacheCantBeOpened {
533 file: PathBuf,
535 error: io::Error,
537 },
538}
539
/// Errors surfaced by background tasks of a single disk farm
#[derive(Debug, Error)]
pub enum BackgroundTaskError {
    /// Plotting task failed
    #[error(transparent)]
    Plotting(#[from] PlottingError),
    /// Farming task failed
    #[error(transparent)]
    Farming(#[from] FarmingError),
    /// Block sealing task failed
    #[error(transparent)]
    BlockSealing(#[from] anyhow::Error),
    /// A background task panicked instead of returning
    #[error("Background task {task} panicked")]
    BackgroundTaskPanicked {
        /// Name of the panicked task (includes farm index)
        task: String,
    },
}
559
/// Boxed future of a single disk farm background task (plotting, farming, reading, etc.)
type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError>> + Send>>;
561
/// Which parts of the farm to scrub
#[derive(Debug, Copy, Clone)]
pub enum ScrubTarget {
    /// Scrub everything (metadata, plot and cache)
    All,
    /// Scrub metadata only
    Metadata,
    /// Scrub the plot (also implies metadata, see `ScrubTarget::metadata`)
    Plot,
    /// Scrub the piece cache only
    Cache,
}
574
575impl fmt::Display for ScrubTarget {
576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577 match self {
578 Self::All => f.write_str("all"),
579 Self::Metadata => f.write_str("metadata"),
580 Self::Plot => f.write_str("plot"),
581 Self::Cache => f.write_str("cache"),
582 }
583 }
584}
585
586impl FromStr for ScrubTarget {
587 type Err = String;
588
589 fn from_str(s: &str) -> Result<Self, Self::Err> {
590 match s {
591 "all" => Ok(Self::All),
592 "metadata" => Ok(Self::Metadata),
593 "plot" => Ok(Self::Plot),
594 "cache" => Ok(Self::Cache),
595 s => Err(format!("Can't parse {s} as `ScrubTarget`")),
596 }
597 }
598}
599
600impl ScrubTarget {
601 fn metadata(&self) -> bool {
602 match self {
603 Self::All | Self::Metadata | Self::Plot => true,
604 Self::Cache => false,
605 }
606 }
607
608 fn plot(&self) -> bool {
609 match self {
610 Self::All | Self::Plot => true,
611 Self::Metadata | Self::Cache => false,
612 }
613 }
614
615 fn cache(&self) -> bool {
616 match self {
617 Self::All | Self::Cache => true,
618 Self::Metadata | Self::Plot => false,
619 }
620 }
621}
622
/// How the farm's allocated space is split between its on-disk files
struct AllocatedSpaceDistribution {
    // Size of the piece cache file, in bytes
    piece_cache_file_size: u64,
    // Piece cache capacity in elements
    piece_cache_capacity: u32,
    // Size of the plot file, in bytes (aligned to disk page size)
    plot_file_size: u64,
    // Number of sectors the plot will eventually contain
    target_sector_count: u16,
    // Size of the metadata file, in bytes (reserved header + per-sector metadata)
    metadata_file_size: u64,
}
630
631impl AllocatedSpaceDistribution {
632 fn new(
633 allocated_space: u64,
634 sector_size: u64,
635 cache_percentage: u8,
636 sector_metadata_size: u64,
637 ) -> Result<Self, SingleDiskFarmError> {
638 let single_sector_overhead = sector_size + sector_metadata_size;
639 let fixed_space_usage = RESERVED_PLOT_METADATA
641 + RESERVED_FARM_INFO
642 + Identity::file_size() as u64
643 + KnownPeersManager::file_size(KNOWN_PEERS_CACHE_SIZE) as u64;
644 let target_sector_count = {
646 let potentially_plottable_space = allocated_space.saturating_sub(fixed_space_usage)
647 / 100
648 * (100 - u64::from(cache_percentage));
649 (potentially_plottable_space - DISK_PAGE_SIZE as u64) / single_sector_overhead
652 };
653
654 if target_sector_count == 0 {
655 let mut single_plot_with_cache_space =
656 single_sector_overhead.div_ceil(100 - u64::from(cache_percentage)) * 100;
657 if single_plot_with_cache_space - single_sector_overhead
660 < DiskPieceCache::element_size() as u64
661 {
662 single_plot_with_cache_space =
663 single_sector_overhead + DiskPieceCache::element_size() as u64;
664 }
665
666 return Err(SingleDiskFarmError::InsufficientAllocatedSpace {
667 min_space: fixed_space_usage + single_plot_with_cache_space,
668 allocated_space,
669 });
670 }
671 let plot_file_size = target_sector_count * sector_size;
672 let plot_file_size = plot_file_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
674
675 let piece_cache_capacity = if cache_percentage > 0 {
677 let cache_space = allocated_space
678 - fixed_space_usage
679 - plot_file_size
680 - (sector_metadata_size * target_sector_count);
681 (cache_space / u64::from(DiskPieceCache::element_size())) as u32
682 } else {
683 0
684 };
685 let target_sector_count = match u16::try_from(target_sector_count).map(SectorIndex::from) {
686 Ok(target_sector_count) if target_sector_count < SectorIndex::MAX => {
687 u16::from(target_sector_count)
688 }
689 _ => {
690 let max_sectors = u16::from(SectorIndex::MAX) - 1;
693 return Err(SingleDiskFarmError::FarmTooLarge {
694 allocated_space: target_sector_count * sector_size,
695 allocated_sectors: target_sector_count,
696 max_space: max_sectors as u64 * sector_size,
697 max_sectors,
698 });
699 }
700 };
701
702 Ok(Self {
703 piece_cache_file_size: u64::from(piece_cache_capacity)
704 * u64::from(DiskPieceCache::element_size()),
705 piece_cache_capacity,
706 plot_file_size,
707 target_sector_count,
708 metadata_file_size: RESERVED_PLOT_METADATA
709 + sector_metadata_size * u64::from(target_sector_count),
710 })
711 }
712}
713
/// Bag of event handlers taking an argument of type `A`
type Handler<A> = Bag<HandlerFn<A>, A>;
715
/// Event handler bags for notifications emitted by a single disk farm
#[derive(Default, Debug)]
struct Handlers {
    // Fired on every sector state change with the sector index and the update
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    // Fired on farming notifications
    farming_notification: Handler<FarmingNotification>,
    // Fired when a solution response is produced
    solution: Handler<SolutionResponse>,
}
722
/// Result of the blocking initialization step (`SingleDiskFarm::init`) handed over to
/// `SingleDiskFarm::new`
struct SingleDiskFarmInit {
    // Farm identity (used for block sealing)
    identity: Identity,
    // Persisted farm info, loaded or freshly created
    single_disk_farm_info: SingleDiskFarmInfo,
    // Exclusive lock on the farm info file, unless farm locking is disabled
    single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
    // Opened plot file (direct I/O), shared with plotting and piece reading
    plot_file: Arc<DirectIoFileWrapper>,
    // Opened plot metadata file (direct I/O)
    metadata_file: DirectIoFileWrapper,
    // Decoded metadata header (version + plotted sector count)
    metadata_header: PlotMetadataHeader,
    // Number of sectors the farm will eventually contain
    target_sector_count: u16,
    // Metadata of already plotted sectors
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Piece cache capacity in elements (0 disables the cache)
    piece_cache_capacity: u32,
    // Cache backed by not-yet-plotted parts of the plot file
    plot_cache: DiskPlotCache,
}
735
/// Single disk farm abstraction: plotting, farming, piece reading and caches backed by a
/// single directory on disk.
#[derive(Debug)]
#[must_use = "Plot does not function properly unless run() method is called"]
pub struct SingleDiskFarm {
    // Protocol info captured at creation time
    farmer_protocol_info: FarmerProtocolInfo,
    // Persisted farm info
    single_disk_farm_info: SingleDiskFarmInfo,
    // Metadata of already plotted sectors, shared with background tasks
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Number of pieces in each sector
    pieces_in_sector: u16,
    // Total number of sectors the farm will eventually contain
    total_sectors_count: u16,
    // Tracing span the farm was created under
    span: Span,
    // Background tasks driven by `run()`
    tasks: FuturesUnordered<BackgroundTask>,
    // Notification handlers shared with background tasks
    handlers: Arc<Handlers>,
    // Dedicated piece cache of this farm
    piece_cache: SingleDiskPieceCache,
    // Cache backed by not-yet-plotted plot space
    plot_cache: DiskPlotCache,
    // Reader for plotted pieces
    piece_reader: DiskPieceReader,
    // Signals background tasks to start; taken (closed) on drop
    start_sender: Option<broadcast::Sender<()>>,
    // Signals background tasks to stop; taken (closed) on drop
    stop_sender: Option<broadcast::Sender<()>>,
    // Keeps the farm info lock alive for the farm's lifetime
    _single_disk_farm_info_lock: Option<SingleDiskFarmInfoLock>,
}
761
762impl Drop for SingleDiskFarm {
763 #[inline]
764 fn drop(&mut self) {
765 self.piece_reader.close_all_readers();
766 self.start_sender.take();
768 self.stop_sender.take();
770 }
771}
772
// Trait impl that forwards every call to the inherent method of the same name. Inherent
// methods take precedence over trait methods during resolution, so these calls are not
// recursive (the inherent methods are defined elsewhere in this file).
#[async_trait(?Send)]
impl Farm for SingleDiskFarm {
    fn id(&self) -> &FarmId {
        // Calls the inherent `id`, not this trait method
        self.id()
    }

    fn total_sectors_count(&self) -> u16 {
        self.total_sectors_count
    }

    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
        Arc::new(self.plotted_sectors())
    }

    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
        Arc::new(self.piece_reader())
    }

    fn on_sector_update(
        &self,
        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_sector_update(callback))
    }

    fn on_farming_notification(
        &self,
        callback: HandlerFn<FarmingNotification>,
    ) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_farming_notification(callback))
    }

    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
        Box::new(self.on_solution(callback))
    }

    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
        // Unbox and delegate to the inherent async `run`
        Box::pin((*self).run())
    }
}
813
814impl SingleDiskFarm {
    /// Name of the plot file within the farm directory
    pub const PLOT_FILE: &'static str = "plot.bin";
    /// Name of the plot metadata file within the farm directory
    pub const METADATA_FILE: &'static str = "metadata.bin";
    /// The only plot metadata version this build understands
    const SUPPORTED_PLOT_VERSION: u8 = 0;
820
    /// Creates a single disk farm from `options`, initializing on-disk files and wiring up all
    /// background tasks (error forwarding, plotting, plotting scheduler, slot forwarding,
    /// farming, piece reading, block sealing).
    ///
    /// `farm_index` is only used to name threads and background tasks of this farm.
    /// Plotting and farming tasks wait for the start broadcast before doing real work
    /// (presumably sent by `run()`, which is defined elsewhere — see the `must_use` note on
    /// the struct).
    pub async fn new<NC, PosTable>(
        options: SingleDiskFarmOptions<'_, NC>,
        farm_index: usize,
    ) -> Result<Self, SingleDiskFarmError>
    where
        NC: NodeClient + Clone,
        PosTable: Table,
    {
        let span = Span::current();

        let SingleDiskFarmOptions {
            directory,
            farmer_app_info,
            allocated_space,
            max_pieces_in_sector,
            node_client,
            reward_address,
            plotter,
            erasure_coding,
            cache_percentage,
            farming_thread_pool_size,
            plotting_delay,
            global_mutex,
            max_plotting_sectors_per_farm,
            disable_farm_locking,
            registry,
            create,
        } = options;

        // Disk initialization is blocking, run it on a dedicated blocking thread
        let single_disk_farm_init_fut = task::spawn_blocking({
            let directory = directory.clone();
            let farmer_app_info = farmer_app_info.clone();
            let span = span.clone();

            move || {
                let _span_guard = span.enter();
                Self::init(
                    &directory,
                    &farmer_app_info,
                    allocated_space,
                    max_pieces_in_sector,
                    cache_percentage,
                    disable_farm_locking,
                    create,
                )
            }
        });

        let single_disk_farm_init =
            AsyncJoinOnDrop::new(single_disk_farm_init_fut, false).await??;

        let SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity,
            plot_cache,
        } = single_disk_farm_init;

        let piece_cache = {
            // Piece cache ID reuses the farm's ULID
            let FarmId::Ulid(id) = *single_disk_farm_info.id();
            let id = PieceCacheId::Ulid(id);

            SingleDiskPieceCache::new(
                id,
                // Cache is only backed by a file when capacity is non-zero
                if let Some(piece_cache_capacity) = NonZeroU32::new(piece_cache_capacity) {
                    Some(task::block_in_place(|| {
                        if let Some(registry) = registry {
                            DiskPieceCache::open(
                                &directory,
                                piece_cache_capacity,
                                Some(id),
                                Some(*registry.lock()),
                            )
                        } else {
                            DiskPieceCache::open(&directory, piece_cache_capacity, Some(id), None)
                        }
                    })?)
                } else {
                    None
                },
            )
        };

        let public_key = *single_disk_farm_info.public_key();
        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector);

        let metrics = registry.map(|registry| {
            Arc::new(SingleDiskFarmMetrics::new(
                *registry.lock(),
                single_disk_farm_info.id(),
                target_sector_count,
                sectors_metadata.read_blocking().len() as u16,
            ))
        });

        // First error from any background component is funneled through this one-shot channel
        let (error_sender, error_receiver) = oneshot::channel();
        let error_sender = Arc::new(Mutex::new(Some(error_sender)));

        let tasks = FuturesUnordered::<BackgroundTask>::new();

        // Task that surfaces the first forwarded error (resolves Ok if senders are dropped)
        tasks.push(Box::pin(async move {
            if let Ok(error) = error_receiver.await {
                return Err(error);
            }

            Ok(())
        }));

        let handlers = Arc::<Handlers>::default();
        let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
        let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
        let sectors_being_modified = Arc::<AsyncRwLock<HashSet<SectorIndex>>>::default();
        let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
        // Plotting resumes from the sector count recorded in the metadata header
        let sectors_indices_left_to_plot = SectorIndex::new(metadata_header.plotted_sector_count)
            ..SectorIndex::new(target_sector_count);

        let farming_thread_pool = ThreadPoolBuilder::new()
            .thread_name(move |thread_index| format!("farming-{farm_index}.{thread_index}"))
            .num_threads(farming_thread_pool_size)
            .spawn_handler(tokio_rayon_spawn_handler())
            .build()
            .map_err(SingleDiskFarmError::FailedToCreateThreadPool)?;
        // Open one plot file handle per rayon thread for auditing (blocking I/O)
        let farming_plot_fut = task::spawn_blocking(|| {
            farming_thread_pool
                .install(move || {
                    RayonFiles::open_with(directory.join(Self::PLOT_FILE), |path| {
                        DirectIoFileWrapper::open(path)
                    })
                })
                .map(|farming_plot| (farming_plot, farming_thread_pool))
        });

        let (farming_plot, farming_thread_pool) =
            AsyncJoinOnDrop::new(farming_plot_fut, false).await??;

        // Plotting runs on a blocking thread that drives async futures via `block_on`
        let plotting_join_handle = task::spawn_blocking({
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let node_client = node_client.clone();
            let plot_file = Arc::clone(&plot_file);
            let error_sender = Arc::clone(&error_sender);
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);
            let metrics = metrics.clone();

            move || {
                let _span_guard = span.enter();

                let plotting_options = PlottingOptions {
                    metadata_header,
                    sectors_metadata: &sectors_metadata,
                    sectors_being_modified: &sectors_being_modified,
                    sectors_to_plot_receiver,
                    sector_plotting_options: SectorPlottingOptions {
                        public_key,
                        node_client: &node_client,
                        pieces_in_sector,
                        sector_size,
                        plot_file,
                        metadata_file: Arc::new(metadata_file),
                        handlers: &handlers,
                        global_mutex: &global_mutex,
                        plotter,
                        metrics,
                    },
                    max_plotting_sectors_per_farm,
                };

                let plotting_fut = async {
                    // Wait for the start signal; exit cleanly if the farm was dropped first
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    // Optional external delay; exit cleanly if its sender was dropped
                    if let Some(plotting_delay) = plotting_delay
                        && plotting_delay.await.is_err()
                    {
                        return Ok(());
                    }

                    plotting(plotting_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        plotting_result = plotting_fut.fuse() => {
                            // Forward the first plotting error to the error task above
                            if let Err(error) = plotting_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Plotting failed to send error to background task"
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Stop requested, just exit
                        }
                    }
                });
            }
        });
        let plotting_join_handle = AsyncJoinOnDrop::new(plotting_join_handle, false);

        // Turn a panicked plotting thread into a `BackgroundTaskPanicked` error
        tasks.push(Box::pin(async move {
            plotting_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("plotting-{farm_index}"),
                }
            })
        }));

        let public_key_hash = public_key.hash();

        let plotting_scheduler_options = PlottingSchedulerOptions {
            public_key_hash,
            sectors_indices_left_to_plot,
            target_sector_count,
            last_archived_segment_index: farmer_app_info.protocol_info.history_size.segment_index(),
            min_sector_lifetime: farmer_app_info.protocol_info.min_sector_lifetime,
            node_client: node_client.clone(),
            handlers: Arc::clone(&handlers),
            sectors_metadata: Arc::clone(&sectors_metadata),
            sectors_to_plot_sender,
            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
            metrics: metrics.clone(),
        };
        tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

        // Rendezvous channel: slot notifications are handed to farming one at a time
        let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

        tasks.push(Box::pin({
            let node_client = node_client.clone();
            let metrics = metrics.clone();

            async move {
                slot_notification_forwarder(&node_client, slot_info_forwarder_sender, metrics)
                    .await
                    .map_err(BackgroundTaskError::Farming)
            }
        }));

        // Farming runs on its own blocking thread, mirroring the plotting setup above
        let farming_join_handle = task::spawn_blocking({
            let erasure_coding = erasure_coding.clone();
            let handlers = Arc::clone(&handlers);
            let sectors_being_modified = Arc::clone(&sectors_being_modified);
            let sectors_metadata = Arc::clone(&sectors_metadata);
            let mut start_receiver = start_sender.subscribe();
            let mut stop_receiver = stop_sender.subscribe();
            let node_client = node_client.clone();
            let span = span.clone();
            let global_mutex = Arc::clone(&global_mutex);

            move || {
                let _span_guard = span.enter();

                let farming_fut = async move {
                    // Wait for the start signal; exit cleanly if the farm was dropped first
                    if start_receiver.recv().await.is_err() {
                        return Ok(());
                    }

                    let plot_audit = PlotAudit::new(&farming_plot);

                    let farming_options = FarmingOptions {
                        public_key_hash,
                        reward_address,
                        node_client,
                        plot_audit,
                        sectors_metadata,
                        erasure_coding,
                        handlers,
                        sectors_being_modified,
                        slot_info_notifications: slot_info_forwarder_receiver,
                        thread_pool: farming_thread_pool,
                        global_mutex,
                        metrics,
                    };
                    farming::<PosTable, _, _>(farming_options).await
                };

                Handle::current().block_on(async {
                    select! {
                        farming_result = farming_fut.fuse() => {
                            // Forward the first farming error to the error task above
                            if let Err(error) = farming_result
                                && let Some(error_sender) = error_sender.lock().take()
                                && let Err(error) = error_sender.send(error.into())
                            {
                                error!(
                                    %error,
                                    "Farming failed to send error to background task",
                                );
                            }
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Stop requested, just exit
                        }
                    }
                });
            }
        });
        let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);

        // Turn a panicked farming thread into a `BackgroundTaskPanicked` error
        tasks.push(Box::pin(async move {
            farming_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("farming-{farm_index}"),
                }
            })
        }));

        let (piece_reader, reading_fut) = DiskPieceReader::new::<PosTable>(
            public_key_hash,
            pieces_in_sector,
            plot_file,
            Arc::clone(&sectors_metadata),
            erasure_coding,
            sectors_being_modified,
            global_mutex,
        );

        // Piece reading also runs on a blocking thread until stopped
        let reading_join_handle = task::spawn_blocking({
            let mut stop_receiver = stop_sender.subscribe();
            let reading_fut = reading_fut.instrument(span.clone());

            move || {
                Handle::current().block_on(async {
                    select! {
                        _ = reading_fut.fuse() => {
                            // Reading finished on its own
                        }
                        _ = stop_receiver.recv().fuse() => {
                            // Stop requested, just exit
                        }
                    }
                });
            }
        });

        let reading_join_handle = AsyncJoinOnDrop::new(reading_join_handle, false);

        // Turn a panicked reading thread into a `BackgroundTaskPanicked` error
        tasks.push(Box::pin(async move {
            reading_join_handle.await.map_err(|_error| {
                BackgroundTaskError::BackgroundTaskPanicked {
                    task: format!("reading-{farm_index}"),
                }
            })
        }));

        // Block sealing subscribes first; subscription failure is a hard error, after that the
        // sealing future runs until completion
        tasks.push(Box::pin(async move {
            match block_sealing(node_client, identity).await {
                Ok(block_sealing_fut) => {
                    block_sealing_fut.await;
                }
                Err(error) => {
                    return Err(BackgroundTaskError::BlockSealing(anyhow::anyhow!(
                        "Failed to subscribe to block sealing notifications: {error}"
                    )));
                }
            }

            Ok(())
        }));

        let farm = Self {
            farmer_protocol_info: farmer_app_info.protocol_info,
            single_disk_farm_info,
            sectors_metadata,
            pieces_in_sector,
            total_sectors_count: target_sector_count,
            span,
            tasks,
            handlers,
            piece_cache,
            plot_cache,
            piece_reader,
            start_sender: Some(start_sender),
            stop_sender: Some(stop_sender),
            _single_disk_farm_info_lock: single_disk_farm_info_lock,
        };
        Ok(farm)
    }
1219
    /// Initializes all on-disk components of a farm in `directory`: identity, farm info
    /// (with optional exclusive lock), metadata file, plot file and plot cache.
    ///
    /// If farm info already exists on disk it is validated against `farmer_app_info`
    /// (genesis root), the identity's public key and `max_pieces_in_sector`; otherwise a
    /// fresh farm info is created (only possible when `create` is `true` for the
    /// identity). Returns everything needed to construct the farm instance.
    fn init(
        directory: &PathBuf,
        farmer_app_info: &FarmerAppInfo,
        allocated_space: u64,
        max_pieces_in_sector: u16,
        cache_percentage: u8,
        disable_farm_locking: bool,
        create: bool,
    ) -> Result<SingleDiskFarmInit, SingleDiskFarmError> {
        fs::create_dir_all(directory)?;

        // Identity is only created on demand when `create` is set; otherwise a missing
        // identity is an explicit NotFound error rather than silent creation
        let identity = if create {
            Identity::open_or_create(directory)?
        } else {
            Identity::open(directory)?.ok_or_else(|| {
                IdentityError::Io(io::Error::new(
                    io::ErrorKind::NotFound,
                    "Farm does not exist and creation was explicitly disabled",
                ))
            })?
        };
        let public_key = identity.public_key();

        let (single_disk_farm_info, single_disk_farm_info_lock) =
            match SingleDiskFarmInfo::load_from(directory)? {
                Some(mut single_disk_farm_info) => {
                    // Farm must belong to the same chain as the node we're connected to
                    if &farmer_app_info.genesis_root != single_disk_farm_info.genesis_root() {
                        return Err(SingleDiskFarmError::WrongChain {
                            id: *single_disk_farm_info.id(),
                            correct_chain: hex::encode(single_disk_farm_info.genesis_root()),
                            wrong_chain: hex::encode(farmer_app_info.genesis_root),
                        });
                    }

                    // On-disk identity must match the one recorded in farm info
                    if &public_key != single_disk_farm_info.public_key() {
                        return Err(SingleDiskFarmError::IdentityMismatch {
                            id: *single_disk_farm_info.id(),
                            correct_public_key: *single_disk_farm_info.public_key(),
                            wrong_public_key: public_key,
                        });
                    }

                    let pieces_in_sector = single_disk_farm_info.pieces_in_sector();

                    // A farm plotted with more pieces per sector than we support cannot
                    // be opened
                    if max_pieces_in_sector < pieces_in_sector {
                        return Err(SingleDiskFarmError::InvalidPiecesInSector {
                            id: *single_disk_farm_info.id(),
                            max_supported: max_pieces_in_sector,
                            initialized_with: pieces_in_sector,
                        });
                    }

                    // Smaller value on disk is allowed, just informational
                    if max_pieces_in_sector > pieces_in_sector {
                        info!(
                            pieces_in_sector,
                            max_pieces_in_sector,
                            "Farm initialized with smaller number of pieces in sector, farm needs \
                            to be re-created for increase"
                        );
                    }

                    let mut single_disk_farm_info_lock = None;

                    if allocated_space != single_disk_farm_info.allocated_space() {
                        info!(
                            old_space = %ByteSize::b(single_disk_farm_info.allocated_space()).display().iec(),
                            new_space = %ByteSize::b(allocated_space).display().iec(),
                            "Farm size has changed"
                        );

                        // Persist the new allocated space back into farm info; storing it
                        // also (optionally) takes the farm lock in one step
                        let new_allocated_space = allocated_space;
                        match &mut single_disk_farm_info {
                            SingleDiskFarmInfo::V0 {
                                allocated_space, ..
                            } => {
                                *allocated_space = new_allocated_space;
                            }
                        }

                        single_disk_farm_info_lock =
                            single_disk_farm_info.store_to(directory, !disable_farm_locking)?;
                    } else if !disable_farm_locking {
                        // Nothing changed, but we still need the exclusive lock to
                        // prevent concurrent use of the same farm directory
                        single_disk_farm_info_lock = Some(
                            SingleDiskFarmInfo::try_lock(directory)
                                .map_err(SingleDiskFarmError::LikelyAlreadyInUse)?,
                        );
                    }

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
                None => {
                    // Brand-new farm: generate an ID and persist farm info immediately
                    let single_disk_farm_info = SingleDiskFarmInfo::new(
                        FarmId::new(),
                        farmer_app_info.genesis_root,
                        public_key,
                        max_pieces_in_sector,
                        allocated_space,
                    );

                    let single_disk_farm_info_lock =
                        single_disk_farm_info.store_to(directory, !disable_farm_locking)?;

                    (single_disk_farm_info, single_disk_farm_info_lock)
                }
            };

        let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
        let sector_size = sector_size(pieces_in_sector) as u64;
        let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
        let allocated_space_distribution = AllocatedSpaceDistribution::new(
            allocated_space,
            sector_size,
            cache_percentage,
            sector_metadata_size as u64,
        )?;
        let target_sector_count = allocated_space_distribution.target_sector_count;

        let metadata_file_path = directory.join(Self::METADATA_FILE);
        let metadata_file = DirectIoFileWrapper::open(&metadata_file_path)?;

        let metadata_size = metadata_file.size()?;
        let expected_metadata_size = allocated_space_distribution.metadata_file_size;
        // Round up to disk page size, which is required for direct I/O
        let expected_metadata_size =
            expected_metadata_size.div_ceil(DISK_PAGE_SIZE as u64) * DISK_PAGE_SIZE as u64;
        let metadata_header = if metadata_size == 0 {
            // Fresh metadata file: preallocate and write an empty header
            let metadata_header = PlotMetadataHeader {
                version: SingleDiskFarm::SUPPORTED_PLOT_VERSION,
                plotted_sector_count: 0,
            };

            metadata_file
                .preallocate(expected_metadata_size)
                .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
            metadata_file.write_all_at(metadata_header.encode().as_slice(), 0)?;

            metadata_header
        } else {
            // Existing metadata file: resize it to the expected size (grow or shrink)
            // before reading the header
            if metadata_size != expected_metadata_size {
                metadata_file
                    .preallocate(expected_metadata_size)
                    .map_err(SingleDiskFarmError::CantPreallocateMetadataFile)?;
                metadata_file.set_len(expected_metadata_size)?;
            }

            let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
            metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;

            let mut metadata_header =
                PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
                    .map_err(SingleDiskFarmError::FailedToDecodeMetadataHeader)?;

            if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
                return Err(SingleDiskFarmError::UnexpectedMetadataVersion(
                    metadata_header.version,
                ));
            }

            // Farm may have been shrunk: clamp plotted sector count to the new target
            // and persist the corrected header
            if metadata_header.plotted_sector_count > target_sector_count {
                metadata_header.plotted_sector_count = target_sector_count;
                metadata_file.write_all_at(&metadata_header.encode(), 0)?;
            }

            metadata_header
        };

        let sectors_metadata = {
            let mut sectors_metadata =
                Vec::<SectorMetadataChecksummed>::with_capacity(usize::from(target_sector_count));

            // Read metadata of every plotted sector; undecodable entries are replaced
            // on disk with a dummy expired sector so they get re-plotted later
            let mut sector_metadata_bytes = vec![0; sector_metadata_size];
            for sector_index in
                SectorIndex::ZERO..SectorIndex::new(metadata_header.plotted_sector_count)
            {
                let sector_offset =
                    RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index);
                metadata_file.read_exact_at(&mut sector_metadata_bytes, sector_offset)?;

                let sector_metadata =
                    match SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()) {
                        Ok(sector_metadata) => sector_metadata,
                        Err(error) => {
                            warn!(
                                path = %metadata_file_path.display(),
                                %error,
                                %sector_index,
                                "Failed to decode sector metadata, replacing with dummy expired \
                                sector metadata"
                            );

                            // `history_size` of zero marks the sector as expired
                            let dummy_sector = SectorMetadataChecksummed::from(SectorMetadata {
                                sector_index,
                                pieces_in_sector,
                                s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
                                history_size: HistorySize::from(SegmentIndex::ZERO),
                            });
                            metadata_file.write_all_at(&dummy_sector.encode(), sector_offset)?;

                            dummy_sector
                        }
                    };
                sectors_metadata.push(sector_metadata);
            }

            Arc::new(AsyncRwLock::new(sectors_metadata))
        };

        let plot_file = DirectIoFileWrapper::open(directory.join(Self::PLOT_FILE))?;

        // Resize plot file to the expected size (grow or shrink)
        if plot_file.size()? != allocated_space_distribution.plot_file_size {
            plot_file
                .preallocate(allocated_space_distribution.plot_file_size)
                .map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
            plot_file.set_len(allocated_space_distribution.plot_file_size)?;
        }

        let plot_file = Arc::new(plot_file);

        let plot_cache = DiskPlotCache::new(
            &plot_file,
            &sectors_metadata,
            target_sector_count,
            sector_size,
        );

        Ok(SingleDiskFarmInit {
            identity,
            single_disk_farm_info,
            single_disk_farm_info_lock,
            plot_file,
            metadata_file,
            metadata_header,
            target_sector_count,
            sectors_metadata,
            piece_cache_capacity: allocated_space_distribution.piece_cache_capacity,
            plot_cache,
        })
    }
1464
1465 pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary {
1467 let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) {
1468 Ok(Some(single_disk_farm_info)) => single_disk_farm_info,
1469 Ok(None) => {
1470 return SingleDiskFarmSummary::NotFound { directory };
1471 }
1472 Err(error) => {
1473 return SingleDiskFarmSummary::Error { directory, error };
1474 }
1475 };
1476
1477 SingleDiskFarmSummary::Found {
1478 info: single_disk_farm_info,
1479 directory,
1480 }
1481 }
1482
1483 pub fn effective_disk_usage(
1489 directory: &Path,
1490 cache_percentage: u8,
1491 ) -> Result<u64, SingleDiskFarmError> {
1492 let mut effective_disk_usage;
1493 match SingleDiskFarmInfo::load_from(directory)? {
1494 Some(single_disk_farm_info) => {
1495 let allocated_space_distribution = AllocatedSpaceDistribution::new(
1496 single_disk_farm_info.allocated_space(),
1497 sector_size(single_disk_farm_info.pieces_in_sector()) as u64,
1498 cache_percentage,
1499 SectorMetadataChecksummed::encoded_size() as u64,
1500 )?;
1501
1502 effective_disk_usage = single_disk_farm_info.allocated_space();
1503 effective_disk_usage -= Identity::file_size() as u64;
1504 effective_disk_usage -= allocated_space_distribution.metadata_file_size;
1505 effective_disk_usage -= allocated_space_distribution.plot_file_size;
1506 effective_disk_usage -= allocated_space_distribution.piece_cache_file_size;
1507 }
1508 None => {
1509 effective_disk_usage = 0;
1511 }
1512 };
1513
1514 if Identity::open(directory)?.is_some() {
1515 effective_disk_usage += Identity::file_size() as u64;
1516 }
1517
1518 match OpenOptions::new()
1519 .read(true)
1520 .open(directory.join(Self::METADATA_FILE))
1521 {
1522 Ok(metadata_file) => {
1523 effective_disk_usage += metadata_file.size()?;
1524 }
1525 Err(error) => {
1526 if error.kind() == io::ErrorKind::NotFound {
1527 } else {
1529 return Err(error.into());
1530 }
1531 }
1532 };
1533
1534 match OpenOptions::new()
1535 .read(true)
1536 .open(directory.join(Self::PLOT_FILE))
1537 {
1538 Ok(plot_file) => {
1539 effective_disk_usage += plot_file.size()?;
1540 }
1541 Err(error) => {
1542 if error.kind() == io::ErrorKind::NotFound {
1543 } else {
1545 return Err(error.into());
1546 }
1547 }
1548 };
1549
1550 match OpenOptions::new()
1551 .read(true)
1552 .open(directory.join(DiskPieceCache::FILE_NAME))
1553 {
1554 Ok(piece_cache) => {
1555 effective_disk_usage += piece_cache.size()?;
1556 }
1557 Err(error) => {
1558 if error.kind() == io::ErrorKind::NotFound {
1559 } else {
1561 return Err(error.into());
1562 }
1563 }
1564 };
1565
1566 Ok(effective_disk_usage)
1567 }
1568
1569 pub fn read_all_sectors_metadata(
1571 directory: &Path,
1572 ) -> io::Result<Vec<SectorMetadataChecksummed>> {
1573 let metadata_file = DirectIoFileWrapper::open(directory.join(Self::METADATA_FILE))?;
1574
1575 let metadata_size = metadata_file.size()?;
1576 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1577
1578 let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
1579 metadata_file.read_exact_at(&mut metadata_header_bytes, 0)?;
1580
1581 let metadata_header = PlotMetadataHeader::decode(&mut metadata_header_bytes.as_ref())
1582 .map_err(|error| {
1583 io::Error::other(format!("Failed to decode metadata header: {error}"))
1584 })?;
1585
1586 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1587 return Err(io::Error::other(format!(
1588 "Unsupported metadata version {}",
1589 metadata_header.version
1590 )));
1591 }
1592
1593 let mut sectors_metadata = Vec::<SectorMetadataChecksummed>::with_capacity(
1594 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64) as usize,
1595 );
1596
1597 let mut sector_metadata_bytes = vec![0; sector_metadata_size];
1598 for sector_index in 0..metadata_header.plotted_sector_count {
1599 metadata_file.read_exact_at(
1600 &mut sector_metadata_bytes,
1601 RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(sector_index),
1602 )?;
1603 sectors_metadata.push(
1604 SectorMetadataChecksummed::decode(&mut sector_metadata_bytes.as_ref()).map_err(
1605 |error| io::Error::other(format!("Failed to decode sector metadata: {error}")),
1606 )?,
1607 );
1608 }
1609
1610 Ok(sectors_metadata)
1611 }
1612
    /// ID of this farm, as recorded in its farm info
    pub fn id(&self) -> &FarmId {
        self.single_disk_farm_info.id()
    }
1617
    /// Farm info of this single disk farm
    pub fn info(&self) -> &SingleDiskFarmInfo {
        &self.single_disk_farm_info
    }
1622
    /// Total number of sectors this farm is supposed to contain once fully plotted
    pub fn total_sectors_count(&self) -> u16 {
        self.total_sectors_count
    }
1627
    /// Returns a handle for enumerating already plotted sectors of this farm
    pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
        SingleDiskPlottedSectors {
            public_key_hash: self.single_disk_farm_info.public_key().hash(),
            pieces_in_sector: self.pieces_in_sector,
            farmer_protocol_info: self.farmer_protocol_info,
            // Shared with the farm itself, so the handle observes live updates
            sectors_metadata: Arc::clone(&self.sectors_metadata),
        }
    }
1637
    /// Handle to this farm's piece cache (cheap to clone)
    pub fn piece_cache(&self) -> SingleDiskPieceCache {
        self.piece_cache.clone()
    }
1642
    /// Handle to this farm's plot cache (cheap to clone)
    pub fn plot_cache(&self) -> DiskPlotCache {
        self.plot_cache.clone()
    }
1647
    /// Handle for reading plotted pieces from this farm (cheap to clone)
    pub fn piece_reader(&self) -> DiskPieceReader {
        self.piece_reader.clone()
    }
1652
    /// Subscribes `callback` to sector update notifications; dropping the returned
    /// `HandlerId` unsubscribes it
    pub fn on_sector_update(&self, callback: HandlerFn<(SectorIndex, SectorUpdate)>) -> HandlerId {
        self.handlers.sector_update.add(callback)
    }
1657
    /// Subscribes `callback` to farming notifications; dropping the returned
    /// `HandlerId` unsubscribes it
    pub fn on_farming_notification(&self, callback: HandlerFn<FarmingNotification>) -> HandlerId {
        self.handlers.farming_notification.add(callback)
    }
1662
    /// Subscribes `callback` to solution responses; dropping the returned `HandlerId`
    /// unsubscribes it
    pub fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> HandlerId {
        self.handlers.solution.add(callback)
    }
1667
    /// Runs the farm to completion: signals background tasks to start, then drives them
    /// until the first error or until all of them finish.
    pub async fn run(mut self) -> anyhow::Result<()> {
        if let Some(start_sender) = self.start_sender.take() {
            // Receiver may already be gone; starting is best-effort
            let _ = start_sender.send(());
        }

        // Propagate the first task failure; `?` aborts the farm on error
        while let Some(result) = self.tasks.next().instrument(self.span.clone()).await {
            result?;
        }

        Ok(())
    }
1681
1682 pub fn wipe(directory: &Path) -> io::Result<()> {
1684 let single_disk_info_info_path = directory.join(SingleDiskFarmInfo::FILE_NAME);
1685 match SingleDiskFarmInfo::load_from(directory) {
1686 Ok(Some(single_disk_farm_info)) => {
1687 info!("Found single disk farm {}", single_disk_farm_info.id());
1688 }
1689 Ok(None) => {
1690 return Err(io::Error::new(
1691 io::ErrorKind::NotFound,
1692 format!(
1693 "Single disk farm info not found at {}",
1694 single_disk_info_info_path.display()
1695 ),
1696 ));
1697 }
1698 Err(error) => {
1699 warn!("Found unknown single disk farm: {}", error);
1700 }
1701 }
1702
1703 {
1704 let plot = directory.join(Self::PLOT_FILE);
1705 if plot.exists() {
1706 info!("Deleting plot file at {}", plot.display());
1707 fs::remove_file(plot)?;
1708 }
1709 }
1710 {
1711 let metadata = directory.join(Self::METADATA_FILE);
1712 if metadata.exists() {
1713 info!("Deleting metadata file at {}", metadata.display());
1714 fs::remove_file(metadata)?;
1715 }
1716 }
1717 {
1720 let identity = directory.join("identity.bin");
1721 if identity.exists() {
1722 info!("Deleting identity file at {}", identity.display());
1723 fs::remove_file(identity)?;
1724 }
1725 }
1726
1727 DiskPieceCache::wipe(directory)?;
1728
1729 info!(
1730 "Deleting info file at {}",
1731 single_disk_info_info_path.display()
1732 );
1733 fs::remove_file(single_disk_info_info_path)
1734 }
1735
1736 pub fn scrub(
1739 directory: &Path,
1740 disable_farm_locking: bool,
1741 target: ScrubTarget,
1742 dry_run: bool,
1743 ) -> Result<(), SingleDiskFarmScrubError> {
1744 let span = Span::current();
1745
1746 if dry_run {
1747 info!("Dry run is used, no changes will be written to disk");
1748 }
1749
1750 if target.metadata() || target.plot() {
1751 let info = {
1752 let file = directory.join(SingleDiskFarmInfo::FILE_NAME);
1753 info!(path = %file.display(), "Checking info file");
1754
1755 match SingleDiskFarmInfo::load_from(directory) {
1756 Ok(Some(info)) => info,
1757 Ok(None) => {
1758 return Err(SingleDiskFarmScrubError::FarmInfoFileDoesNotExist { file });
1759 }
1760 Err(error) => {
1761 return Err(SingleDiskFarmScrubError::FarmInfoCantBeOpened { file, error });
1762 }
1763 }
1764 };
1765
1766 let _single_disk_farm_info_lock = if disable_farm_locking {
1767 None
1768 } else {
1769 Some(
1770 SingleDiskFarmInfo::try_lock(directory)
1771 .map_err(SingleDiskFarmScrubError::LikelyAlreadyInUse)?,
1772 )
1773 };
1774
1775 let identity = {
1776 let file = directory.join(Identity::FILE_NAME);
1777 info!(path = %file.display(), "Checking identity file");
1778
1779 match Identity::open(directory) {
1780 Ok(Some(identity)) => identity,
1781 Ok(None) => {
1782 return Err(SingleDiskFarmScrubError::IdentityFileDoesNotExist { file });
1783 }
1784 Err(error) => {
1785 return Err(SingleDiskFarmScrubError::IdentityCantBeOpened { file, error });
1786 }
1787 }
1788 };
1789
1790 if &identity.public_key() != info.public_key() {
1791 return Err(SingleDiskFarmScrubError::PublicKeyMismatch {
1792 identity: identity.public_key(),
1793 info: *info.public_key(),
1794 });
1795 }
1796
1797 let sector_metadata_size = SectorMetadataChecksummed::encoded_size();
1798
1799 let metadata_file_path = directory.join(Self::METADATA_FILE);
1800 let (metadata_file, mut metadata_header) = {
1801 info!(path = %metadata_file_path.display(), "Checking metadata file");
1802
1803 let metadata_file = match OpenOptions::new()
1804 .read(true)
1805 .write(!dry_run)
1806 .open(&metadata_file_path)
1807 {
1808 Ok(metadata_file) => metadata_file,
1809 Err(error) => {
1810 return Err(if error.kind() == io::ErrorKind::NotFound {
1811 SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1812 file: metadata_file_path,
1813 }
1814 } else {
1815 SingleDiskFarmScrubError::MetadataCantBeOpened {
1816 file: metadata_file_path,
1817 error,
1818 }
1819 });
1820 }
1821 };
1822
1823 let _ = metadata_file.advise_sequential_access();
1825
1826 let metadata_size = match metadata_file.size() {
1827 Ok(metadata_size) => metadata_size,
1828 Err(error) => {
1829 return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1830 file: metadata_file_path,
1831 error,
1832 });
1833 }
1834 };
1835
1836 if metadata_size < RESERVED_PLOT_METADATA {
1837 return Err(SingleDiskFarmScrubError::MetadataFileTooSmall {
1838 file: metadata_file_path,
1839 reserved_size: RESERVED_PLOT_METADATA,
1840 size: metadata_size,
1841 });
1842 }
1843
1844 let mut metadata_header = {
1845 let mut reserved_metadata = vec![0; RESERVED_PLOT_METADATA as usize];
1846
1847 if let Err(error) = metadata_file.read_exact_at(&mut reserved_metadata, 0) {
1848 return Err(SingleDiskFarmScrubError::FailedToReadBytes {
1849 file: metadata_file_path,
1850 size: RESERVED_PLOT_METADATA,
1851 offset: 0,
1852 error,
1853 });
1854 }
1855
1856 PlotMetadataHeader::decode(&mut reserved_metadata.as_slice())
1857 .map_err(SingleDiskFarmScrubError::FailedToDecodeMetadataHeader)?
1858 };
1859
1860 if metadata_header.version != SingleDiskFarm::SUPPORTED_PLOT_VERSION {
1861 return Err(SingleDiskFarmScrubError::UnexpectedMetadataVersion(
1862 metadata_header.version,
1863 ));
1864 }
1865
1866 let plotted_sector_count = metadata_header.plotted_sector_count;
1867
1868 let expected_metadata_size = RESERVED_PLOT_METADATA
1869 + sector_metadata_size as u64 * u64::from(plotted_sector_count);
1870
1871 if metadata_size < expected_metadata_size {
1872 warn!(
1873 %metadata_size,
1874 %expected_metadata_size,
1875 "Metadata file size is smaller than expected, shrinking number of plotted \
1876 sectors to correct value"
1877 );
1878
1879 metadata_header.plotted_sector_count =
1880 ((metadata_size - RESERVED_PLOT_METADATA) / sector_metadata_size as u64)
1881 as u16;
1882 let metadata_header_bytes = metadata_header.encode();
1883
1884 if !dry_run
1885 && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1886 {
1887 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1888 file: metadata_file_path,
1889 size: metadata_header_bytes.len() as u64,
1890 offset: 0,
1891 error,
1892 });
1893 }
1894 }
1895
1896 (metadata_file, metadata_header)
1897 };
1898
1899 let pieces_in_sector = info.pieces_in_sector();
1900 let sector_size = sector_size(pieces_in_sector) as u64;
1901
1902 let plot_file_path = directory.join(Self::PLOT_FILE);
1903 let plot_file = {
1904 let plot_file_path = directory.join(Self::PLOT_FILE);
1905 info!(path = %plot_file_path.display(), "Checking plot file");
1906
1907 let plot_file = match OpenOptions::new()
1908 .read(true)
1909 .write(!dry_run)
1910 .open(&plot_file_path)
1911 {
1912 Ok(plot_file) => plot_file,
1913 Err(error) => {
1914 return Err(if error.kind() == io::ErrorKind::NotFound {
1915 SingleDiskFarmScrubError::MetadataFileDoesNotExist {
1916 file: plot_file_path,
1917 }
1918 } else {
1919 SingleDiskFarmScrubError::MetadataCantBeOpened {
1920 file: plot_file_path,
1921 error,
1922 }
1923 });
1924 }
1925 };
1926
1927 let _ = plot_file.advise_sequential_access();
1929
1930 let plot_size = match plot_file.size() {
1931 Ok(metadata_size) => metadata_size,
1932 Err(error) => {
1933 return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
1934 file: plot_file_path,
1935 error,
1936 });
1937 }
1938 };
1939
1940 let min_expected_plot_size =
1941 u64::from(metadata_header.plotted_sector_count) * sector_size;
1942 if plot_size < min_expected_plot_size {
1943 warn!(
1944 %plot_size,
1945 %min_expected_plot_size,
1946 "Plot file size is smaller than expected, shrinking number of plotted \
1947 sectors to correct value"
1948 );
1949
1950 metadata_header.plotted_sector_count = (plot_size / sector_size) as u16;
1951 let metadata_header_bytes = metadata_header.encode();
1952
1953 if !dry_run
1954 && let Err(error) = metadata_file.write_all_at(&metadata_header_bytes, 0)
1955 {
1956 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
1957 file: plot_file_path,
1958 size: metadata_header_bytes.len() as u64,
1959 offset: 0,
1960 error,
1961 });
1962 }
1963 }
1964
1965 plot_file
1966 };
1967
1968 let sector_bytes_range = 0..(sector_size as usize - Blake3Hash::SIZE);
1969
1970 info!("Checking sectors and corresponding metadata");
1971 (0..metadata_header.plotted_sector_count)
1972 .into_par_iter()
1973 .map(SectorIndex::new)
1974 .map_init(
1975 || vec![0u8; Record::SIZE],
1976 |scratch_buffer, sector_index| {
1977 let _span_guard = span.enter();
1978
1979 let offset = RESERVED_PLOT_METADATA
1980 + u64::from(sector_index) * sector_metadata_size as u64;
1981 if let Err(error) = metadata_file
1982 .read_exact_at(&mut scratch_buffer[..sector_metadata_size], offset)
1983 {
1984 warn!(
1985 path = %metadata_file_path.display(),
1986 %error,
1987 %offset,
1988 size = %sector_metadata_size,
1989 %sector_index,
1990 "Failed to read sector metadata, replacing with dummy expired \
1991 sector metadata"
1992 );
1993
1994 if !dry_run {
1995 write_dummy_sector_metadata(
1996 &metadata_file,
1997 &metadata_file_path,
1998 sector_index,
1999 pieces_in_sector,
2000 )?;
2001 }
2002 return Ok(());
2003 }
2004
2005 let sector_metadata = match SectorMetadataChecksummed::decode(
2006 &mut &scratch_buffer[..sector_metadata_size],
2007 ) {
2008 Ok(sector_metadata) => sector_metadata,
2009 Err(error) => {
2010 warn!(
2011 path = %metadata_file_path.display(),
2012 %error,
2013 %sector_index,
2014 "Failed to decode sector metadata, replacing with dummy \
2015 expired sector metadata"
2016 );
2017
2018 if !dry_run {
2019 write_dummy_sector_metadata(
2020 &metadata_file,
2021 &metadata_file_path,
2022 sector_index,
2023 pieces_in_sector,
2024 )?;
2025 }
2026 return Ok(());
2027 }
2028 };
2029
2030 if sector_metadata.sector_index != sector_index {
2031 warn!(
2032 path = %metadata_file_path.display(),
2033 %sector_index,
2034 found_sector_index = %sector_metadata.sector_index,
2035 "Sector index mismatch, replacing with dummy expired sector \
2036 metadata"
2037 );
2038
2039 if !dry_run {
2040 write_dummy_sector_metadata(
2041 &metadata_file,
2042 &metadata_file_path,
2043 sector_index,
2044 pieces_in_sector,
2045 )?;
2046 }
2047 return Ok(());
2048 }
2049
2050 if sector_metadata.pieces_in_sector != pieces_in_sector {
2051 warn!(
2052 path = %metadata_file_path.display(),
2053 %sector_index,
2054 %pieces_in_sector,
2055 found_pieces_in_sector = sector_metadata.pieces_in_sector,
2056 "Pieces in sector mismatch, replacing with dummy expired sector \
2057 metadata"
2058 );
2059
2060 if !dry_run {
2061 write_dummy_sector_metadata(
2062 &metadata_file,
2063 &metadata_file_path,
2064 sector_index,
2065 pieces_in_sector,
2066 )?;
2067 }
2068 return Ok(());
2069 }
2070
2071 if target.plot() {
2072 let mut hasher = blake3::Hasher::new();
2073 for offset_in_sector in
2075 sector_bytes_range.clone().step_by(scratch_buffer.len())
2076 {
2077 let offset =
2078 u64::from(sector_index) * sector_size + offset_in_sector as u64;
2079 let bytes_to_read = (offset_in_sector + scratch_buffer.len())
2080 .min(sector_bytes_range.end)
2081 - offset_in_sector;
2082
2083 let bytes = &mut scratch_buffer[..bytes_to_read];
2084
2085 if let Err(error) = plot_file.read_exact_at(bytes, offset) {
2086 warn!(
2087 path = %plot_file_path.display(),
2088 %error,
2089 %sector_index,
2090 %offset,
2091 size = %bytes.len() as u64,
2092 "Failed to read sector bytes"
2093 );
2094
2095 continue;
2096 }
2097
2098 hasher.update(bytes);
2099 }
2100
2101 let actual_checksum = *hasher.finalize().as_bytes();
2102 let mut expected_checksum = [0; Blake3Hash::SIZE];
2103 {
2104 let offset = u64::from(sector_index) * sector_size
2105 + sector_bytes_range.end as u64;
2106 if let Err(error) =
2107 plot_file.read_exact_at(&mut expected_checksum, offset)
2108 {
2109 warn!(
2110 path = %plot_file_path.display(),
2111 %error,
2112 %sector_index,
2113 %offset,
2114 size = %expected_checksum.len() as u64,
2115 "Failed to read sector checksum bytes"
2116 );
2117 }
2118 }
2119
2120 if actual_checksum != expected_checksum {
2122 warn!(
2123 path = %plot_file_path.display(),
2124 %sector_index,
2125 actual_checksum = %hex::encode(actual_checksum),
2126 expected_checksum = %hex::encode(expected_checksum),
2127 "Plotted sector checksum mismatch, replacing with dummy \
2128 expired sector"
2129 );
2130
2131 if !dry_run {
2132 write_dummy_sector_metadata(
2133 &metadata_file,
2134 &metadata_file_path,
2135 sector_index,
2136 pieces_in_sector,
2137 )?;
2138 }
2139
2140 scratch_buffer.fill(0);
2141
2142 hasher.reset();
2143 for offset_in_sector in
2145 sector_bytes_range.clone().step_by(scratch_buffer.len())
2146 {
2147 let offset = u64::from(sector_index) * sector_size
2148 + offset_in_sector as u64;
2149 let bytes_to_write = (offset_in_sector + scratch_buffer.len())
2150 .min(sector_bytes_range.end)
2151 - offset_in_sector;
2152 let bytes = &mut scratch_buffer[..bytes_to_write];
2153
2154 if !dry_run
2155 && let Err(error) = plot_file.write_all_at(bytes, offset)
2156 {
2157 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2158 file: plot_file_path.clone(),
2159 size: scratch_buffer.len() as u64,
2160 offset,
2161 error,
2162 });
2163 }
2164
2165 hasher.update(bytes);
2166 }
2167 {
2169 let checksum = *hasher.finalize().as_bytes();
2170 let offset = u64::from(sector_index) * sector_size
2171 + sector_bytes_range.end as u64;
2172 if !dry_run
2173 && let Err(error) =
2174 plot_file.write_all_at(&checksum, offset)
2175 {
2176 return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
2177 file: plot_file_path.clone(),
2178 size: checksum.len() as u64,
2179 offset,
2180 error,
2181 });
2182 }
2183 }
2184
2185 return Ok(());
2186 }
2187 }
2188
2189 trace!(%sector_index, "Sector is in good shape");
2190
2191 Ok(())
2192 },
2193 )
2194 .try_for_each({
2195 let span = &span;
2196 let checked_sectors = AtomicUsize::new(0);
2197
2198 move |result| {
2199 let _span_guard = span.enter();
2200
2201 let checked_sectors = checked_sectors.fetch_add(1, Ordering::Relaxed);
2202 if checked_sectors > 1 && checked_sectors.is_multiple_of(10) {
2203 info!(
2204 "Checked {}/{} sectors",
2205 checked_sectors, metadata_header.plotted_sector_count
2206 );
2207 }
2208
2209 result
2210 }
2211 })?;
2212 }
2213
2214 if target.cache() {
2215 Self::scrub_cache(directory, dry_run)?;
2216 }
2217
2218 info!("Farm check completed");
2219
2220 Ok(())
2221 }
2222
    /// Checks the piece cache file for corruption, replacing unreadable or
    /// checksum-mismatching elements with zeroed dummy elements (unless `dry_run`).
    ///
    /// A missing cache file is not an error (expected in farming cluster setups).
    fn scrub_cache(directory: &Path, dry_run: bool) -> Result<(), SingleDiskFarmScrubError> {
        let span = Span::current();

        let file = directory.join(DiskPieceCache::FILE_NAME);
        info!(path = %file.display(), "Checking cache file");

        // Open read-only in dry-run mode so no repair can possibly be written
        let cache_file = match OpenOptions::new().read(true).write(!dry_run).open(&file) {
            Ok(plot_file) => plot_file,
            Err(error) => {
                return if error.kind() == io::ErrorKind::NotFound {
                    warn!(
                        file = %file.display(),
                        "Cache file does not exist, this is expected in farming cluster"
                    );
                    Ok(())
                } else {
                    Err(SingleDiskFarmScrubError::CacheCantBeOpened { file, error })
                };
            }
        };

        let _ = cache_file.advise_sequential_access();

        let cache_size = match cache_file.size() {
            Ok(cache_size) => cache_size,
            Err(error) => {
                return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize { file, error });
            }
        };

        // Cache file is a flat sequence of fixed-size elements; a partial trailing
        // element (if any) is ignored by the integer division
        let element_size = DiskPieceCache::element_size();
        let number_of_cached_elements = cache_size / u64::from(element_size);
        // All-zero element used both as the replacement payload and to recognize
        // already-blank slots
        let dummy_element = vec![0; element_size as usize];
        (0..number_of_cached_elements)
            .into_par_iter()
            // Per-thread reusable read buffer
            .map_with(vec![0; element_size as usize], |element, cache_offset| {
                let _span_guard = span.enter();

                let offset = cache_offset * u64::from(element_size);
                // Unreadable element: overwrite with the dummy element so the slot is
                // reusable
                if let Err(error) = cache_file.read_exact_at(element, offset) {
                    warn!(
                        path = %file.display(),
                        %cache_offset,
                        size = %element.len() as u64,
                        %offset,
                        %error,
                        "Failed to read cached piece, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                // Each element stores its payload followed by a checksum of that payload;
                // an all-zero element is a legitimately empty slot and is left alone
                let (index_and_piece_bytes, expected_checksum) =
                    element.split_at(element_size as usize - Blake3Hash::SIZE);
                let actual_checksum = *blake3::hash(index_and_piece_bytes).as_bytes();
                if actual_checksum != expected_checksum && element != &dummy_element {
                    warn!(
                        %cache_offset,
                        actual_checksum = %hex::encode(actual_checksum),
                        expected_checksum = %hex::encode(expected_checksum),
                        "Cached piece checksum mismatch, replacing with dummy element"
                    );

                    if !dry_run && let Err(error) = cache_file.write_all_at(&dummy_element, offset)
                    {
                        return Err(SingleDiskFarmScrubError::FailedToWriteBytes {
                            file: file.clone(),
                            size: u64::from(element_size),
                            offset,
                            error,
                        });
                    }

                    return Ok(());
                }

                Ok(())
            })
            .try_for_each({
                let span = &span;
                let checked_elements = AtomicUsize::new(0);

                move |result| {
                    let _span_guard = span.enter();

                    // Periodic progress reporting
                    let checked_elements = checked_elements.fetch_add(1, Ordering::Relaxed);
                    if checked_elements > 1 && checked_elements.is_multiple_of(1000) {
                        info!(
                            "Checked {}/{} cache elements",
                            checked_elements, number_of_cached_elements
                        );
                    }

                    result
                }
            })?;

        Ok(())
    }
2333}
2334
2335fn write_dummy_sector_metadata(
2336 metadata_file: &File,
2337 metadata_file_path: &Path,
2338 sector_index: SectorIndex,
2339 pieces_in_sector: u16,
2340) -> Result<(), SingleDiskFarmScrubError> {
2341 let dummy_sector_bytes = SectorMetadataChecksummed::from(SectorMetadata {
2342 sector_index,
2343 pieces_in_sector,
2344 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
2345 history_size: HistorySize::from(SegmentIndex::ZERO),
2346 })
2347 .encode();
2348 let sector_offset = RESERVED_PLOT_METADATA
2349 + u64::from(sector_index) * SectorMetadataChecksummed::encoded_size() as u64;
2350 metadata_file
2351 .write_all_at(&dummy_sector_bytes, sector_offset)
2352 .map_err(|error| SingleDiskFarmScrubError::FailedToWriteBytes {
2353 file: metadata_file_path.to_path_buf(),
2354 size: dummy_sector_bytes.len() as u64,
2355 offset: sector_offset,
2356 error,
2357 })
2358}