ab_farmer/single_disk_farm/
plotting.rs

1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7    BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use ab_core_primitives::ed25519::Ed25519PublicKey;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::PieceOffset;
12use ab_core_primitives::sectors::{SectorId, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
14use ab_farmer_components::file_ext::FileExt;
15use ab_farmer_components::plotting::PlottedSector;
16use ab_farmer_components::sector::SectorMetadataChecksummed;
17use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
18use futures::channel::{mpsc, oneshot};
19use futures::stream::FuturesOrdered;
20use futures::{FutureExt, SinkExt, StreamExt, select};
21use parity_scale_codec::Encode;
22use rand::prelude::*;
23use std::collections::HashSet;
24use std::future::Future;
25use std::io;
26use std::num::NonZeroUsize;
27use std::ops::Range;
28use std::pin::pin;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use thiserror::Error;
32use tokio::sync::watch;
33use tokio::task;
34use tracing::{Instrument, debug, info, info_span, trace, warn};
35
/// Pause between farmer app info requests while waiting for the node to report history size that
/// is newer than history size of the sector being replotted
const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Pause before plotting of a sector is attempted again after a recoverable failure
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
38
/// Request to plot (or replot) a single sector, sent by the plotting scheduler to the plotting
/// process
pub(super) struct SectorToPlot {
    /// Index of the sector to plot
    sector_index: SectorIndex,
    /// Progress so far in % (not including this sector)
    progress: f32,
    /// Whether this is the last sector queued so far
    last_queued: bool,
    /// Sending half of a oneshot channel. Plotting code in this file never sends a value through
    /// it, the sender is simply dropped once plotting of the sector was initiated or skipped
    /// (presumably the scheduler waits for that on the receiving half, confirm against the
    /// scheduler code).
    acknowledgement_sender: oneshot::Sender<()>,
}
47
/// Errors that happen during plotting
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Failed to retrieve farmer info (request to the node failed right before sector plotting
    /// was about to be initiated)
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to get segment header (request to the node failed when plotting scheduler was
    /// starting)
    #[error("Failed to get segment header: {error}")]
    FailedToGetSegmentHeader {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Missing archived segment header (node did not return header for requested segment index)
    #[error("Missing archived segment header: {segment_index}")]
    MissingArchivedSegmentHeader {
        /// Segment index that was missing
        segment_index: SegmentIndex,
    },
    /// Failed to subscribe to archived segments
    #[error("Failed to subscribe to archived segments: {error}")]
    FailedToSubscribeArchivedSegments {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Low-level plotting error, carries human-readable description of what went wrong
    #[error("Low-level plotting error: {0}")]
    LowLevel(String),
    /// I/O error occurred
    #[error("Plotting I/O error: {0}")]
    Io(#[from] io::Error),
    /// Background downloading panicked
    #[error("Background downloading panicked")]
    BackgroundDownloadingPanicked,
}
85
/// Options shared by plotting of every sector of a farm
pub(super) struct SectorPlottingOptions<'a, NC> {
    /// Public key that is handed to the plotter together with every sector to plot
    pub(super) public_key: Ed25519PublicKey,
    /// Node client, used to request farmer app info before sector plotting is initiated
    pub(super) node_client: &'a NC,
    /// Number of pieces in a sector, handed to the plotter and used to derive piece indexes of
    /// the sector that is being replaced during replotting
    pub(super) pieces_in_sector: u16,
    /// Size of a single sector in the plot file in bytes, plotter must produce exactly this many
    /// bytes for every sector
    pub(super) sector_size: usize,
    /// File to which sector contents are written at `sector_index * sector_size` offset
    pub(super) plot_file: Arc<DirectIoFileWrapper>,
    /// File to which plot metadata header and metadata of individual sectors are written
    pub(super) metadata_file: Arc<DirectIoFileWrapper>,
    /// Handlers whose `sector_update` callbacks are called on every change of sector plotting
    /// state
    pub(super) handlers: &'a Handlers,
    /// Mutex that is taken briefly right before sector is written to disk to make sure writing
    /// is allowed at that moment
    pub(super) global_mutex: &'a AsyncMutex<()>,
    /// Plotter that does the actual sector plotting and reports progress back through a channel
    pub(super) plotter: Arc<dyn Plotter>,
    /// Optional metrics to update as plotting progresses
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
98
/// Options of the plotting process, see [`plotting()`]
pub(super) struct PlottingOptions<'a, NC> {
    /// Plot metadata header, its `plotted_sector_count` is increased and the header is written
    /// to the beginning of the metadata file when a sector with higher index is plotted
    pub(super) metadata_header: PlotMetadataHeader,
    /// Metadata of plotted sectors (indexed by sector index), updated after each sector is
    /// plotted
    pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    /// Indices of sectors that are being plotted or replotted right now
    pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
    /// Stream of sectors to plot, plotting process exits successfully once it ends
    pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
    /// Options shared by plotting of every sector
    pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
    /// Number of permits of the plotting semaphore, which limits how many sectors of this farm
    /// are in the plotting pipeline at the same time
    pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
}
107
/// Starts plotting process.
///
/// Receives sectors to plot from `sectors_to_plot_receiver`, initiates plotting of each of them
/// (every sector holds a permit of a semaphore with `max_plotting_sectors_per_farm` permits for
/// as long as it is in the pipeline) and records every plotted sector in sectors metadata and
/// plot metadata header.
///
/// Returns `Ok(())` once `sectors_to_plot_receiver` is closed. Returns an error as soon as
/// initiating of sector plotting, plotting of a sector or processing of a plotting result fails
/// irrecoverably.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
/// thread.
pub(super) async fn plotting<NC>(
    plotting_options: PlottingOptions<'_, NC>,
) -> Result<(), PlottingError>
where
    NC: NodeClient,
{
    let PlottingOptions {
        mut metadata_header,
        sectors_metadata,
        sectors_being_modified,
        mut sectors_to_plot_receiver,
        sector_plotting_options,
        max_plotting_sectors_per_farm,
    } = plotting_options;

    // Everything below only needs shared access to sector plotting options
    let sector_plotting_options = &sector_plotting_options;
    let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
    // Plotting results are yielded in the order in which plotting of sectors was scheduled
    let mut sectors_being_plotted = FuturesOrdered::new();
    // Channel size is intentionally unbounded for easier analysis, but it is bounded by plotting
    // semaphore in practice due to permit stored in `SectorPlottingResult`
    let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
    // Plotting results are processed one at a time by a dedicated future, such that waiting for
    // locks and metadata writes does not prevent polling of sectors that are still being plotted
    let process_plotting_result_fut = async move {
        while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
            process_plotting_result(
                sector_plotting_result,
                sectors_metadata,
                sectors_being_modified,
                &mut metadata_header,
                Arc::clone(&sector_plotting_options.metadata_file),
            )
            .await?;
        }

        // The only sender lives in this function and is not dropped while this future is polled
        unreachable!(
            "Stream will not end before the rest of the plotting process is shutting down"
        );
    };
    let process_plotting_result_fut = process_plotting_result_fut.fuse();
    let mut process_plotting_result_fut = pin!(process_plotting_result_fut);

    // Wait for new sectors to plot from `sectors_to_plot_receiver` and wait for sectors that
    // already started plotting to finish plotting and then update metadata header
    loop {
        select! {
            maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
                // Sending side is gone, no more sectors will be queued for plotting.
                // NOTE(review): sectors that are still in `sectors_being_plotted` and results that
                // were not processed yet are dropped when the function returns below
                let Some(sector_to_plot) = maybe_sector_to_plot else {
                    break;
                };

                let sector_index = sector_to_plot.sector_index;
                let sector_plotting_init_fut = plot_single_sector(
                    sector_to_plot,
                    sector_plotting_options,
                    sectors_metadata,
                    sectors_being_modified,
                    &plotting_semaphore,
                )
                    .instrument(info_span!("", %sector_index))
                    .fuse();
                let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);

                // Wait for plotting of new sector to start (backpressure), while also waiting
                // for sectors that already started plotting to finish plotting and then update
                // metadata header
                loop {
                    select! {
                        sector_plotting_init_result = sector_plotting_init_fut => {
                            let sector_plotting_fut = match sector_plotting_init_result {
                                PlotSingleSectorResult::Scheduled(future) => future,
                                // Nothing to wait for, go back to waiting for the next sector
                                PlotSingleSectorResult::Skipped => {
                                    break;
                                }
                                PlotSingleSectorResult::FatalError(error) => {
                                    return Err(error);
                                }
                            };
                            sectors_being_plotted.push_back(
                                sector_plotting_fut.instrument(info_span!("", %sector_index))
                            );
                            break;
                        }
                        maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                            // Irrecoverable plotting error terminates the whole plotting process
                            sector_plotting_result_sender
                                .unbounded_send(maybe_sector_plotting_result?)
                                .expect("Sending means receiver is not dropped yet; qed");
                        }
                        // This future only ever completes with an error
                        result = process_plotting_result_fut => {
                            return result;
                        }
                    }
                }
            }
            maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                // Irrecoverable plotting error terminates the whole plotting process
                sector_plotting_result_sender
                    .unbounded_send(maybe_sector_plotting_result?)
                    .expect("Sending means receiver is not dropped yet; qed");
            }
            // This future only ever completes with an error
            result = process_plotting_result_fut => {
                return result;
            }
        }
    }

    Ok(())
}
217
218async fn process_plotting_result(
219    sector_plotting_result: SectorPlottingResult<'_>,
220    sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
221    sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
222    metadata_header: &mut PlotMetadataHeader,
223    metadata_file: Arc<DirectIoFileWrapper>,
224) -> Result<(), PlottingError> {
225    let SectorPlottingResult {
226        sector_metadata,
227        replotting,
228        last_queued,
229        plotting_permit,
230    } = sector_plotting_result;
231
232    let sector_index = sector_metadata.sector_index;
233
234    {
235        let mut sectors_metadata = sectors_metadata.write().await;
236        // If exists then we're replotting, otherwise we create sector for the first time
237        if let Some(existing_sector_metadata) = sectors_metadata.get_mut(usize::from(sector_index))
238        {
239            *existing_sector_metadata = sector_metadata;
240        } else {
241            sectors_metadata.push(sector_metadata);
242        }
243    }
244
245    // Inform others that this sector is no longer being modified
246    sectors_being_modified.write().await.remove(&sector_index);
247
248    if u16::from(sector_index) + 1 > metadata_header.plotted_sector_count {
249        metadata_header.plotted_sector_count = u16::from(sector_index) + 1;
250
251        let encoded_metadata_header = metadata_header.encode();
252        let write_fut =
253            task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
254        write_fut.await.map_err(|error| {
255            PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
256        })??;
257    }
258
259    if last_queued {
260        if replotting {
261            info!("Replotting complete");
262        } else {
263            info!("Initial plotting complete");
264        }
265    }
266
267    drop(plotting_permit);
268
269    Ok(())
270}
271
/// Outcome of an attempt to initiate plotting of a single sector
enum PlotSingleSectorResult<F> {
    /// Plotting was initiated, contained future resolves once the sector is plotted and written
    /// to disk
    Scheduled(F),
    /// Sector does not need to be plotted (it is already being plotted or the request turned out
    /// to be redundant)
    Skipped,
    /// Irrecoverable error, the whole plotting process must stop
    FatalError(PlottingError),
}
277
/// Result of successful plotting of a single sector, waiting to be recorded in metadata
struct SectorPlottingResult<'a> {
    /// Metadata of the newly plotted sector
    sector_metadata: SectorMetadataChecksummed,
    /// Whether the sector existed before and was replaced
    replotting: bool,
    /// Whether this was the last sector queued at the time it was scheduled
    last_queued: bool,
    /// Plotting semaphore permit held by the sector, it is only dropped after the result was
    /// processed
    plotting_permit: SemaphoreGuard<'a>,
}
284
285async fn plot_single_sector<'a, NC>(
286    sector_to_plot: SectorToPlot,
287    sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
288    sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
289    sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
290    plotting_semaphore: &'a Semaphore,
291) -> PlotSingleSectorResult<
292    impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
293>
294where
295    NC: NodeClient,
296{
297    let SectorPlottingOptions {
298        public_key,
299        node_client,
300        pieces_in_sector,
301        sector_size,
302        plot_file,
303        metadata_file,
304        handlers,
305        global_mutex,
306        plotter,
307        metrics,
308    } = sector_plotting_options;
309
310    let SectorToPlot {
311        sector_index,
312        progress,
313        last_queued,
314        acknowledgement_sender: _acknowledgement_sender,
315    } = sector_to_plot;
316    trace!("Preparing to plot sector");
317
318    // Inform others that this sector is being modified
319    {
320        let mut sectors_being_modified = sectors_being_modified.write().await;
321        if !sectors_being_modified.insert(sector_index) {
322            debug!("Skipped sector plotting, it is already in progress");
323            return PlotSingleSectorResult::Skipped;
324        }
325    }
326
327    let plotting_permit = plotting_semaphore.acquire().await;
328
329    let maybe_old_sector_metadata = sectors_metadata
330        .read()
331        .await
332        .get(usize::from(sector_index))
333        .cloned();
334    let replotting = maybe_old_sector_metadata.is_some();
335
336    if let Some(metrics) = metrics {
337        metrics.sector_plotting.inc();
338    }
339    let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
340        progress,
341        replotting,
342        last_queued,
343    });
344    handlers
345        .sector_update
346        .call_simple(&(sector_index, sector_state));
347
348    let start = Instant::now();
349
350    // This `loop` is a workaround for edge-case in local setup if expiration is configured to 1.
351    // In that scenario we get replotting notification essentially straight from block import
352    // pipeline of the node, before block is imported. This can result in subsequent request for
353    // farmer app info to return old data, meaning we're replotting exactly the same sector that
354    // just expired.
355    let farmer_app_info = loop {
356        let farmer_app_info = match node_client.farmer_app_info().await {
357            Ok(farmer_app_info) => farmer_app_info,
358            Err(error) => {
359                return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
360                    error,
361                });
362            }
363        };
364
365        if let Some(old_sector_metadata) = &maybe_old_sector_metadata
366            && farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size
367        {
368            if farmer_app_info.protocol_info.min_sector_lifetime == HistorySize::ONE {
369                debug!(
370                    current_history_size = %farmer_app_info.protocol_info.history_size,
371                    old_sector_history_size = %old_sector_metadata.history_size,
372                    "Latest protocol history size is not yet newer than old sector history \
373                    size, wait for a bit and try again"
374                );
375                tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
376                continue;
377            } else {
378                debug!(
379                    current_history_size = %farmer_app_info.protocol_info.history_size,
380                    old_sector_history_size = %old_sector_metadata.history_size,
381                    "Skipped sector plotting, likely redundant due to redundant archived \
382                    segment notification"
383                );
384                return PlotSingleSectorResult::Skipped;
385            }
386        }
387
388        break farmer_app_info;
389    };
390
391    let (progress_sender, mut progress_receiver) = mpsc::channel(10);
392
393    // Initiate plotting
394    plotter
395        .plot_sector(
396            *public_key,
397            sector_index,
398            farmer_app_info.protocol_info,
399            *pieces_in_sector,
400            replotting,
401            progress_sender,
402        )
403        .await;
404
405    if replotting {
406        info!("Replotting sector ({progress:.2}% complete)");
407    } else {
408        info!("Plotting sector ({progress:.2}% complete)");
409    }
410
411    PlotSingleSectorResult::Scheduled(async move {
412        let plotted_sector = loop {
413            match plot_single_sector_internal(
414                sector_index,
415                *sector_size,
416                plot_file,
417                metadata_file,
418                handlers,
419                global_mutex,
420                progress_receiver,
421                metrics,
422            )
423            .await?
424            {
425                Ok(plotted_sector) => {
426                    break plotted_sector;
427                }
428                Err(error) => {
429                    warn!(
430                        %error,
431                        "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
432                    );
433
434                    tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
435                }
436            }
437
438            let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
439            progress_receiver = retry_progress_receiver;
440
441            // Initiate plotting
442            plotter
443                .plot_sector(
444                    *public_key,
445                    sector_index,
446                    farmer_app_info.protocol_info,
447                    *pieces_in_sector,
448                    replotting,
449                    retry_progress_sender,
450                )
451                .await;
452
453            if replotting {
454                info!("Replotting sector retry");
455            } else {
456                info!("Plotting sector retry");
457            }
458        };
459
460        let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
461            let old_history_size = old_sector_metadata.history_size;
462
463            PlottedSector {
464                sector_id: plotted_sector.sector_id,
465                sector_index: plotted_sector.sector_index,
466                sector_metadata: old_sector_metadata,
467                piece_indexes: {
468                    let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
469                    (PieceOffset::ZERO..)
470                        .take(usize::from(*pieces_in_sector))
471                        .map(|piece_offset| {
472                            plotted_sector.sector_id.derive_piece_index(
473                                piece_offset,
474                                old_history_size,
475                                farmer_app_info.protocol_info.max_pieces_in_sector,
476                                farmer_app_info.protocol_info.recent_segments,
477                                farmer_app_info.protocol_info.recent_history_fraction,
478                            )
479                        })
480                        .collect_into(&mut piece_indexes);
481                    piece_indexes
482                },
483            }
484        });
485
486        if replotting {
487            debug!("Sector replotted successfully");
488        } else {
489            debug!("Sector plotted successfully");
490        }
491
492        let sector_metadata = plotted_sector.sector_metadata.clone();
493
494        let time = start.elapsed();
495        if let Some(metrics) = metrics {
496            metrics.sector_plotting_time.observe(time.as_secs_f64());
497            metrics.sector_plotted.inc();
498            metrics.update_sector_state(SectorState::Plotted);
499        }
500        let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
501            plotted_sector,
502            old_plotted_sector: maybe_old_plotted_sector,
503            time,
504        });
505        handlers
506            .sector_update
507            .call_simple(&(sector_index, sector_state));
508
509        Ok(SectorPlottingResult {
510            sector_metadata,
511            replotting,
512            last_queued,
513            plotting_permit,
514        })
515    })
516}
517
518/// Outer error is used to indicate irrecoverable plotting errors, while inner result is for
519/// recoverable errors
520#[allow(clippy::too_many_arguments)]
521async fn plot_single_sector_internal(
522    sector_index: SectorIndex,
523    sector_size: usize,
524    plot_file: &Arc<DirectIoFileWrapper>,
525    metadata_file: &Arc<DirectIoFileWrapper>,
526    handlers: &Handlers,
527    global_mutex: &AsyncMutex<()>,
528    mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
529    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
530) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
531    // Process plotting progress notifications
532    let progress_processor_fut = async {
533        while let Some(progress) = progress_receiver.next().await {
534            match progress {
535                SectorPlottingProgress::Downloading => {
536                    if let Some(metrics) = metrics {
537                        metrics.sector_downloading.inc();
538                    }
539                    handlers.sector_update.call_simple(&(
540                        sector_index,
541                        SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
542                    ));
543                }
544                SectorPlottingProgress::Downloaded(time) => {
545                    if let Some(metrics) = metrics {
546                        metrics.sector_downloading_time.observe(time.as_secs_f64());
547                        metrics.sector_downloaded.inc();
548                    }
549                    handlers.sector_update.call_simple(&(
550                        sector_index,
551                        SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
552                    ));
553                }
554                SectorPlottingProgress::Encoding => {
555                    if let Some(metrics) = metrics {
556                        metrics.sector_encoding.inc();
557                    }
558                    handlers.sector_update.call_simple(&(
559                        sector_index,
560                        SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
561                    ));
562                }
563                SectorPlottingProgress::Encoded(time) => {
564                    if let Some(metrics) = metrics {
565                        metrics.sector_encoding_time.observe(time.as_secs_f64());
566                        metrics.sector_encoded.inc();
567                    }
568                    handlers.sector_update.call_simple(&(
569                        sector_index,
570                        SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
571                    ));
572                }
573                SectorPlottingProgress::Finished {
574                    plotted_sector,
575                    time: _,
576                    sector,
577                } => {
578                    return Ok((plotted_sector, sector));
579                }
580                SectorPlottingProgress::Error { error } => {
581                    if let Some(metrics) = metrics {
582                        metrics.sector_plotting_error.inc();
583                    }
584                    handlers.sector_update.call_simple(&(
585                        sector_index,
586                        SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
587                    ));
588                    return Err(error);
589                }
590            }
591        }
592
593        Err("Plotting progress stream ended before plotting finished".to_string())
594    };
595
596    let (plotted_sector, mut sector) = match progress_processor_fut.await {
597        Ok(result) => result,
598        Err(error) => {
599            return Ok(Err(PlottingError::LowLevel(error)));
600        }
601    };
602
603    {
604        // Take mutex briefly to make sure writing is allowed right now
605        global_mutex.lock().await;
606
607        if let Some(metrics) = metrics {
608            metrics.sector_writing.inc();
609        }
610        handlers.sector_update.call_simple(&(
611            sector_index,
612            SectorUpdate::Plotting(SectorPlottingDetails::Writing),
613        ));
614
615        let start = Instant::now();
616
617        {
618            let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
619            let mut total_received = 0;
620            let mut sector_write_offset = sector_write_base_offset;
621            while let Some(maybe_sector_chunk) = sector.next().await {
622                let sector_chunk = match maybe_sector_chunk {
623                    Ok(sector_chunk) => sector_chunk,
624                    Err(error) => {
625                        return Ok(Err(PlottingError::LowLevel(format!(
626                            "Sector chunk receive error: {error}"
627                        ))));
628                    }
629                };
630
631                total_received += sector_chunk.len();
632
633                if total_received > sector_size {
634                    return Ok(Err(PlottingError::LowLevel(format!(
635                        "Received too many bytes {total_received} instead of expected \
636                        {sector_size} bytes"
637                    ))));
638                }
639
640                let sector_chunk_size = sector_chunk.len() as u64;
641
642                trace!(sector_chunk_size, "Writing sector chunk to disk");
643                let write_fut = task::spawn_blocking({
644                    let plot_file = Arc::clone(plot_file);
645
646                    move || plot_file.write_all_at(&sector_chunk, sector_write_offset)
647                });
648                write_fut.await.map_err(|error| {
649                    PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
650                })??;
651
652                sector_write_offset += sector_chunk_size;
653            }
654            drop(sector);
655
656            if total_received != sector_size {
657                return Ok(Err(PlottingError::LowLevel(format!(
658                    "Received only {total_received} sector bytes out of {sector_size} \
659                    expected bytes"
660                ))));
661            }
662        }
663        {
664            let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
665            let write_fut = task::spawn_blocking({
666                let metadata_file = Arc::clone(metadata_file);
667
668                move || {
669                    metadata_file.write_all_at(
670                        &encoded_sector_metadata,
671                        RESERVED_PLOT_METADATA
672                            + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
673                    )
674                }
675            });
676            write_fut.await.map_err(|error| {
677                PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
678            })??;
679        }
680
681        let time = start.elapsed();
682        if let Some(metrics) = metrics {
683            metrics.sector_writing_time.observe(time.as_secs_f64());
684            metrics.sector_written.inc();
685        }
686        handlers.sector_update.call_simple(&(
687            sector_index,
688            SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
689        ));
690    }
691
692    Ok(Ok(plotted_sector))
693}
694
/// Options of the plotting scheduler, see [`plotting_scheduler()`]
pub(super) struct PlottingSchedulerOptions<NC> {
    /// Hash of the farm's public key, forwarded to the task that sends plotting notifications
    /// (presumably used to derive sector IDs, that part of the code is not visible here)
    pub(super) public_key_hash: Blake3Hash,
    /// Indices of sectors that were not plotted yet, they are queued for initial plotting first
    pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
    /// Number of sectors the farm should have once initial plotting is done, used to calculate
    /// plotting progress and to detect the last sector queued for initial plotting
    pub(super) target_sector_count: u16,
    /// Index of the segment whose header is requested from the node when scheduler starts
    pub(super) last_archived_segment_index: SegmentIndex,
    /// Minimum sector lifetime, forwarded to the task that sends plotting notifications
    pub(super) min_sector_lifetime: HistorySize,
    /// Node client used to request segment headers and to subscribe to archived segments
    pub(super) node_client: NC,
    /// Handlers, forwarded to the task that sends plotting notifications
    pub(super) handlers: Arc<Handlers>,
    /// Metadata of plotted sectors, forwarded to the task that sends plotting notifications
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Channel through which sectors are queued for plotting
    pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
    // Max delay between segment header being acknowledged by farmer and potentially
    // triggering replotting
    pub(super) new_segment_processing_delay: Duration,
    /// Optional metrics, forwarded to the task that sends plotting notifications
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
710
711pub(super) async fn plotting_scheduler<NC>(
712    plotting_scheduler_options: PlottingSchedulerOptions<NC>,
713) -> Result<(), BackgroundTaskError>
714where
715    NC: NodeClient,
716{
717    let PlottingSchedulerOptions {
718        public_key_hash,
719        sectors_indices_left_to_plot,
720        target_sector_count,
721        last_archived_segment_index,
722        min_sector_lifetime,
723        node_client,
724        handlers,
725        sectors_metadata,
726        sectors_to_plot_sender,
727        new_segment_processing_delay,
728        metrics,
729    } = plotting_scheduler_options;
730
731    // Create a proxy channel with atomically updatable last archived segment that
732    // allows to not buffer messages from RPC subscription, but also access the most
733    // recent value at any time
734    let last_archived_segment = node_client
735        .segment_headers(vec![last_archived_segment_index])
736        .await
737        .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
738        .into_iter()
739        .next()
740        .flatten()
741        .ok_or(PlottingError::MissingArchivedSegmentHeader {
742            segment_index: last_archived_segment_index,
743        })?;
744
745    let (archived_segments_sender, archived_segments_receiver) =
746        watch::channel(last_archived_segment);
747
748    let read_archived_segments_notifications_fut = read_archived_segments_notifications(
749        &node_client,
750        archived_segments_sender,
751        new_segment_processing_delay,
752    );
753
754    let send_plotting_notifications_fut = send_plotting_notifications(
755        public_key_hash,
756        sectors_indices_left_to_plot,
757        target_sector_count,
758        min_sector_lifetime,
759        &node_client,
760        &handlers,
761        sectors_metadata,
762        archived_segments_receiver,
763        sectors_to_plot_sender,
764        &metrics,
765    );
766
767    select! {
768        result = read_archived_segments_notifications_fut.fuse() => {
769            result
770        }
771        result = send_plotting_notifications_fut.fuse() => {
772            result
773        }
774    }
775}
776
777async fn read_archived_segments_notifications<NC>(
778    node_client: &NC,
779    archived_segments_sender: watch::Sender<SegmentHeader>,
780    new_segment_processing_delay: Duration,
781) -> Result<(), BackgroundTaskError>
782where
783    NC: NodeClient,
784{
785    info!("Subscribing to archived segments");
786
787    let mut archived_segments_notifications = node_client
788        .subscribe_archived_segment_headers()
789        .await
790        .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
791
792    while let Some(segment_header) = archived_segments_notifications.next().await {
793        debug!(?segment_header, "New archived segment");
794        if let Err(error) = node_client
795            .acknowledge_archived_segment_header(segment_header.segment_index())
796            .await
797        {
798            debug!(%error, "Failed to acknowledge segment header");
799        }
800
801        // There is no urgent need to rush replotting sectors immediately and this delay allows for
802        // newly archived pieces to be both cached locally and on other farmers on the network
803        let delay = Duration::from_secs(rand::rng().random_range(
804            new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
805        ));
806        tokio::time::sleep(delay).await;
807
808        if archived_segments_sender.send(segment_header).is_err() {
809            break;
810        }
811    }
812
813    Ok(())
814}
815
816struct SectorToReplot {
817    sector_index: SectorIndex,
818    expires_at: SegmentIndex,
819}
820
821#[allow(clippy::too_many_arguments)]
822async fn send_plotting_notifications<NC>(
823    public_key_hash: Blake3Hash,
824    sectors_indices_left_to_plot: Range<SectorIndex>,
825    target_sector_count: u16,
826    min_sector_lifetime: HistorySize,
827    node_client: &NC,
828    handlers: &Handlers,
829    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
830    mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
831    mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
832    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
833) -> Result<(), BackgroundTaskError>
834where
835    NC: NodeClient,
836{
837    // Finish initial plotting if some sectors were not plotted fully yet
838    for sector_index in sectors_indices_left_to_plot {
839        let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
840        if let Err(error) = sectors_to_plot_sender
841            .send(SectorToPlot {
842                sector_index,
843                progress: u16::from(sector_index) as f32 / target_sector_count as f32 * 100.0,
844                last_queued: u16::from(sector_index) + 1 == target_sector_count,
845                acknowledgement_sender,
846            })
847            .await
848        {
849            warn!(%error, "Failed to send sector index for initial plotting");
850            return Ok(());
851        }
852
853        // We do not care if message was sent back or sender was just dropped
854        let _ = acknowledgement_receiver.await;
855    }
856
857    let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
858    // 10% capacity is generous and should prevent reallocation in most cases
859    let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
860
861    loop {
862        let segment_index = archived_segments_receiver
863            .borrow_and_update()
864            .segment_index();
865        trace!(%segment_index, "New archived segment received");
866
867        let sectors_metadata = sectors_metadata.read().await;
868        let sectors_to_check = sectors_metadata
869            .iter()
870            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
871        for (sector_index, history_size) in sectors_to_check {
872            if let Some(Some(expires_at)) =
873                sectors_expire_at.get(usize::from(sector_index)).copied()
874            {
875                trace!(
876                    %sector_index,
877                    %history_size,
878                    %expires_at,
879                    "Checking sector for expiration"
880                );
881                // +1 means we will start replotting a bit before it actually expires to avoid
882                // storing expired sectors
883                if expires_at <= (segment_index + SegmentIndex::ONE) {
884                    debug!(
885                        %sector_index,
886                        %history_size,
887                        %expires_at,
888                        "Sector expires soon #1, scheduling replotting"
889                    );
890
891                    let expiration_details = if expires_at <= segment_index {
892                        if let Some(metrics) = metrics {
893                            metrics.update_sector_state(SectorState::Expired);
894                        }
895                        SectorExpirationDetails::Expired
896                    } else {
897                        if let Some(metrics) = metrics {
898                            metrics.update_sector_state(SectorState::AboutToExpire);
899                        }
900                        SectorExpirationDetails::AboutToExpire
901                    };
902                    handlers
903                        .sector_update
904                        .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
905
906                    // Time to replot
907                    sectors_to_replot.push(SectorToReplot {
908                        sector_index,
909                        expires_at,
910                    });
911                }
912                continue;
913            }
914
915            if let Some(expiration_check_segment_index) = history_size
916                .sector_expiration_check(min_sector_lifetime)
917                .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
918            {
919                trace!(
920                    %sector_index,
921                    %history_size,
922                    %expiration_check_segment_index,
923                    "Determined sector expiration check segment index"
924                );
925                let maybe_sector_expiration_check_segment_root = node_client
926                    .segment_headers(vec![expiration_check_segment_index])
927                    .await
928                    .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
929                    .into_iter()
930                    .next()
931                    .flatten()
932                    .map(|segment_header| segment_header.segment_root);
933
934                if let Some(sector_expiration_check_segment_root) =
935                    maybe_sector_expiration_check_segment_root
936                {
937                    let sector_id = SectorId::new(&public_key_hash, sector_index, history_size);
938                    let expiration_history_size = sector_id
939                        .derive_expiration_history_size(
940                            history_size,
941                            &sector_expiration_check_segment_root,
942                            min_sector_lifetime,
943                        )
944                        .expect(
945                            "Farmers internally stores correct history size in sector \
946                            metadata; qed",
947                        );
948
949                    let expires_at = expiration_history_size.segment_index();
950
951                    trace!(
952                        %sector_index,
953                        %history_size,
954                        sector_expire_at = %expires_at,
955                        "Determined sector expiration segment index"
956                    );
957                    // +1 means we will start replotting a bit before it actually expires to avoid
958                    // storing expired sectors
959                    if expires_at <= (segment_index + SegmentIndex::ONE) {
960                        debug!(
961                            %sector_index,
962                            %history_size,
963                            %expires_at,
964                            "Sector expires soon #2, scheduling replotting"
965                        );
966
967                        let expiration_details = if expires_at <= segment_index {
968                            if let Some(metrics) = metrics {
969                                metrics.update_sector_state(SectorState::Expired);
970                            }
971                            SectorExpirationDetails::Expired
972                        } else {
973                            if let Some(metrics) = metrics {
974                                metrics.update_sector_state(SectorState::AboutToExpire);
975                            }
976                            SectorExpirationDetails::AboutToExpire
977                        };
978                        handlers.sector_update.call_simple(&(
979                            sector_index,
980                            SectorUpdate::Expiration(expiration_details),
981                        ));
982
983                        // Time to replot
984                        sectors_to_replot.push(SectorToReplot {
985                            sector_index,
986                            expires_at,
987                        });
988                    } else {
989                        trace!(
990                            %sector_index,
991                            %history_size,
992                            sector_expire_at = %expires_at,
993                            "Sector expires later, remembering sector expiration"
994                        );
995
996                        handlers.sector_update.call_simple(&(
997                            sector_index,
998                            SectorUpdate::Expiration(SectorExpirationDetails::Determined {
999                                expires_at,
1000                            }),
1001                        ));
1002
1003                        // Store expiration so we don't have to recalculate it later
1004                        if let Some(expires_at_entry) =
1005                            sectors_expire_at.get_mut(usize::from(sector_index))
1006                        {
1007                            expires_at_entry.replace(expires_at);
1008                        }
1009                    }
1010                }
1011            }
1012        }
1013        drop(sectors_metadata);
1014
1015        let sectors_queued = sectors_to_replot.len();
1016        sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1017        for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1018        {
1019            let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1020            if let Err(error) = sectors_to_plot_sender
1021                .send(SectorToPlot {
1022                    sector_index,
1023                    progress: index as f32 / sectors_queued as f32 * 100.0,
1024                    last_queued: index + 1 == sectors_queued,
1025                    acknowledgement_sender,
1026                })
1027                .await
1028            {
1029                warn!(%error, "Failed to send sector index for replotting");
1030                return Ok(());
1031            }
1032
1033            // We do not care if message was sent back or sender was just dropped
1034            let _ = acknowledgement_receiver.await;
1035
1036            if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1037                expires_at_entry.take();
1038            }
1039        }
1040
1041        if archived_segments_receiver.changed().await.is_err() {
1042            break;
1043        }
1044    }
1045
1046    Ok(())
1047}