ab_farmer/single_disk_farm/plotting.rs

use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
use crate::node_client::NodeClient;
use crate::plotter::{Plotter, SectorPlottingProgress};
use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
use crate::single_disk_farm::{
    BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
};
use ab_core_primitives::ed25519::Ed25519PublicKey;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::PieceOffset;
use ab_core_primitives::sectors::{SectorId, SectorIndex};
use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
use ab_farmer_components::file_ext::FileExt;
use ab_farmer_components::plotting::PlottedSector;
use ab_farmer_components::sector::SectorMetadataChecksummed;
use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesOrdered;
use futures::{FutureExt, SinkExt, StreamExt, select};
use parity_scale_codec::Encode;
use parking_lot::RwLock;
use rand::prelude::*;
use std::collections::HashSet;
use std::future::Future;
use std::io;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::watch;
use tokio::task;
use tracing::{Instrument, debug, info, info_span, trace, warn};

const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);

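/// A request to plot a single sector, sent from the plotting scheduler to the plotting task.
/// The paired receiver of `acknowledgement_sender` resolves once plotting of this sector was
/// initiated (or the request was dropped), which is how the scheduler paces its requests.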
pub(super) struct SectorToPlot {
    sector_index: SectorIndex,
    /// Progress so far in % (not including this sector)
    progress: f32,
    /// Whether this is the last sector queued so far
    last_queued: bool,
    acknowledgement_sender: oneshot::Sender<()>,
}

/// Errors that happen during plotting
#[derive(Debug, Error)]
pub enum PlottingError {
    /// Failed to retrieve farmer info
    #[error("Failed to retrieve farmer info: {error}")]
    FailedToGetFarmerInfo {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Failed to get segment header
    #[error("Failed to get segment header: {error}")]
    FailedToGetSegmentHeader {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Missing archived segment header
    #[error("Missing archived segment header: {segment_index}")]
    MissingArchivedSegmentHeader {
        /// Segment index that was missing
        segment_index: SegmentIndex,
    },
    /// Failed to subscribe to archived segments
    #[error("Failed to subscribe to archived segments: {error}")]
    FailedToSubscribeArchivedSegments {
        /// Lower-level error
        error: anyhow::Error,
    },
    /// Low-level plotting error
    #[error("Low-level plotting error: {0}")]
    LowLevel(String),
    /// I/O error occurred
    #[error("Plotting I/O error: {0}")]
    Io(#[from] io::Error),
    /// Background downloading panicked
    #[error("Background downloading panicked")]
    BackgroundDownloadingPanicked,
}

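/// Dependencies and parameters shared by every per-sector plotting operation of a single farm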
pub(super) struct SectorPlottingOptions<'a, NC> {
    pub(super) public_key: Ed25519PublicKey,
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    pub(super) node_client: &'a NC,
    pub(super) pieces_in_sector: u16,
    pub(super) sector_size: usize,
    pub(super) plot_file: Arc<DirectIoFileWrapper>,
    pub(super) metadata_file: Arc<DirectIoFileWrapper>,
    pub(super) handlers: &'a Handlers,
    pub(super) global_mutex: &'a AsyncMutex<()>,
    pub(super) plotter: Arc<dyn Plotter>,
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}

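/// Options for the main `plotting` background task of a farm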
pub(super) struct PlottingOptions<'a, NC> {
    pub(super) metadata_header: PlotMetadataHeader,
    pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
    pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
    pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
    pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
}

/// Starts the plotting process.
///
/// NOTE: The returned future is async, but does blocking operations and should be run on a
/// dedicated thread.
pub(super) async fn plotting<NC>(
    plotting_options: PlottingOptions<'_, NC>,
) -> Result<(), PlottingError>
where
    NC: NodeClient,
{
    let PlottingOptions {
        mut metadata_header,
        sectors_metadata,
        sectors_being_modified,
        mut sectors_to_plot_receiver,
        sector_plotting_options,
        max_plotting_sectors_per_farm,
    } = plotting_options;

    let sector_plotting_options = &sector_plotting_options;
    let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
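    // Largest history size any existing sector was plotted with; maintained so that newly
    // (re)plotted sectors can reuse it (see the history size handling in `plot_single_sector`)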
    let max_used_history_size = RwLock::new(
        sectors_metadata
            .read()
            .await
            .iter()
            .map(|sector_metadata| sector_metadata.history_size)
            .max()
            .unwrap_or(HistorySize::ONE),
    );
    let mut sectors_being_plotted = FuturesOrdered::new();
    // The channel is intentionally unbounded for easier analysis, but in practice it is bounded
    // by the plotting semaphore due to the permit stored in `SectorPlottingResult`
    let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
    let process_plotting_result_fut = async move {
        while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
            process_plotting_result(
                sector_plotting_result,
                sectors_metadata,
                sectors_being_modified,
                &mut metadata_header,
                Arc::clone(&sector_plotting_options.metadata_file),
            )
            .await?;
        }

        unreachable!(
            "Stream will not end before the rest of the plotting process is shutting down"
        );
    };
    let process_plotting_result_fut = process_plotting_result_fut.fuse();
    let mut process_plotting_result_fut = pin!(process_plotting_result_fut);

    // Wait for new sectors to plot from `sectors_to_plot_receiver`, while also waiting for
    // sectors that already started plotting to finish, then update the metadata header
    loop {
        select! {
            maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
                let Some(sector_to_plot) = maybe_sector_to_plot else {
                    break;
                };

                let sector_index = sector_to_plot.sector_index;
                let sector_plotting_init_fut = plot_single_sector(
                    sector_to_plot,
                    sector_plotting_options,
                    sectors_metadata,
                    sectors_being_modified,
                    &plotting_semaphore,
                    &max_used_history_size,
                )
                    .instrument(info_span!("", %sector_index))
                    .fuse();
                let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);

                // Wait for plotting of the new sector to start (backpressure), while also waiting
                // for sectors that already started plotting to finish, then update the metadata
                // header
                loop {
                    select! {
                        sector_plotting_init_result = sector_plotting_init_fut => {
                            let sector_plotting_fut = match sector_plotting_init_result {
                                PlotSingleSectorResult::Scheduled(future) => future,
                                PlotSingleSectorResult::Skipped => {
                                    break;
                                }
                                PlotSingleSectorResult::FatalError(error) => {
                                    return Err(error);
                                }
                            };
                            sectors_being_plotted.push_back(
                                sector_plotting_fut.instrument(info_span!("", %sector_index))
                            );
                            break;
                        }
                        maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                            sector_plotting_result_sender
                                .unbounded_send(maybe_sector_plotting_result?)
                                .expect("Sending means receiver is not dropped yet; qed");
                        }
                        result = process_plotting_result_fut => {
                            return result;
                        }
                    }
                }
            }
            maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
                sector_plotting_result_sender
                    .unbounded_send(maybe_sector_plotting_result?)
                    .expect("Sending means receiver is not dropped yet; qed");
            }
            result = process_plotting_result_fut => {
                return result;
            }
        }
    }

    Ok(())
}

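/// Record the result of plotting a single sector: store its metadata, mark the sector as no
/// longer being modified and persist an updated plot metadata header if the plotted sector count
/// grew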
async fn process_plotting_result(
    sector_plotting_result: SectorPlottingResult<'_>,
    sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
    metadata_header: &mut PlotMetadataHeader,
    metadata_file: Arc<DirectIoFileWrapper>,
) -> Result<(), PlottingError> {
    let SectorPlottingResult {
        sector_metadata,
        replotting,
        last_queued,
        plotting_permit,
    } = sector_plotting_result;

    let sector_index = sector_metadata.sector_index;

    {
        let mut sectors_metadata = sectors_metadata.write().await;
        // If the entry exists we're replotting, otherwise we're creating the sector for the
        // first time
        if let Some(existing_sector_metadata) = sectors_metadata.get_mut(usize::from(sector_index))
        {
            *existing_sector_metadata = sector_metadata;
        } else {
            sectors_metadata.push(sector_metadata);
        }
    }

    // Inform others that this sector is no longer being modified
    sectors_being_modified.write().await.remove(&sector_index);

    if u16::from(sector_index) + 1 > metadata_header.plotted_sector_count {
        metadata_header.plotted_sector_count = u16::from(sector_index) + 1;

        let encoded_metadata_header = metadata_header.encode();
        let write_fut =
            task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
        write_fut.await.map_err(|error| {
            PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
        })??;
    }

    if last_queued {
        if replotting {
            info!("Replotting complete");
        } else {
            info!("Initial plotting complete");
        }
    }

    drop(plotting_permit);

    Ok(())
}

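/// Outcome of initiating plotting of a single sector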
enum PlotSingleSectorResult<F> {
    Scheduled(F),
    Skipped,
    FatalError(PlottingError),
}

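/// Successful result of plotting a single sector, holding the semaphore permit that bounds the
/// number of concurrently plotted sectors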
struct SectorPlottingResult<'a> {
    sector_metadata: SectorMetadataChecksummed,
    replotting: bool,
    last_queued: bool,
    plotting_permit: SemaphoreGuard<'a>,
}

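/// Initiate plotting of a single sector: mark the sector as being modified, acquire a plotting
/// permit, resolve the protocol info to plot against and ask the plotter to start. On success
/// returns a future that drives plotting of this sector to completion.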
async fn plot_single_sector<'a, NC>(
    sector_to_plot: SectorToPlot,
    sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
    sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
    sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
    plotting_semaphore: &'a Semaphore,
    max_used_history_size: &RwLock<HistorySize>,
) -> PlotSingleSectorResult<
    impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
>
where
    NC: NodeClient,
{
    let SectorPlottingOptions {
        public_key,
        shard_commitments_roots_cache,
        node_client,
        pieces_in_sector,
        sector_size,
        plot_file,
        metadata_file,
        handlers,
        global_mutex,
        plotter,
        metrics,
    } = sector_plotting_options;

    let SectorToPlot {
        sector_index,
        progress,
        last_queued,
        acknowledgement_sender: _acknowledgement_sender,
    } = sector_to_plot;
    trace!("Preparing to plot sector");

    // Inform others that this sector is being modified
    {
        let mut sectors_being_modified = sectors_being_modified.write().await;
        if !sectors_being_modified.insert(sector_index) {
            debug!("Skipped sector plotting, it is already in progress");
            return PlotSingleSectorResult::Skipped;
        }
    }

    let plotting_permit = plotting_semaphore.acquire().await;

    let maybe_old_sector_metadata = sectors_metadata
        .read()
        .await
        .get(usize::from(sector_index))
        .cloned();
    let replotting = maybe_old_sector_metadata.is_some();

    if let Some(metrics) = metrics {
        metrics.sector_plotting.inc();
    }
    let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
        progress,
        replotting,
        last_queued,
    });
    handlers
        .sector_update
        .call_simple(&(sector_index, sector_state));

    let start = Instant::now();

    // This `loop` is a workaround for an edge case in a local setup when expiration is configured
    // to 1. In that scenario we get a replotting notification essentially straight from the block
    // import pipeline of the node, before the block is imported. This can cause a later request
    // for farmer app info to return old data, meaning we'd be replotting exactly the same sector
    // that just expired.
    let mut protocol_info = loop {
        let protocol_info = match node_client.farmer_app_info().await {
            Ok(farmer_app_info) => farmer_app_info.protocol_info,
            Err(error) => {
                return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
                    error,
                });
            }
        };

        if let Some(old_sector_metadata) = &maybe_old_sector_metadata
            && protocol_info.history_size <= old_sector_metadata.history_size
        {
            if protocol_info.min_sector_lifetime == HistorySize::ONE {
                debug!(
                    current_history_size = %protocol_info.history_size,
                    old_sector_history_size = %old_sector_metadata.history_size,
                    "Latest protocol history size is not yet newer than old sector history \
                    size, wait for a bit and try again"
                );
                tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
                continue;
            } else {
                debug!(
                    current_history_size = %protocol_info.history_size,
                    old_sector_history_size = %old_sector_metadata.history_size,
                    "Skipped sector plotting, likely redundant due to a redundant archived \
                    segment notification"
                );
                return PlotSingleSectorResult::Skipped;
            }
        }

        break protocol_info;
    };

    // Maintain the current max history size unless it is too old. This reduces the diversity of
    // shard assignment, making it easier for farmers to run nodes.
    {
        let mut max_used_history_size = max_used_history_size.write();
        if maybe_old_sector_metadata
            .as_ref()
            .map(|sector_metadata| sector_metadata.history_size)
            != Some(*max_used_history_size)
        {
            // TODO: This may hypothetically result in plotting expired sectors. Add sector
            //  expiration check in the future if it ends up being a real concern.
            protocol_info.history_size = *max_used_history_size;
        } else {
            *max_used_history_size = protocol_info.history_size;
        }
    }

    let (progress_sender, mut progress_receiver) = mpsc::channel(10);
    let shard_commitments_root = shard_commitments_roots_cache.get(protocol_info.history_size);

    // Initiate plotting
    plotter
        .plot_sector(
            *public_key,
            shard_commitments_root,
            sector_index,
            protocol_info,
            *pieces_in_sector,
            replotting,
            progress_sender,
        )
        .await;

    if replotting {
        info!("Replotting sector ({progress:.2}% complete)");
    } else {
        info!("Plotting sector ({progress:.2}% complete)");
    }

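    // The future below performs the actual plotting: recoverable errors reported by the plotter
    // are retried indefinitely with a fresh progress channel, while irrecoverable errors
    // propagate through the outer `Result` of `plot_single_sector_internal`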
    PlotSingleSectorResult::Scheduled(async move {
        let plotted_sector = loop {
            match plot_single_sector_internal(
                sector_index,
                *sector_size,
                plot_file,
                metadata_file,
                handlers,
                global_mutex,
                progress_receiver,
                metrics,
            )
            .await?
            {
                Ok(plotted_sector) => {
                    break plotted_sector;
                }
                Err(error) => {
                    warn!(
                        %error,
                        "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
                    );

                    tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
                }
            }

            let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
            progress_receiver = retry_progress_receiver;

            // Initiate plotting again for the retry
            plotter
                .plot_sector(
                    *public_key,
                    shard_commitments_root,
                    sector_index,
                    protocol_info,
                    *pieces_in_sector,
                    replotting,
                    retry_progress_sender,
                )
                .await;

            if replotting {
                info!("Replotting sector retry");
            } else {
                info!("Plotting sector retry");
            }
        };

        let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
            let old_history_size = old_sector_metadata.history_size;

            PlottedSector {
                sector_id: plotted_sector.sector_id,
                sector_index: plotted_sector.sector_index,
                sector_metadata: old_sector_metadata,
                piece_indexes: {
                    let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
                    (PieceOffset::ZERO..)
                        .take(usize::from(*pieces_in_sector))
                        .map(|piece_offset| {
                            plotted_sector.sector_id.derive_piece_index(
                                piece_offset,
                                old_history_size,
                                protocol_info.max_pieces_in_sector,
                                protocol_info.recent_segments,
                                protocol_info.recent_history_fraction,
                            )
                        })
                        .collect_into(&mut piece_indexes);
                    piece_indexes
                },
            }
        });

        if replotting {
            debug!("Sector replotted successfully");
        } else {
            debug!("Sector plotted successfully");
        }

        let sector_metadata = plotted_sector.sector_metadata.clone();

        let time = start.elapsed();
        if let Some(metrics) = metrics {
            metrics.sector_plotting_time.observe(time.as_secs_f64());
            metrics.sector_plotted.inc();
            metrics.update_sector_state(SectorState::Plotted);
        }
        let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
            plotted_sector,
            old_plotted_sector: maybe_old_plotted_sector,
            time,
        });
        handlers
            .sector_update
            .call_simple(&(sector_index, sector_state));

        Ok(SectorPlottingResult {
            sector_metadata,
            replotting,
            last_queued,
            plotting_permit,
        })
    })
}

/// The outer error indicates irrecoverable plotting errors, while the inner result is for
/// recoverable errors
#[expect(clippy::too_many_arguments)]
async fn plot_single_sector_internal(
    sector_index: SectorIndex,
    sector_size: usize,
    plot_file: &Arc<DirectIoFileWrapper>,
    metadata_file: &Arc<DirectIoFileWrapper>,
    handlers: &Handlers,
    global_mutex: &AsyncMutex<()>,
    mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
    // Process plotting progress notifications
    let progress_processor_fut = async {
        while let Some(progress) = progress_receiver.next().await {
            match progress {
                SectorPlottingProgress::Downloading => {
                    if let Some(metrics) = metrics {
                        metrics.sector_downloading.inc();
                    }
                    handlers.sector_update.call_simple(&(
                        sector_index,
                        SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
                    ));
                }
                SectorPlottingProgress::Downloaded(time) => {
                    if let Some(metrics) = metrics {
                        metrics.sector_downloading_time.observe(time.as_secs_f64());
                        metrics.sector_downloaded.inc();
                    }
                    handlers.sector_update.call_simple(&(
                        sector_index,
                        SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
                    ));
                }
                SectorPlottingProgress::Encoding => {
                    if let Some(metrics) = metrics {
                        metrics.sector_encoding.inc();
                    }
                    handlers.sector_update.call_simple(&(
                        sector_index,
                        SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
                    ));
                }
                SectorPlottingProgress::Encoded(time) => {
                    if let Some(metrics) = metrics {
                        metrics.sector_encoding_time.observe(time.as_secs_f64());
                        metrics.sector_encoded.inc();
                    }
                    handlers.sector_update.call_simple(&(
                        sector_index,
                        SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
                    ));
                }
                SectorPlottingProgress::Finished {
                    plotted_sector,
                    time: _,
                    sector,
                } => {
                    return Ok((plotted_sector, sector));
                }
                SectorPlottingProgress::Error { error } => {
                    if let Some(metrics) = metrics {
                        metrics.sector_plotting_error.inc();
                    }
                    handlers.sector_update.call_simple(&(
                        sector_index,
                        SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
                    ));
                    return Err(error);
                }
            }
        }

        Err("Plotting progress stream ended before plotting finished".to_string())
    };

    let (plotted_sector, mut sector) = match progress_processor_fut.await {
        Ok(result) => result,
        Err(error) => {
            return Ok(Err(PlottingError::LowLevel(error)));
        }
    };

    {
        // Take the mutex briefly to make sure writing is allowed right now
        global_mutex.lock().await;

        if let Some(metrics) = metrics {
            metrics.sector_writing.inc();
        }
        handlers.sector_update.call_simple(&(
            sector_index,
            SectorUpdate::Plotting(SectorPlottingDetails::Writing),
        ));

        let start = Instant::now();

        {
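            // Stream the sector from the plotter chunk by chunk, writing each chunk to the plot
            // file at this sector's offset and verifying the total size received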
            let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
            let mut total_received = 0;
            let mut sector_write_offset = sector_write_base_offset;
            while let Some(maybe_sector_chunk) = sector.next().await {
                let sector_chunk = match maybe_sector_chunk {
                    Ok(sector_chunk) => sector_chunk,
                    Err(error) => {
                        return Ok(Err(PlottingError::LowLevel(format!(
                            "Sector chunk receive error: {error}"
                        ))));
                    }
                };

                total_received += sector_chunk.len();

                if total_received > sector_size {
                    return Ok(Err(PlottingError::LowLevel(format!(
                        "Received too many bytes {total_received} instead of expected \
                        {sector_size} bytes"
                    ))));
                }

                let sector_chunk_size = sector_chunk.len() as u64;

                trace!(sector_chunk_size, "Writing sector chunk to disk");
                let write_fut = task::spawn_blocking({
                    let plot_file = Arc::clone(plot_file);

                    move || plot_file.write_all_at(&sector_chunk, sector_write_offset)
                });
                write_fut.await.map_err(|error| {
                    PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
                })??;

                sector_write_offset += sector_chunk_size;
            }
            drop(sector);

            if total_received != sector_size {
                return Ok(Err(PlottingError::LowLevel(format!(
                    "Received only {total_received} sector bytes out of {sector_size} \
                    expected bytes"
                ))));
            }
        }
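        // Persist the checksummed sector metadata at this sector's slot in the metadata file,
        // which sits after the fixed-size reserved header region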
        {
            let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
            let write_fut = task::spawn_blocking({
                let metadata_file = Arc::clone(metadata_file);

                move || {
                    metadata_file.write_all_at(
                        &encoded_sector_metadata,
                        RESERVED_PLOT_METADATA
                            + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
                    )
                }
            });
            write_fut.await.map_err(|error| {
                PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
            })??;
        }

        let time = start.elapsed();
        if let Some(metrics) = metrics {
            metrics.sector_writing_time.observe(time.as_secs_f64());
            metrics.sector_written.inc();
        }
        handlers.sector_update.call_simple(&(
            sector_index,
            SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
        ));
    }

    Ok(Ok(plotted_sector))
}

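/// Options for the plotting scheduler background task of a farm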
pub(super) struct PlottingSchedulerOptions<NC> {
    pub(super) public_key_hash: Blake3Hash,
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
    pub(super) target_sector_count: u16,
    pub(super) last_archived_segment_index: SegmentIndex,
    pub(super) min_sector_lifetime: HistorySize,
    pub(super) node_client: NC,
    pub(super) handlers: Arc<Handlers>,
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
    // Max delay between a segment header being acknowledged by the farmer and it potentially
    // triggering replotting
    pub(super) new_segment_processing_delay: Duration,
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}

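/// Run the plotting scheduler: track newly archived segments and send plotting/replotting
/// requests into `sectors_to_plot_sender`.
///
/// The latest archived segment header is propagated internally through a `tokio::sync::watch`
/// channel, which retains only the most recent value instead of buffering a backlog. A minimal
/// sketch of that behavior (illustrative only, with hypothetical `header_1`/`header_2` values):
///
/// ```ignore
/// let (tx, mut rx) = tokio::sync::watch::channel(header_1);
/// tx.send(header_2).unwrap(); // replaces the previous value, nothing is queued
/// assert_eq!(*rx.borrow_and_update(), header_2);
/// ```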
pub(super) async fn plotting_scheduler<NC>(
    plotting_scheduler_options: PlottingSchedulerOptions<NC>,
) -> Result<(), BackgroundTaskError>
where
    NC: NodeClient,
{
    let PlottingSchedulerOptions {
        public_key_hash,
        shard_commitments_roots_cache,
        sectors_indices_left_to_plot,
        target_sector_count,
        last_archived_segment_index,
        min_sector_lifetime,
        node_client,
        handlers,
        sectors_metadata,
        sectors_to_plot_sender,
        new_segment_processing_delay,
        metrics,
    } = plotting_scheduler_options;

    // Create a proxy channel with an atomically updatable last archived segment, which allows us
    // to avoid buffering messages from the RPC subscription while still having access to the most
    // recent value at any time
    let last_archived_segment = node_client
        .segment_headers(vec![last_archived_segment_index])
        .await
        .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
        .into_iter()
        .next()
        .flatten()
        .ok_or(PlottingError::MissingArchivedSegmentHeader {
            segment_index: last_archived_segment_index,
        })?;

    let (archived_segments_sender, archived_segments_receiver) =
        watch::channel(last_archived_segment);

    let read_archived_segments_notifications_fut = read_archived_segments_notifications(
        &node_client,
        archived_segments_sender,
        new_segment_processing_delay,
    );

    let send_plotting_notifications_fut = send_plotting_notifications(
        public_key_hash,
        &shard_commitments_roots_cache,
        sectors_indices_left_to_plot,
        target_sector_count,
        min_sector_lifetime,
        &node_client,
        &handlers,
        sectors_metadata,
        archived_segments_receiver,
        sectors_to_plot_sender,
        &metrics,
    );

    select! {
        result = read_archived_segments_notifications_fut.fuse() => {
            result
        }
        result = send_plotting_notifications_fut.fuse() => {
            result
        }
    }
}

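/// Subscribe to archived segment headers from the node, acknowledge each header and, after a
/// randomized delay, publish it to the plotting scheduler through the watch channel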
async fn read_archived_segments_notifications<NC>(
    node_client: &NC,
    archived_segments_sender: watch::Sender<SegmentHeader>,
    new_segment_processing_delay: Duration,
) -> Result<(), BackgroundTaskError>
where
    NC: NodeClient,
{
    info!("Subscribing to archived segments");

    let mut archived_segments_notifications = node_client
        .subscribe_archived_segment_headers()
        .await
        .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;

    while let Some(segment_header) = archived_segments_notifications.next().await {
        debug!(?segment_header, "New archived segment");
        if let Err(error) = node_client
            .acknowledge_archived_segment_header(segment_header.segment_index())
            .await
        {
            debug!(%error, "Failed to acknowledge segment header");
        }

        // There is no urgent need to rush replotting sectors immediately, and this delay allows
        // newly archived pieces to be cached both locally and on other farmers on the network
        let delay = Duration::from_secs(rand::rng().random_range(
            new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
        ));
        tokio::time::sleep(delay).await;

        if archived_segments_sender.send(segment_header).is_err() {
            break;
        }
    }

    Ok(())
}

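/// A sector scheduled for replotting, together with the segment index at which it expires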
struct SectorToReplot {
    sector_index: SectorIndex,
    expires_at: SegmentIndex,
}

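/// Drive initial plotting of sectors that were never fully plotted, then, for every newly
/// archived segment, check plotted sectors for upcoming expiration and schedule replotting,
/// soonest-expiring first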
#[expect(clippy::too_many_arguments)]
async fn send_plotting_notifications<NC>(
    public_key_hash: Blake3Hash,
    shard_commitments_roots_cache: &ShardCommitmentsRootsCache,
    sectors_indices_left_to_plot: Range<SectorIndex>,
    target_sector_count: u16,
    min_sector_lifetime: HistorySize,
    node_client: &NC,
    handlers: &Handlers,
    sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
    mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
    metrics: &Option<Arc<SingleDiskFarmMetrics>>,
) -> Result<(), BackgroundTaskError>
where
    NC: NodeClient,
{
    // Finish initial plotting if some sectors were not fully plotted yet
    for sector_index in sectors_indices_left_to_plot {
        let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
        if let Err(error) = sectors_to_plot_sender
            .send(SectorToPlot {
                sector_index,
                progress: u16::from(sector_index) as f32 / target_sector_count as f32 * 100.0,
                last_queued: u16::from(sector_index) + 1 == target_sector_count,
                acknowledgement_sender,
            })
            .await
        {
            warn!(%error, "Failed to send sector index for initial plotting");
            return Ok(());
        }

        // We do not care whether the message was sent back or the sender was just dropped
        let _ = acknowledgement_receiver.await;
    }

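    // Lazily populated cache of each sector's expiration segment index (`None` until determined)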
    let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
    // 10% capacity is generous and should prevent reallocation in most cases
    let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);

    loop {
        let segment_index = archived_segments_receiver
            .borrow_and_update()
            .segment_index();
        trace!(%segment_index, "New archived segment received");

        let sectors_metadata = sectors_metadata.read().await;
        let sectors_to_check = sectors_metadata
            .iter()
            .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
        for (sector_index, history_size) in sectors_to_check {
            if let Some(Some(expires_at)) =
                sectors_expire_at.get(usize::from(sector_index)).copied()
            {
                trace!(
                    %sector_index,
                    %history_size,
                    %expires_at,
                    "Checking sector for expiration"
                );
                // +1 means we will start replotting a bit before it actually expires to avoid
                // storing expired sectors
                if expires_at <= (segment_index + SegmentIndex::ONE) {
                    debug!(
                        %sector_index,
                        %history_size,
                        %expires_at,
                        "Sector expires soon #1, scheduling replotting"
                    );

                    let expiration_details = if expires_at <= segment_index {
                        if let Some(metrics) = metrics {
                            metrics.update_sector_state(SectorState::Expired);
                        }
                        SectorExpirationDetails::Expired
                    } else {
                        if let Some(metrics) = metrics {
                            metrics.update_sector_state(SectorState::AboutToExpire);
                        }
                        SectorExpirationDetails::AboutToExpire
                    };
                    handlers
                        .sector_update
                        .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));

                    // Time to replot
                    sectors_to_replot.push(SectorToReplot {
                        sector_index,
                        expires_at,
                    });
                }
                continue;
            }

            let shard_commitments_root = shard_commitments_roots_cache.get(history_size);
            if let Some(expiration_check_segment_index) = history_size
                .sector_expiration_check(min_sector_lifetime)
                .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
            {
                trace!(
                    %sector_index,
                    %history_size,
                    %expiration_check_segment_index,
                    "Determined sector expiration check segment index"
                );
                let maybe_sector_expiration_check_segment_root = node_client
                    .segment_headers(vec![expiration_check_segment_index])
                    .await
                    .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
                    .into_iter()
                    .next()
                    .flatten()
                    .map(|segment_header| segment_header.segment_root);

                if let Some(sector_expiration_check_segment_root) =
                    maybe_sector_expiration_check_segment_root
                {
                    let sector_id = SectorId::new(
                        &public_key_hash,
                        &shard_commitments_root,
                        sector_index,
                        history_size,
                    );
                    let expiration_history_size = sector_id
                        .derive_expiration_history_size(
                            history_size,
                            &sector_expiration_check_segment_root,
                            min_sector_lifetime,
                        )
                        .expect(
                            "Farmer internally stores correct history size in sector \
                            metadata; qed",
                        );

                    let expires_at = expiration_history_size.segment_index();

                    trace!(
                        %sector_index,
                        %history_size,
                        sector_expire_at = %expires_at,
                        "Determined sector expiration segment index"
                    );
                    // +1 means we will start replotting a bit before it actually expires to avoid
                    // storing expired sectors
                    if expires_at <= (segment_index + SegmentIndex::ONE) {
                        debug!(
                            %sector_index,
                            %history_size,
                            %expires_at,
                            "Sector expires soon #2, scheduling replotting"
                        );

                        let expiration_details = if expires_at <= segment_index {
                            if let Some(metrics) = metrics {
                                metrics.update_sector_state(SectorState::Expired);
                            }
                            SectorExpirationDetails::Expired
                        } else {
                            if let Some(metrics) = metrics {
                                metrics.update_sector_state(SectorState::AboutToExpire);
                            }
                            SectorExpirationDetails::AboutToExpire
                        };
                        handlers.sector_update.call_simple(&(
                            sector_index,
                            SectorUpdate::Expiration(expiration_details),
                        ));

                        // Time to replot
                        sectors_to_replot.push(SectorToReplot {
                            sector_index,
                            expires_at,
                        });
                    } else {
                        trace!(
                            %sector_index,
                            %history_size,
                            sector_expire_at = %expires_at,
                            "Sector expires later, remembering sector expiration"
                        );

                        handlers.sector_update.call_simple(&(
                            sector_index,
                            SectorUpdate::Expiration(SectorExpirationDetails::Determined {
                                expires_at,
                            }),
                        ));

                        // Store expiration so we don't have to recalculate it later
                        if let Some(expires_at_entry) =
                            sectors_expire_at.get_mut(usize::from(sector_index))
                        {
                            expires_at_entry.replace(expires_at);
                        }
                    }
                }
            }
        }
        drop(sectors_metadata);

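        // Dispatch replotting requests, soonest-expiring sectors first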
        let sectors_queued = sectors_to_replot.len();
        sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
        for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
        {
            let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
            if let Err(error) = sectors_to_plot_sender
                .send(SectorToPlot {
                    sector_index,
                    progress: index as f32 / sectors_queued as f32 * 100.0,
                    last_queued: index + 1 == sectors_queued,
                    acknowledgement_sender,
                })
                .await
            {
                warn!(%error, "Failed to send sector index for replotting");
                return Ok(());
            }

            // We do not care whether the message was sent back or the sender was just dropped
            let _ = acknowledgement_receiver.await;

            if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
                expires_at_entry.take();
            }
        }

        if archived_segments_receiver.changed().await.is_err() {
            break;
        }
    }

    Ok(())
}