Skip to main content

ab_farmer/single_disk_farm/
plotting.rs

1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7    BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use ab_core_primitives::ed25519::Ed25519PublicKey;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::PieceOffset;
12use ab_core_primitives::sectors::{SectorId, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentIndex};
14use ab_farmer_components::file_ext::FileExt;
15use ab_farmer_components::plotting::PlottedSector;
16use ab_farmer_components::sector::SectorMetadataChecksummed;
17use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
18use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
19use futures::channel::{mpsc, oneshot};
20use futures::stream::FuturesOrdered;
21use futures::{FutureExt, SinkExt, StreamExt, select};
22use parity_scale_codec::Encode;
23use parking_lot::RwLock;
24use rand::prelude::*;
25use std::collections::HashSet;
26use std::future::Future;
27use std::io;
28use std::num::NonZeroUsize;
29use std::ops::Range;
30use std::pin::pin;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use thiserror::Error;
34use tokio::sync::watch;
35use tokio::task;
36use tracing::{Instrument, debug, info, info_span, trace, warn};
37
/// Pause between repeated farmer app info requests while waiting for the node to report a
/// history size that is newer than the one of the sector being replotted
const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Pause before plotting of a sector is attempted again after a recoverable plotting failure
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
40
/// Request to plot (or replot) a single sector, received by [`plotting()`] through
/// `sectors_to_plot_receiver`
pub(super) struct SectorToPlot {
    /// Index of the sector that should be plotted
    sector_index: SectorIndex,
    /// Progress so far in % (not including this sector)
    progress: f32,
    /// Whether this is the last sector queued so far
    last_queued: bool,
    /// Nothing is sent through this sender in `plot_single_sector()`; it is only kept alive until
    /// that function returns and is dropped afterwards.
    ///
    /// NOTE(review): presumably the receiving side uses the drop as a signal that the request was
    /// picked up; confirm against the scheduler code.
    acknowledgement_sender: oneshot::Sender<()>,
}
49
/// Errors that happen during plotting
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Failed to retrieve farmer info
    ///
    /// Returned when the farmer app info request to the node fails before plotting of a sector
    /// starts.
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to get super segment root
    #[error("Failed to get super segment root: {error}")]
    FailedToGetSuperSegmentRoot {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Missing archived segment header
    #[error("Missing archived segment header: {segment_index}")]
    MissingArchivedSegmentHeader {
        /// Segment index that was missing
        segment_index: SegmentIndex,
    },
    /// Failed to subscribe to archived segments
    ///
    /// Also used when subscribing to new super segment headers fails.
    #[error("Failed to subscribe to archived segments: {error}")]
    FailedToSubscribeArchivedSegments {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Low-level plotting error
    ///
    /// Carries a human-readable description; used for errors reported by the plotter, malformed
    /// sector streams and failed blocking tasks.
    #[error("Low-level plotting error: {0}")]
    LowLevel(String),
    /// I/O error occurred
    ///
    /// Created implicitly from [`io::Error`] when writing to plot or metadata files fails.
    #[error("Plotting I/O error: {0}")]
    Io(#[from] io::Error),
    /// Background downloading panicked
    #[error("Background downloading panicked")]
    BackgroundDownloadingPanicked,
}
87
/// Options shared by all sectors plotted by a single farm, borrowed by every
/// `plot_single_sector()` call
pub(super) struct SectorPlottingOptions<'a, NC> {
    /// Public key handed to the plotter for every sector
    pub(super) public_key: Ed25519PublicKey,
    /// Cache queried with the history size used for plotting to get the shard commitments root
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    /// Node client used to request farmer app info (and with it the protocol info) before plotting
    pub(super) node_client: &'a NC,
    /// Number of pieces in each sector
    pub(super) pieces_in_sector: u16,
    /// Size of a single sector in bytes; determines both the offset of a sector in the plot file
    /// and the exact number of bytes expected from the plotter
    pub(super) sector_size: usize,
    /// File that sector contents are written to
    pub(super) plot_file: Arc<DirectIoFileWrapper>,
    /// File that the metadata header and per-sector metadata are written to
    pub(super) metadata_file: Arc<DirectIoFileWrapper>,
    /// Handlers notified about sector state updates
    pub(super) handlers: &'a Handlers,
    /// Mutex that is locked briefly right before a plotted sector is written to disk
    pub(super) global_mutex: &'a AsyncMutex<()>,
    /// Plotter that does the actual sector plotting and reports progress through a channel
    pub(super) plotter: Arc<dyn Plotter>,
    /// Optional metrics updated as sectors move through plotting stages
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
101
/// Options for [`plotting()`]
pub(super) struct PlottingOptions<'a, NC> {
    /// Plot metadata header; its `plotted_sector_count` is increased and the header is written to
    /// the beginning of the metadata file when a sector with a higher index gets plotted
    pub(super) metadata_header: PlotMetadataHeader,
    /// Metadata of sectors plotted so far, indexed by sector index
    pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    /// Indices of sectors that are being plotted right now
    pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
    /// Source of sectors that need to be plotted; plotting stops once this channel is closed
    pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
    /// Options shared by all plotted sectors
    pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
    /// Number of permits in the plotting semaphore, a permit is held by each sector from the
    /// moment its plotting starts until its result is fully processed
    pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
}
110
/// Starts plotting process.
///
/// Receives sectors to plot from `sectors_to_plot_receiver`, initiates plotting of each of them
/// with `plot_single_sector()` and passes results of finished sectors to
/// `process_plotting_result()`, which updates sectors metadata and the metadata header.
///
/// Returns `Ok(())` once `sectors_to_plot_receiver` is closed (sectors that are still being
/// plotted at that point are not awaited) and returns an error as soon as initiating, plotting or
/// result processing of any sector fails.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
/// thread.
pub(super) async fn plotting<NC>(
    plotting_options: PlottingOptions<'_, NC>,
) -> Result<(), PlottingError>
where
    NC: NodeClient,
{
    let PlottingOptions {
        mut metadata_header,
        sectors_metadata,
        sectors_being_modified,
        mut sectors_to_plot_receiver,
        sector_plotting_options,
        max_plotting_sectors_per_farm,
    } = plotting_options;

    let sector_plotting_options = &sector_plotting_options;
    // Each sector holds a permit from this semaphore from the start of its plotting until its
    // result is processed
    let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
    // Starts as the largest history size among already plotted sectors (or `HistorySize::ONE` if
    // there are none)
    let max_used_history_size = RwLock::new(
        sectors_metadata
            .read()
            .await
            .iter()
            .map(|sector_metadata| sector_metadata.history_size)
            .max()
            .unwrap_or(HistorySize::ONE),
    );
    // Sectors whose plotting was already initiated, results are yielded in scheduling order
    let mut sectors_being_plotted = FuturesOrdered::new();
    // Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting
    // semaphore in practice due to permit stored in `SectorPlottingResult`
    let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
    // Processes results one at a time, this future only ever completes with an error
    let process_plotting_result_fut = async move {
        while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
            process_plotting_result(
                sector_plotting_result,
                sectors_metadata,
                sectors_being_modified,
                &mut metadata_header,
                Arc::clone(&sector_plotting_options.metadata_file),
            )
            .await?;
        }

        unreachable!(
            "Stream will not end before the rest of the plotting process is shutting down"
        );
    };
    // Fused and pinned so that it can be polled by name from multiple `select!` invocations
    let process_plotting_result_fut = process_plotting_result_fut.fuse();
    let mut process_plotting_result_fut = pin!(process_plotting_result_fut);

    // Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that
    // already started plotting to finish plotting and then update metadata header
    loop {
        select! {
            maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
                // Closed channel means there will be no more sectors to plot
                let Some(sector_to_plot) = maybe_sector_to_plot else {
                    break;
                };

                let sector_index = sector_to_plot.sector_index;
                let sector_plotting_init_fut = plot_single_sector(
                    sector_to_plot,
                    sector_plotting_options,
                    sectors_metadata,
                    sectors_being_modified,
                    &plotting_semaphore,
                    &max_used_history_size,
                )
                    .instrument(info_span!("", %sector_index))
                    .fuse();
                let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);

                // Wait for plotting of new sector to start (backpressure), while also waiting
                // for sectors that already started plotting to finish plotting and then update
                // metadata header
                loop {
                    select! {
                        sector_plotting_init_result = sector_plotting_init_fut => {
                            let sector_plotting_fut = match sector_plotting_init_result {
                                PlotSingleSectorResult::Scheduled(future) => future,
                                PlotSingleSectorResult::Skipped => {
                                    // Nothing to wait for, go back to receiving new sectors
                                    break;
                                }
                                PlotSingleSectorResult::FatalError(error) => {
                                    return Err(error);
                                }
                            };
                            sectors_being_plotted.push_back(
                                sector_plotting_fut.instrument(info_span!("", %sector_index))
                            );
                            break;
                        }
                        maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                            // Plotting error of any sector terminates the whole plotting process
                            sector_plotting_result_sender
                                .unbounded_send(maybe_sector_plotting_result?)
                                .expect("Sending means receiver is not dropped yet; qed");
                        }
                        result = process_plotting_result_fut => {
                            return result;
                        }
                    }
                }
            }
            maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                // Plotting error of any sector terminates the whole plotting process
                sector_plotting_result_sender
                    .unbounded_send(maybe_sector_plotting_result?)
                    .expect("Sending means receiver is not dropped yet; qed");
            }
            result = process_plotting_result_fut => {
                return result;
            }
        }
    }

    Ok(())
}
230
231async fn process_plotting_result(
232    sector_plotting_result: SectorPlottingResult<'_>,
233    sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
234    sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
235    metadata_header: &mut PlotMetadataHeader,
236    metadata_file: Arc<DirectIoFileWrapper>,
237) -> Result<(), PlottingError> {
238    let SectorPlottingResult {
239        sector_metadata,
240        replotting,
241        last_queued,
242        plotting_permit,
243    } = sector_plotting_result;
244
245    let sector_index = sector_metadata.sector_index;
246
247    {
248        let mut sectors_metadata = sectors_metadata.write().await;
249        // If exists then we're replotting, otherwise we create sector for the first time
250        if let Some(existing_sector_metadata) = sectors_metadata.get_mut(usize::from(sector_index))
251        {
252            *existing_sector_metadata = sector_metadata;
253        } else {
254            sectors_metadata.push(sector_metadata);
255        }
256    }
257
258    // Inform others that this sector is no longer being modified
259    sectors_being_modified.write().await.remove(&sector_index);
260
261    if u16::from(sector_index) + 1 > metadata_header.plotted_sector_count {
262        metadata_header.plotted_sector_count = u16::from(sector_index) + 1;
263
264        let encoded_metadata_header = metadata_header.encode();
265        let write_fut =
266            task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
267        write_fut.await.map_err(|error| {
268            PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
269        })??;
270    }
271
272    if last_queued {
273        if replotting {
274            info!("Replotting complete");
275        } else {
276            info!("Initial plotting complete");
277        }
278    }
279
280    drop(plotting_permit);
281
282    Ok(())
283}
284
/// Outcome of an attempt to start plotting of a single sector
enum PlotSingleSectorResult<F> {
    /// Plotting was initiated, the future resolves once the sector is plotted and written
    Scheduled(F),
    /// Sector was not plotted: either it is already being plotted or plotting it would be
    /// redundant
    Skipped,
    /// Error that must terminate the whole plotting process
    FatalError(PlottingError),
}
290
/// Result of a successfully plotted sector, consumed by `process_plotting_result()`
struct SectorPlottingResult<'a> {
    /// Metadata of the newly plotted sector
    sector_metadata: SectorMetadataChecksummed,
    /// Whether metadata for this sector index already existed when plotting started
    replotting: bool,
    /// Whether this was the last sector queued when the plotting request was created
    last_queued: bool,
    /// Permit of the plotting semaphore, held until the result is processed so that the number of
    /// unprocessed results stays bounded
    plotting_permit: SemaphoreGuard<'a>,
}
297
298async fn plot_single_sector<'a, NC>(
299    sector_to_plot: SectorToPlot,
300    sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
301    sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
302    sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
303    plotting_semaphore: &'a Semaphore,
304    max_used_history_size: &RwLock<HistorySize>,
305) -> PlotSingleSectorResult<
306    impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
307>
308where
309    NC: NodeClient,
310{
311    let SectorPlottingOptions {
312        public_key,
313        shard_commitments_roots_cache,
314        node_client,
315        pieces_in_sector,
316        sector_size,
317        plot_file,
318        metadata_file,
319        handlers,
320        global_mutex,
321        plotter,
322        metrics,
323    } = sector_plotting_options;
324
325    let SectorToPlot {
326        sector_index,
327        progress,
328        last_queued,
329        acknowledgement_sender: _acknowledgement_sender,
330    } = sector_to_plot;
331    trace!("Preparing to plot sector");
332
333    // Inform others that this sector is being modified
334    {
335        let mut sectors_being_modified = sectors_being_modified.write().await;
336        if !sectors_being_modified.insert(sector_index) {
337            debug!("Skipped sector plotting, it is already in progress");
338            return PlotSingleSectorResult::Skipped;
339        }
340    }
341
342    let plotting_permit = plotting_semaphore.acquire().await;
343
344    let maybe_old_sector_metadata = sectors_metadata
345        .read()
346        .await
347        .get(usize::from(sector_index))
348        .cloned();
349    let replotting = maybe_old_sector_metadata.is_some();
350
351    if let Some(metrics) = metrics {
352        metrics.sector_plotting.inc();
353    }
354    let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
355        progress,
356        replotting,
357        last_queued,
358    });
359    handlers
360        .sector_update
361        .call_simple(&(sector_index, sector_state));
362
363    let start = Instant::now();
364
365    // This `loop` is a workaround for an edge-case in local setup if expiration is configured to 1.
366    // In that scenario we get a replotting notification essentially straight from a block import
367    // pipeline of the node, before the block is imported. This can result in a later request for
368    // farmer app info to return old data, meaning we're replotting exactly the same sector that
369    // just expired.
370    let mut protocol_info = loop {
371        let protocol_info = match node_client.farmer_app_info().await {
372            Ok(farmer_app_info) => farmer_app_info.protocol_info,
373            Err(error) => {
374                return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
375                    error,
376                });
377            }
378        };
379
380        if let Some(old_sector_metadata) = &maybe_old_sector_metadata
381            && protocol_info.history_size <= old_sector_metadata.history_size
382        {
383            if protocol_info.min_sector_lifetime == HistorySize::ONE {
384                debug!(
385                    current_history_size = %protocol_info.history_size,
386                    old_sector_history_size = %old_sector_metadata.history_size,
387                    "Latest protocol history size is not yet newer than old sector history \
388                    size, wait for a bit and try again"
389                );
390                tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
391                continue;
392            } else {
393                debug!(
394                    current_history_size = %protocol_info.history_size,
395                    old_sector_history_size = %old_sector_metadata.history_size,
396                    "Skipped sector plotting, likely redundant due to redundant archived \
397                    segment notification"
398                );
399                return PlotSingleSectorResult::Skipped;
400            }
401        }
402
403        break protocol_info;
404    };
405
406    // Maintain the current max history size unless it is too old. This reduces the diversity of
407    // shard assignment, making it easier for farmers to run nodes.
408    {
409        let mut max_used_history_size = max_used_history_size.write();
410        if maybe_old_sector_metadata
411            .as_ref()
412            .map(|sector_metadata| sector_metadata.history_size)
413            != Some(*max_used_history_size)
414        {
415            // TODO: This may hypothetically result in plotting expired sectors. Add sector
416            //  expiration check in the future if it ends up being a real concern.
417            protocol_info.history_size = *max_used_history_size;
418        } else {
419            *max_used_history_size = protocol_info.history_size;
420        }
421    }
422
423    let (progress_sender, mut progress_receiver) = mpsc::channel(10);
424    let shard_commitments_root = shard_commitments_roots_cache.get(protocol_info.history_size);
425
426    // Initiate plotting
427    plotter
428        .plot_sector(
429            *public_key,
430            shard_commitments_root,
431            sector_index,
432            protocol_info,
433            *pieces_in_sector,
434            replotting,
435            progress_sender,
436        )
437        .await;
438
439    if replotting {
440        info!("Replotting sector ({progress:.2}% complete)");
441    } else {
442        info!("Plotting sector ({progress:.2}% complete)");
443    }
444
445    PlotSingleSectorResult::Scheduled(async move {
446        let plotted_sector = loop {
447            match plot_single_sector_internal(
448                sector_index,
449                *sector_size,
450                plot_file,
451                metadata_file,
452                handlers,
453                global_mutex,
454                progress_receiver,
455                metrics,
456            )
457            .await?
458            {
459                Ok(plotted_sector) => {
460                    break plotted_sector;
461                }
462                Err(error) => {
463                    warn!(
464                        %error,
465                        "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
466                    );
467
468                    tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
469                }
470            }
471
472            let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
473            progress_receiver = retry_progress_receiver;
474
475            // Initiate plotting
476            plotter
477                .plot_sector(
478                    *public_key,
479                    shard_commitments_root,
480                    sector_index,
481                    protocol_info,
482                    *pieces_in_sector,
483                    replotting,
484                    retry_progress_sender,
485                )
486                .await;
487
488            if replotting {
489                info!("Replotting sector retry");
490            } else {
491                info!("Plotting sector retry");
492            }
493        };
494
495        let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
496            let old_history_size = old_sector_metadata.history_size;
497
498            PlottedSector {
499                sector_id: plotted_sector.sector_id,
500                sector_index: plotted_sector.sector_index,
501                sector_metadata: old_sector_metadata,
502                piece_indexes: {
503                    let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
504                    (PieceOffset::ZERO..)
505                        .take(usize::from(*pieces_in_sector))
506                        .map(|piece_offset| {
507                            plotted_sector.sector_id.derive_piece_index(
508                                piece_offset,
509                                old_history_size,
510                                protocol_info.max_pieces_in_sector,
511                                protocol_info.recent_segments,
512                                protocol_info.recent_history_fraction,
513                            )
514                        })
515                        .collect_into(&mut piece_indexes);
516                    piece_indexes
517                },
518            }
519        });
520
521        if replotting {
522            debug!("Sector replotted successfully");
523        } else {
524            debug!("Sector plotted successfully");
525        }
526
527        let sector_metadata = plotted_sector.sector_metadata.clone();
528
529        let time = start.elapsed();
530        if let Some(metrics) = metrics {
531            metrics.sector_plotting_time.observe(time.as_secs_f64());
532            metrics.sector_plotted.inc();
533            metrics.update_sector_state(SectorState::Plotted);
534        }
535        let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
536            plotted_sector,
537            old_plotted_sector: maybe_old_plotted_sector,
538            time,
539        });
540        handlers
541            .sector_update
542            .call_simple(&(sector_index, sector_state));
543
544        Ok(SectorPlottingResult {
545            sector_metadata,
546            replotting,
547            last_queued,
548            plotting_permit,
549        })
550    })
551}
552
553/// Outer error is used to indicate irrecoverable plotting errors, while inner result is for
554/// recoverable errors
555#[expect(clippy::too_many_arguments)]
556async fn plot_single_sector_internal(
557    sector_index: SectorIndex,
558    sector_size: usize,
559    plot_file: &Arc<DirectIoFileWrapper>,
560    metadata_file: &Arc<DirectIoFileWrapper>,
561    handlers: &Handlers,
562    global_mutex: &AsyncMutex<()>,
563    mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
564    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
565) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
566    // Process plotting progress notifications
567    let progress_processor_fut = async {
568        while let Some(progress) = progress_receiver.next().await {
569            match progress {
570                SectorPlottingProgress::Downloading => {
571                    if let Some(metrics) = metrics {
572                        metrics.sector_downloading.inc();
573                    }
574                    handlers.sector_update.call_simple(&(
575                        sector_index,
576                        SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
577                    ));
578                }
579                SectorPlottingProgress::Downloaded(time) => {
580                    if let Some(metrics) = metrics {
581                        metrics.sector_downloading_time.observe(time.as_secs_f64());
582                        metrics.sector_downloaded.inc();
583                    }
584                    handlers.sector_update.call_simple(&(
585                        sector_index,
586                        SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
587                    ));
588                }
589                SectorPlottingProgress::Encoding => {
590                    if let Some(metrics) = metrics {
591                        metrics.sector_encoding.inc();
592                    }
593                    handlers.sector_update.call_simple(&(
594                        sector_index,
595                        SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
596                    ));
597                }
598                SectorPlottingProgress::Encoded(time) => {
599                    if let Some(metrics) = metrics {
600                        metrics.sector_encoding_time.observe(time.as_secs_f64());
601                        metrics.sector_encoded.inc();
602                    }
603                    handlers.sector_update.call_simple(&(
604                        sector_index,
605                        SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
606                    ));
607                }
608                SectorPlottingProgress::Finished {
609                    plotted_sector,
610                    time: _,
611                    sector,
612                } => {
613                    return Ok((plotted_sector, sector));
614                }
615                SectorPlottingProgress::Error { error } => {
616                    if let Some(metrics) = metrics {
617                        metrics.sector_plotting_error.inc();
618                    }
619                    handlers.sector_update.call_simple(&(
620                        sector_index,
621                        SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
622                    ));
623                    return Err(error);
624                }
625            }
626        }
627
628        Err("Plotting progress stream ended before plotting finished".to_string())
629    };
630
631    let (plotted_sector, mut sector) = match progress_processor_fut.await {
632        Ok(result) => result,
633        Err(error) => {
634            return Ok(Err(PlottingError::LowLevel(error)));
635        }
636    };
637
638    {
639        // Take mutex briefly to make sure writing is allowed right now
640        global_mutex.lock().await;
641
642        if let Some(metrics) = metrics {
643            metrics.sector_writing.inc();
644        }
645        handlers.sector_update.call_simple(&(
646            sector_index,
647            SectorUpdate::Plotting(SectorPlottingDetails::Writing),
648        ));
649
650        let start = Instant::now();
651
652        {
653            let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
654            let mut total_received = 0;
655            let mut sector_write_offset = sector_write_base_offset;
656            while let Some(maybe_sector_chunk) = sector.next().await {
657                let sector_chunk = match maybe_sector_chunk {
658                    Ok(sector_chunk) => sector_chunk,
659                    Err(error) => {
660                        return Ok(Err(PlottingError::LowLevel(format!(
661                            "Sector chunk receive error: {error}"
662                        ))));
663                    }
664                };
665
666                total_received += sector_chunk.len();
667
668                if total_received > sector_size {
669                    return Ok(Err(PlottingError::LowLevel(format!(
670                        "Received too many bytes {total_received} instead of expected \
671                        {sector_size} bytes"
672                    ))));
673                }
674
675                let sector_chunk_size = sector_chunk.len() as u64;
676
677                trace!(sector_chunk_size, "Writing sector chunk to disk");
678                let write_fut = task::spawn_blocking({
679                    let plot_file = Arc::clone(plot_file);
680
681                    move || plot_file.write_all_at(&sector_chunk, sector_write_offset)
682                });
683                write_fut.await.map_err(|error| {
684                    PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
685                })??;
686
687                sector_write_offset += sector_chunk_size;
688            }
689            drop(sector);
690
691            if total_received != sector_size {
692                return Ok(Err(PlottingError::LowLevel(format!(
693                    "Received only {total_received} sector bytes out of {sector_size} \
694                    expected bytes"
695                ))));
696            }
697        }
698        {
699            let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
700            let write_fut = task::spawn_blocking({
701                let metadata_file = Arc::clone(metadata_file);
702
703                move || {
704                    metadata_file.write_all_at(
705                        &encoded_sector_metadata,
706                        RESERVED_PLOT_METADATA
707                            + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
708                    )
709                }
710            });
711            write_fut.await.map_err(|error| {
712                PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
713            })??;
714        }
715
716        let time = start.elapsed();
717        if let Some(metrics) = metrics {
718            metrics.sector_writing_time.observe(time.as_secs_f64());
719            metrics.sector_written.inc();
720        }
721        handlers.sector_update.call_simple(&(
722            sector_index,
723            SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
724        ));
725    }
726
727    Ok(Ok(plotted_sector))
728}
729
/// Options for [`plotting_scheduler()`]
///
/// Except for `last_archived_segment_index` and `new_segment_processing_delay`, all fields are
/// handed over to `send_plotting_notifications()` as is.
pub(super) struct PlottingSchedulerOptions<NC> {
    pub(super) public_key_hash: Blake3Hash,
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    // NOTE(review): presumably indices of sectors that were not plotted yet; confirm
    pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
    pub(super) target_sector_count: u16,
    // Initial value of the channel through which the most recent segment index is observed
    pub(super) last_archived_segment_index: SegmentIndex,
    pub(super) min_sector_lifetime: HistorySize,
    // Used to subscribe to new super segment headers
    pub(super) node_client: NC,
    pub(super) handlers: Arc<Handlers>,
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    // Channel through which sectors to plot are sent to the plotting process
    pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
    // Max delay between segment header being acknowledged by farmer and potentially
    // triggering replotting
    pub(super) new_segment_processing_delay: Duration,
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
746
747pub(super) async fn plotting_scheduler<NC>(
748    plotting_scheduler_options: PlottingSchedulerOptions<NC>,
749) -> Result<(), BackgroundTaskError>
750where
751    NC: NodeClient,
752{
753    let PlottingSchedulerOptions {
754        public_key_hash,
755        shard_commitments_roots_cache,
756        sectors_indices_left_to_plot,
757        target_sector_count,
758        last_archived_segment_index,
759        min_sector_lifetime,
760        node_client,
761        handlers,
762        sectors_metadata,
763        sectors_to_plot_sender,
764        new_segment_processing_delay,
765        metrics,
766    } = plotting_scheduler_options;
767
768    // Create a proxy channel with atomically updatable last archived segment that allows to not
769    // buffer messages from RPC subscription but also access the most recent value at any time
770    let (new_segment_index_sender, new_segment_index_receiver) =
771        watch::channel(last_archived_segment_index);
772
773    let read_new_super_segments_notifications_fut = read_new_segment_index_notifications(
774        &node_client,
775        new_segment_index_sender,
776        new_segment_processing_delay,
777    );
778
779    let send_plotting_notifications_fut = send_plotting_notifications(
780        public_key_hash,
781        &shard_commitments_roots_cache,
782        sectors_indices_left_to_plot,
783        target_sector_count,
784        min_sector_lifetime,
785        &node_client,
786        &handlers,
787        sectors_metadata,
788        new_segment_index_receiver,
789        sectors_to_plot_sender,
790        &metrics,
791    );
792
793    select! {
794        result = read_new_super_segments_notifications_fut.fuse() => {
795            result
796        }
797        result = send_plotting_notifications_fut.fuse() => {
798            result
799        }
800    }
801}
802
803async fn read_new_segment_index_notifications<NC>(
804    node_client: &NC,
805    new_super_segments_sender: watch::Sender<SegmentIndex>,
806    new_segment_processing_delay: Duration,
807) -> Result<(), BackgroundTaskError>
808where
809    NC: NodeClient,
810{
811    info!("Subscribing to new super segments");
812
813    let mut super_segment_headers_notifications = node_client
814        .subscribe_new_super_segment_headers()
815        .await
816        .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
817
818    while let Some(super_segment_header) = super_segment_headers_notifications.next().await {
819        debug!(?super_segment_header, "New super segment");
820
821        // There is no urgent need to rush replotting sectors immediately, and this delay allows for
822        // newly archived pieces to be both cached locally and on other farmers on the network
823        let delay = Duration::from_secs(rand::rng().random_range(
824            new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
825        ));
826        tokio::time::sleep(delay).await;
827
828        if new_super_segments_sender
829            .send(super_segment_header.max_segment_index.as_inner())
830            .is_err()
831        {
832            break;
833        }
834    }
835
836    Ok(())
837}
838
839struct SectorToReplot {
840    sector_index: SectorIndex,
841    expires_at: SegmentIndex,
842}
843
844#[expect(clippy::too_many_arguments)]
845async fn send_plotting_notifications<NC>(
846    public_key_hash: Blake3Hash,
847    shard_commitments_roots_cache: &ShardCommitmentsRootsCache,
848    sectors_indices_left_to_plot: Range<SectorIndex>,
849    target_sector_count: u16,
850    min_sector_lifetime: HistorySize,
851    node_client: &NC,
852    handlers: &Handlers,
853    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
854    mut new_segment_index_receiver: watch::Receiver<SegmentIndex>,
855    mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
856    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
857) -> Result<(), BackgroundTaskError>
858where
859    NC: NodeClient,
860{
861    // Finish initial plotting if some sectors were not plotted fully yet
862    for sector_index in sectors_indices_left_to_plot {
863        let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
864        if let Err(error) = sectors_to_plot_sender
865            .send(SectorToPlot {
866                sector_index,
867                progress: u16::from(sector_index) as f32 / target_sector_count as f32 * 100.0,
868                last_queued: u16::from(sector_index) + 1 == target_sector_count,
869                acknowledgement_sender,
870            })
871            .await
872        {
873            warn!(%error, "Failed to send sector index for initial plotting");
874            return Ok(());
875        }
876
877        // We do not care if message was sent back or sender was just dropped
878        let _ = acknowledgement_receiver.await;
879    }
880
881    let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
882    // 10% capacity is generous and should prevent reallocation in most cases
883    let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
884
885    loop {
886        let segment_index = *new_segment_index_receiver.borrow_and_update();
887        trace!(%segment_index, "New archived segment received");
888
889        let sectors_metadata = sectors_metadata.read().await;
890        let sectors_to_check = sectors_metadata
891            .iter()
892            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
893        for (sector_index, history_size) in sectors_to_check {
894            if let Some(Some(expires_at)) =
895                sectors_expire_at.get(usize::from(sector_index)).copied()
896            {
897                trace!(
898                    %sector_index,
899                    %history_size,
900                    %expires_at,
901                    "Checking sector for expiration"
902                );
903                // +1 means we will start replotting a bit before it actually expires to avoid
904                // storing expired sectors
905                if expires_at <= (segment_index + SegmentIndex::ONE) {
906                    debug!(
907                        %sector_index,
908                        %history_size,
909                        %expires_at,
910                        "Sector expires soon #1, scheduling replotting"
911                    );
912
913                    let expiration_details = if expires_at <= segment_index {
914                        if let Some(metrics) = metrics {
915                            metrics.update_sector_state(SectorState::Expired);
916                        }
917                        SectorExpirationDetails::Expired
918                    } else {
919                        if let Some(metrics) = metrics {
920                            metrics.update_sector_state(SectorState::AboutToExpire);
921                        }
922                        SectorExpirationDetails::AboutToExpire
923                    };
924                    handlers
925                        .sector_update
926                        .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
927
928                    // Time to replot
929                    sectors_to_replot.push(SectorToReplot {
930                        sector_index,
931                        expires_at,
932                    });
933                }
934                continue;
935            }
936
937            let shard_commitments_root = shard_commitments_roots_cache.get(history_size);
938            if let Some(expiration_check_segment_index) = history_size
939                .sector_expiration_check(min_sector_lifetime)
940                .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
941            {
942                trace!(
943                    %sector_index,
944                    %history_size,
945                    %expiration_check_segment_index,
946                    "Determined sector expiration check segment index"
947                );
948                let maybe_sector_expiration_check_super_segment_root = node_client
949                    .super_segment_root_for_segment_index(expiration_check_segment_index)
950                    .await
951                    .map_err(|error| PlottingError::FailedToGetSuperSegmentRoot { error })?;
952
953                if let Some(sector_expiration_check_super_segment_root) =
954                    maybe_sector_expiration_check_super_segment_root
955                {
956                    let sector_id = SectorId::new(
957                        &public_key_hash,
958                        &shard_commitments_root,
959                        sector_index,
960                        history_size,
961                    );
962                    let expiration_history_size = sector_id
963                        .derive_expiration_history_size(
964                            history_size,
965                            &sector_expiration_check_super_segment_root,
966                            min_sector_lifetime,
967                        )
968                        .expect(
969                            "Farmers internally stores correct history size in sector \
970                            metadata; qed",
971                        );
972
973                    let expires_at = expiration_history_size.segment_index();
974
975                    trace!(
976                        %sector_index,
977                        %history_size,
978                        sector_expire_at = %expires_at,
979                        "Determined sector expiration segment index"
980                    );
981                    // +1 means we will start replotting a bit before it actually expires to avoid
982                    // storing expired sectors
983                    if expires_at <= (segment_index + SegmentIndex::ONE) {
984                        debug!(
985                            %sector_index,
986                            %history_size,
987                            %expires_at,
988                            "Sector expires soon #2, scheduling replotting"
989                        );
990
991                        let expiration_details = if expires_at <= segment_index {
992                            if let Some(metrics) = metrics {
993                                metrics.update_sector_state(SectorState::Expired);
994                            }
995                            SectorExpirationDetails::Expired
996                        } else {
997                            if let Some(metrics) = metrics {
998                                metrics.update_sector_state(SectorState::AboutToExpire);
999                            }
1000                            SectorExpirationDetails::AboutToExpire
1001                        };
1002                        handlers.sector_update.call_simple(&(
1003                            sector_index,
1004                            SectorUpdate::Expiration(expiration_details),
1005                        ));
1006
1007                        // Time to replot
1008                        sectors_to_replot.push(SectorToReplot {
1009                            sector_index,
1010                            expires_at,
1011                        });
1012                    } else {
1013                        trace!(
1014                            %sector_index,
1015                            %history_size,
1016                            sector_expire_at = %expires_at,
1017                            "Sector expires later, remembering sector expiration"
1018                        );
1019
1020                        handlers.sector_update.call_simple(&(
1021                            sector_index,
1022                            SectorUpdate::Expiration(SectorExpirationDetails::Determined {
1023                                expires_at,
1024                            }),
1025                        ));
1026
1027                        // Store expiration so we don't have to recalculate it later
1028                        if let Some(expires_at_entry) =
1029                            sectors_expire_at.get_mut(usize::from(sector_index))
1030                        {
1031                            expires_at_entry.replace(expires_at);
1032                        }
1033                    }
1034                }
1035            }
1036        }
1037        drop(sectors_metadata);
1038
1039        let sectors_queued = sectors_to_replot.len();
1040        sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1041        for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1042        {
1043            let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1044            if let Err(error) = sectors_to_plot_sender
1045                .send(SectorToPlot {
1046                    sector_index,
1047                    progress: index as f32 / sectors_queued as f32 * 100.0,
1048                    last_queued: index + 1 == sectors_queued,
1049                    acknowledgement_sender,
1050                })
1051                .await
1052            {
1053                warn!(%error, "Failed to send sector index for replotting");
1054                return Ok(());
1055            }
1056
1057            // We do not care if message was sent back or sender was just dropped
1058            let _ = acknowledgement_receiver.await;
1059
1060            if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1061                expires_at_entry.take();
1062            }
1063        }
1064
1065        if new_segment_index_receiver.changed().await.is_err() {
1066            break;
1067        }
1068    }
1069
1070    Ok(())
1071}