1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7 BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use ab_core_primitives::ed25519::Ed25519PublicKey;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::PieceOffset;
12use ab_core_primitives::sectors::{SectorId, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
14use ab_farmer_components::file_ext::FileExt;
15use ab_farmer_components::plotting::PlottedSector;
16use ab_farmer_components::sector::SectorMetadataChecksummed;
17use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
18use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
19use futures::channel::{mpsc, oneshot};
20use futures::stream::FuturesOrdered;
21use futures::{FutureExt, SinkExt, StreamExt, select};
22use parity_scale_codec::Encode;
23use parking_lot::RwLock;
24use rand::prelude::*;
25use std::collections::HashSet;
26use std::future::Future;
27use std::io;
28use std::num::NonZeroUsize;
29use std::ops::Range;
30use std::pin::pin;
31use std::sync::Arc;
32use std::time::{Duration, Instant};
33use thiserror::Error;
34use tokio::sync::watch;
35use tokio::task;
36use tracing::{Instrument, debug, info, info_span, trace, warn};
37
/// Pause between consecutive farmer app info requests while waiting for the node to report a
/// history size newer than the one of the sector that is about to be replotted.
const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Pause before sector plotting is requested from the plotter again after a failed attempt.
const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
40
41pub(super) struct SectorToPlot {
42 sector_index: SectorIndex,
43 progress: f32,
45 last_queued: bool,
47 acknowledgement_sender: oneshot::Sender<()>,
48}
49
50#[derive(Debug, Error)]
52pub enum PlottingError {
53 #[error("Failed to retrieve farmer info: {error}")]
55 FailedToGetFarmerInfo {
56 error: anyhow::Error,
58 },
59 #[error("Failed to get segment header: {error}")]
61 FailedToGetSegmentHeader {
62 error: anyhow::Error,
64 },
65 #[error("Missing archived segment header: {segment_index}")]
67 MissingArchivedSegmentHeader {
68 segment_index: SegmentIndex,
70 },
71 #[error("Failed to subscribe to archived segments: {error}")]
73 FailedToSubscribeArchivedSegments {
74 error: anyhow::Error,
76 },
77 #[error("Low-level plotting error: {0}")]
79 LowLevel(String),
80 #[error("Plotting I/O error: {0}")]
82 Io(#[from] io::Error),
83 #[error("Background downloading panicked")]
85 BackgroundDownloadingPanicked,
86}
87
88pub(super) struct SectorPlottingOptions<'a, NC> {
89 pub(super) public_key: Ed25519PublicKey,
90 pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
91 pub(super) node_client: &'a NC,
92 pub(super) pieces_in_sector: u16,
93 pub(super) sector_size: usize,
94 pub(super) plot_file: Arc<DirectIoFileWrapper>,
95 pub(super) metadata_file: Arc<DirectIoFileWrapper>,
96 pub(super) handlers: &'a Handlers,
97 pub(super) global_mutex: &'a AsyncMutex<()>,
98 pub(super) plotter: Arc<dyn Plotter>,
99 pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
100}
101
102pub(super) struct PlottingOptions<'a, NC> {
103 pub(super) metadata_header: PlotMetadataHeader,
104 pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
105 pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
106 pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
107 pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
108 pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
109}
110
111pub(super) async fn plotting<NC>(
116 plotting_options: PlottingOptions<'_, NC>,
117) -> Result<(), PlottingError>
118where
119 NC: NodeClient,
120{
121 let PlottingOptions {
122 mut metadata_header,
123 sectors_metadata,
124 sectors_being_modified,
125 mut sectors_to_plot_receiver,
126 sector_plotting_options,
127 max_plotting_sectors_per_farm,
128 } = plotting_options;
129
130 let sector_plotting_options = §or_plotting_options;
131 let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
132 let max_used_history_size = RwLock::new(
133 sectors_metadata
134 .read()
135 .await
136 .iter()
137 .map(|sector_metadata| sector_metadata.history_size)
138 .max()
139 .unwrap_or(HistorySize::ONE),
140 );
141 let mut sectors_being_plotted = FuturesOrdered::new();
142 let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
145 let process_plotting_result_fut = async move {
146 while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
147 process_plotting_result(
148 sector_plotting_result,
149 sectors_metadata,
150 sectors_being_modified,
151 &mut metadata_header,
152 Arc::clone(§or_plotting_options.metadata_file),
153 )
154 .await?;
155 }
156
157 unreachable!(
158 "Stream will not end before the rest of the plotting process is shutting down"
159 );
160 };
161 let process_plotting_result_fut = process_plotting_result_fut.fuse();
162 let mut process_plotting_result_fut = pin!(process_plotting_result_fut);
163
164 loop {
167 select! {
168 maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
169 let Some(sector_to_plot) = maybe_sector_to_plot else {
170 break;
171 };
172
173 let sector_index = sector_to_plot.sector_index;
174 let sector_plotting_init_fut = plot_single_sector(
175 sector_to_plot,
176 sector_plotting_options,
177 sectors_metadata,
178 sectors_being_modified,
179 &plotting_semaphore,
180 &max_used_history_size,
181 )
182 .instrument(info_span!("", %sector_index))
183 .fuse();
184 let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);
185
186 loop {
190 select! {
191 sector_plotting_init_result = sector_plotting_init_fut => {
192 let sector_plotting_fut = match sector_plotting_init_result {
193 PlotSingleSectorResult::Scheduled(future) => future,
194 PlotSingleSectorResult::Skipped => {
195 break;
196 }
197 PlotSingleSectorResult::FatalError(error) => {
198 return Err(error);
199 }
200 };
201 sectors_being_plotted.push_back(
202 sector_plotting_fut.instrument(info_span!("", %sector_index))
203 );
204 break;
205 }
206 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
207 sector_plotting_result_sender
208 .unbounded_send(maybe_sector_plotting_result?)
209 .expect("Sending means receiver is not dropped yet; qed");
210 }
211 result = process_plotting_result_fut => {
212 return result;
213 }
214 }
215 }
216 }
217 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
218 sector_plotting_result_sender
219 .unbounded_send(maybe_sector_plotting_result?)
220 .expect("Sending means receiver is not dropped yet; qed");
221 }
222 result = process_plotting_result_fut => {
223 return result;
224 }
225 }
226 }
227
228 Ok(())
229}
230
231async fn process_plotting_result(
232 sector_plotting_result: SectorPlottingResult<'_>,
233 sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
234 sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
235 metadata_header: &mut PlotMetadataHeader,
236 metadata_file: Arc<DirectIoFileWrapper>,
237) -> Result<(), PlottingError> {
238 let SectorPlottingResult {
239 sector_metadata,
240 replotting,
241 last_queued,
242 plotting_permit,
243 } = sector_plotting_result;
244
245 let sector_index = sector_metadata.sector_index;
246
247 {
248 let mut sectors_metadata = sectors_metadata.write().await;
249 if let Some(existing_sector_metadata) = sectors_metadata.get_mut(usize::from(sector_index))
251 {
252 *existing_sector_metadata = sector_metadata;
253 } else {
254 sectors_metadata.push(sector_metadata);
255 }
256 }
257
258 sectors_being_modified.write().await.remove(§or_index);
260
261 if u16::from(sector_index) + 1 > metadata_header.plotted_sector_count {
262 metadata_header.plotted_sector_count = u16::from(sector_index) + 1;
263
264 let encoded_metadata_header = metadata_header.encode();
265 let write_fut =
266 task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
267 write_fut.await.map_err(|error| {
268 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
269 })??;
270 }
271
272 if last_queued {
273 if replotting {
274 info!("Replotting complete");
275 } else {
276 info!("Initial plotting complete");
277 }
278 }
279
280 drop(plotting_permit);
281
282 Ok(())
283}
284
285enum PlotSingleSectorResult<F> {
286 Scheduled(F),
287 Skipped,
288 FatalError(PlottingError),
289}
290
291struct SectorPlottingResult<'a> {
292 sector_metadata: SectorMetadataChecksummed,
293 replotting: bool,
294 last_queued: bool,
295 plotting_permit: SemaphoreGuard<'a>,
296}
297
298async fn plot_single_sector<'a, NC>(
299 sector_to_plot: SectorToPlot,
300 sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
301 sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
302 sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
303 plotting_semaphore: &'a Semaphore,
304 max_used_history_size: &RwLock<HistorySize>,
305) -> PlotSingleSectorResult<
306 impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
307>
308where
309 NC: NodeClient,
310{
311 let SectorPlottingOptions {
312 public_key,
313 shard_commitments_roots_cache,
314 node_client,
315 pieces_in_sector,
316 sector_size,
317 plot_file,
318 metadata_file,
319 handlers,
320 global_mutex,
321 plotter,
322 metrics,
323 } = sector_plotting_options;
324
325 let SectorToPlot {
326 sector_index,
327 progress,
328 last_queued,
329 acknowledgement_sender: _acknowledgement_sender,
330 } = sector_to_plot;
331 trace!("Preparing to plot sector");
332
333 {
335 let mut sectors_being_modified = sectors_being_modified.write().await;
336 if !sectors_being_modified.insert(sector_index) {
337 debug!("Skipped sector plotting, it is already in progress");
338 return PlotSingleSectorResult::Skipped;
339 }
340 }
341
342 let plotting_permit = plotting_semaphore.acquire().await;
343
344 let maybe_old_sector_metadata = sectors_metadata
345 .read()
346 .await
347 .get(usize::from(sector_index))
348 .cloned();
349 let replotting = maybe_old_sector_metadata.is_some();
350
351 if let Some(metrics) = metrics {
352 metrics.sector_plotting.inc();
353 }
354 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
355 progress,
356 replotting,
357 last_queued,
358 });
359 handlers
360 .sector_update
361 .call_simple(&(sector_index, sector_state));
362
363 let start = Instant::now();
364
365 let mut protocol_info = loop {
371 let protocol_info = match node_client.farmer_app_info().await {
372 Ok(farmer_app_info) => farmer_app_info.protocol_info,
373 Err(error) => {
374 return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
375 error,
376 });
377 }
378 };
379
380 if let Some(old_sector_metadata) = &maybe_old_sector_metadata
381 && protocol_info.history_size <= old_sector_metadata.history_size
382 {
383 if protocol_info.min_sector_lifetime == HistorySize::ONE {
384 debug!(
385 current_history_size = %protocol_info.history_size,
386 old_sector_history_size = %old_sector_metadata.history_size,
387 "Latest protocol history size is not yet newer than old sector history \
388 size, wait for a bit and try again"
389 );
390 tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
391 continue;
392 } else {
393 debug!(
394 current_history_size = %protocol_info.history_size,
395 old_sector_history_size = %old_sector_metadata.history_size,
396 "Skipped sector plotting, likely redundant due to redundant archived \
397 segment notification"
398 );
399 return PlotSingleSectorResult::Skipped;
400 }
401 }
402
403 break protocol_info;
404 };
405
406 {
409 let mut max_used_history_size = max_used_history_size.write();
410 if maybe_old_sector_metadata
411 .as_ref()
412 .map(|sector_metadata| sector_metadata.history_size)
413 != Some(*max_used_history_size)
414 {
415 protocol_info.history_size = *max_used_history_size;
418 } else {
419 *max_used_history_size = protocol_info.history_size;
420 }
421 }
422
423 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
424 let shard_commitments_root = shard_commitments_roots_cache.get(protocol_info.history_size);
425
426 plotter
428 .plot_sector(
429 *public_key,
430 shard_commitments_root,
431 sector_index,
432 protocol_info,
433 *pieces_in_sector,
434 replotting,
435 progress_sender,
436 )
437 .await;
438
439 if replotting {
440 info!("Replotting sector ({progress:.2}% complete)");
441 } else {
442 info!("Plotting sector ({progress:.2}% complete)");
443 }
444
445 PlotSingleSectorResult::Scheduled(async move {
446 let plotted_sector = loop {
447 match plot_single_sector_internal(
448 sector_index,
449 *sector_size,
450 plot_file,
451 metadata_file,
452 handlers,
453 global_mutex,
454 progress_receiver,
455 metrics,
456 )
457 .await?
458 {
459 Ok(plotted_sector) => {
460 break plotted_sector;
461 }
462 Err(error) => {
463 warn!(
464 %error,
465 "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
466 );
467
468 tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
469 }
470 }
471
472 let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
473 progress_receiver = retry_progress_receiver;
474
475 plotter
477 .plot_sector(
478 *public_key,
479 shard_commitments_root,
480 sector_index,
481 protocol_info,
482 *pieces_in_sector,
483 replotting,
484 retry_progress_sender,
485 )
486 .await;
487
488 if replotting {
489 info!("Replotting sector retry");
490 } else {
491 info!("Plotting sector retry");
492 }
493 };
494
495 let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
496 let old_history_size = old_sector_metadata.history_size;
497
498 PlottedSector {
499 sector_id: plotted_sector.sector_id,
500 sector_index: plotted_sector.sector_index,
501 sector_metadata: old_sector_metadata,
502 piece_indexes: {
503 let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
504 (PieceOffset::ZERO..)
505 .take(usize::from(*pieces_in_sector))
506 .map(|piece_offset| {
507 plotted_sector.sector_id.derive_piece_index(
508 piece_offset,
509 old_history_size,
510 protocol_info.max_pieces_in_sector,
511 protocol_info.recent_segments,
512 protocol_info.recent_history_fraction,
513 )
514 })
515 .collect_into(&mut piece_indexes);
516 piece_indexes
517 },
518 }
519 });
520
521 if replotting {
522 debug!("Sector replotted successfully");
523 } else {
524 debug!("Sector plotted successfully");
525 }
526
527 let sector_metadata = plotted_sector.sector_metadata.clone();
528
529 let time = start.elapsed();
530 if let Some(metrics) = metrics {
531 metrics.sector_plotting_time.observe(time.as_secs_f64());
532 metrics.sector_plotted.inc();
533 metrics.update_sector_state(SectorState::Plotted);
534 }
535 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
536 plotted_sector,
537 old_plotted_sector: maybe_old_plotted_sector,
538 time,
539 });
540 handlers
541 .sector_update
542 .call_simple(&(sector_index, sector_state));
543
544 Ok(SectorPlottingResult {
545 sector_metadata,
546 replotting,
547 last_queued,
548 plotting_permit,
549 })
550 })
551}
552
553#[expect(clippy::too_many_arguments)]
556async fn plot_single_sector_internal(
557 sector_index: SectorIndex,
558 sector_size: usize,
559 plot_file: &Arc<DirectIoFileWrapper>,
560 metadata_file: &Arc<DirectIoFileWrapper>,
561 handlers: &Handlers,
562 global_mutex: &AsyncMutex<()>,
563 mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
564 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
565) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
566 let progress_processor_fut = async {
568 while let Some(progress) = progress_receiver.next().await {
569 match progress {
570 SectorPlottingProgress::Downloading => {
571 if let Some(metrics) = metrics {
572 metrics.sector_downloading.inc();
573 }
574 handlers.sector_update.call_simple(&(
575 sector_index,
576 SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
577 ));
578 }
579 SectorPlottingProgress::Downloaded(time) => {
580 if let Some(metrics) = metrics {
581 metrics.sector_downloading_time.observe(time.as_secs_f64());
582 metrics.sector_downloaded.inc();
583 }
584 handlers.sector_update.call_simple(&(
585 sector_index,
586 SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
587 ));
588 }
589 SectorPlottingProgress::Encoding => {
590 if let Some(metrics) = metrics {
591 metrics.sector_encoding.inc();
592 }
593 handlers.sector_update.call_simple(&(
594 sector_index,
595 SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
596 ));
597 }
598 SectorPlottingProgress::Encoded(time) => {
599 if let Some(metrics) = metrics {
600 metrics.sector_encoding_time.observe(time.as_secs_f64());
601 metrics.sector_encoded.inc();
602 }
603 handlers.sector_update.call_simple(&(
604 sector_index,
605 SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
606 ));
607 }
608 SectorPlottingProgress::Finished {
609 plotted_sector,
610 time: _,
611 sector,
612 } => {
613 return Ok((plotted_sector, sector));
614 }
615 SectorPlottingProgress::Error { error } => {
616 if let Some(metrics) = metrics {
617 metrics.sector_plotting_error.inc();
618 }
619 handlers.sector_update.call_simple(&(
620 sector_index,
621 SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
622 ));
623 return Err(error);
624 }
625 }
626 }
627
628 Err("Plotting progress stream ended before plotting finished".to_string())
629 };
630
631 let (plotted_sector, mut sector) = match progress_processor_fut.await {
632 Ok(result) => result,
633 Err(error) => {
634 return Ok(Err(PlottingError::LowLevel(error)));
635 }
636 };
637
638 {
639 global_mutex.lock().await;
641
642 if let Some(metrics) = metrics {
643 metrics.sector_writing.inc();
644 }
645 handlers.sector_update.call_simple(&(
646 sector_index,
647 SectorUpdate::Plotting(SectorPlottingDetails::Writing),
648 ));
649
650 let start = Instant::now();
651
652 {
653 let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
654 let mut total_received = 0;
655 let mut sector_write_offset = sector_write_base_offset;
656 while let Some(maybe_sector_chunk) = sector.next().await {
657 let sector_chunk = match maybe_sector_chunk {
658 Ok(sector_chunk) => sector_chunk,
659 Err(error) => {
660 return Ok(Err(PlottingError::LowLevel(format!(
661 "Sector chunk receive error: {error}"
662 ))));
663 }
664 };
665
666 total_received += sector_chunk.len();
667
668 if total_received > sector_size {
669 return Ok(Err(PlottingError::LowLevel(format!(
670 "Received too many bytes {total_received} instead of expected \
671 {sector_size} bytes"
672 ))));
673 }
674
675 let sector_chunk_size = sector_chunk.len() as u64;
676
677 trace!(sector_chunk_size, "Writing sector chunk to disk");
678 let write_fut = task::spawn_blocking({
679 let plot_file = Arc::clone(plot_file);
680
681 move || plot_file.write_all_at(§or_chunk, sector_write_offset)
682 });
683 write_fut.await.map_err(|error| {
684 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
685 })??;
686
687 sector_write_offset += sector_chunk_size;
688 }
689 drop(sector);
690
691 if total_received != sector_size {
692 return Ok(Err(PlottingError::LowLevel(format!(
693 "Received only {total_received} sector bytes out of {sector_size} \
694 expected bytes"
695 ))));
696 }
697 }
698 {
699 let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
700 let write_fut = task::spawn_blocking({
701 let metadata_file = Arc::clone(metadata_file);
702
703 move || {
704 metadata_file.write_all_at(
705 &encoded_sector_metadata,
706 RESERVED_PLOT_METADATA
707 + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
708 )
709 }
710 });
711 write_fut.await.map_err(|error| {
712 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
713 })??;
714 }
715
716 let time = start.elapsed();
717 if let Some(metrics) = metrics {
718 metrics.sector_writing_time.observe(time.as_secs_f64());
719 metrics.sector_written.inc();
720 }
721 handlers.sector_update.call_simple(&(
722 sector_index,
723 SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
724 ));
725 }
726
727 Ok(Ok(plotted_sector))
728}
729
730pub(super) struct PlottingSchedulerOptions<NC> {
731 pub(super) public_key_hash: Blake3Hash,
732 pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
733 pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
734 pub(super) target_sector_count: u16,
735 pub(super) last_archived_segment_index: SegmentIndex,
736 pub(super) min_sector_lifetime: HistorySize,
737 pub(super) node_client: NC,
738 pub(super) handlers: Arc<Handlers>,
739 pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
740 pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
741 pub(super) new_segment_processing_delay: Duration,
744 pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
745}
746
747pub(super) async fn plotting_scheduler<NC>(
748 plotting_scheduler_options: PlottingSchedulerOptions<NC>,
749) -> Result<(), BackgroundTaskError>
750where
751 NC: NodeClient,
752{
753 let PlottingSchedulerOptions {
754 public_key_hash,
755 shard_commitments_roots_cache,
756 sectors_indices_left_to_plot,
757 target_sector_count,
758 last_archived_segment_index,
759 min_sector_lifetime,
760 node_client,
761 handlers,
762 sectors_metadata,
763 sectors_to_plot_sender,
764 new_segment_processing_delay,
765 metrics,
766 } = plotting_scheduler_options;
767
768 let last_archived_segment = node_client
772 .segment_headers(vec![last_archived_segment_index])
773 .await
774 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
775 .into_iter()
776 .next()
777 .flatten()
778 .ok_or(PlottingError::MissingArchivedSegmentHeader {
779 segment_index: last_archived_segment_index,
780 })?;
781
782 let (archived_segments_sender, archived_segments_receiver) =
783 watch::channel(last_archived_segment);
784
785 let read_archived_segments_notifications_fut = read_archived_segments_notifications(
786 &node_client,
787 archived_segments_sender,
788 new_segment_processing_delay,
789 );
790
791 let send_plotting_notifications_fut = send_plotting_notifications(
792 public_key_hash,
793 &shard_commitments_roots_cache,
794 sectors_indices_left_to_plot,
795 target_sector_count,
796 min_sector_lifetime,
797 &node_client,
798 &handlers,
799 sectors_metadata,
800 archived_segments_receiver,
801 sectors_to_plot_sender,
802 &metrics,
803 );
804
805 select! {
806 result = read_archived_segments_notifications_fut.fuse() => {
807 result
808 }
809 result = send_plotting_notifications_fut.fuse() => {
810 result
811 }
812 }
813}
814
815async fn read_archived_segments_notifications<NC>(
816 node_client: &NC,
817 archived_segments_sender: watch::Sender<SegmentHeader>,
818 new_segment_processing_delay: Duration,
819) -> Result<(), BackgroundTaskError>
820where
821 NC: NodeClient,
822{
823 info!("Subscribing to archived segments");
824
825 let mut archived_segments_notifications = node_client
826 .subscribe_archived_segment_headers()
827 .await
828 .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
829
830 while let Some(segment_header) = archived_segments_notifications.next().await {
831 debug!(?segment_header, "New archived segment");
832 if let Err(error) = node_client
833 .acknowledge_archived_segment_header(segment_header.segment_index())
834 .await
835 {
836 debug!(%error, "Failed to acknowledge segment header");
837 }
838
839 let delay = Duration::from_secs(rand::rng().random_range(
842 new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
843 ));
844 tokio::time::sleep(delay).await;
845
846 if archived_segments_sender.send(segment_header).is_err() {
847 break;
848 }
849 }
850
851 Ok(())
852}
853
854struct SectorToReplot {
855 sector_index: SectorIndex,
856 expires_at: SegmentIndex,
857}
858
859#[expect(clippy::too_many_arguments)]
860async fn send_plotting_notifications<NC>(
861 public_key_hash: Blake3Hash,
862 shard_commitments_roots_cache: &ShardCommitmentsRootsCache,
863 sectors_indices_left_to_plot: Range<SectorIndex>,
864 target_sector_count: u16,
865 min_sector_lifetime: HistorySize,
866 node_client: &NC,
867 handlers: &Handlers,
868 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
869 mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
870 mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
871 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
872) -> Result<(), BackgroundTaskError>
873where
874 NC: NodeClient,
875{
876 for sector_index in sectors_indices_left_to_plot {
878 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
879 if let Err(error) = sectors_to_plot_sender
880 .send(SectorToPlot {
881 sector_index,
882 progress: u16::from(sector_index) as f32 / target_sector_count as f32 * 100.0,
883 last_queued: u16::from(sector_index) + 1 == target_sector_count,
884 acknowledgement_sender,
885 })
886 .await
887 {
888 warn!(%error, "Failed to send sector index for initial plotting");
889 return Ok(());
890 }
891
892 let _ = acknowledgement_receiver.await;
894 }
895
896 let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
897 let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
899
900 loop {
901 let segment_index = archived_segments_receiver
902 .borrow_and_update()
903 .segment_index();
904 trace!(%segment_index, "New archived segment received");
905
906 let sectors_metadata = sectors_metadata.read().await;
907 let sectors_to_check = sectors_metadata
908 .iter()
909 .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
910 for (sector_index, history_size) in sectors_to_check {
911 if let Some(Some(expires_at)) =
912 sectors_expire_at.get(usize::from(sector_index)).copied()
913 {
914 trace!(
915 %sector_index,
916 %history_size,
917 %expires_at,
918 "Checking sector for expiration"
919 );
920 if expires_at <= (segment_index + SegmentIndex::ONE) {
923 debug!(
924 %sector_index,
925 %history_size,
926 %expires_at,
927 "Sector expires soon #1, scheduling replotting"
928 );
929
930 let expiration_details = if expires_at <= segment_index {
931 if let Some(metrics) = metrics {
932 metrics.update_sector_state(SectorState::Expired);
933 }
934 SectorExpirationDetails::Expired
935 } else {
936 if let Some(metrics) = metrics {
937 metrics.update_sector_state(SectorState::AboutToExpire);
938 }
939 SectorExpirationDetails::AboutToExpire
940 };
941 handlers
942 .sector_update
943 .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
944
945 sectors_to_replot.push(SectorToReplot {
947 sector_index,
948 expires_at,
949 });
950 }
951 continue;
952 }
953
954 let shard_commitments_root = shard_commitments_roots_cache.get(history_size);
955 if let Some(expiration_check_segment_index) = history_size
956 .sector_expiration_check(min_sector_lifetime)
957 .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
958 {
959 trace!(
960 %sector_index,
961 %history_size,
962 %expiration_check_segment_index,
963 "Determined sector expiration check segment index"
964 );
965 let maybe_sector_expiration_check_segment_root = node_client
966 .segment_headers(vec![expiration_check_segment_index])
967 .await
968 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
969 .into_iter()
970 .next()
971 .flatten()
972 .map(|segment_header| segment_header.segment_root);
973
974 if let Some(sector_expiration_check_segment_root) =
975 maybe_sector_expiration_check_segment_root
976 {
977 let sector_id = SectorId::new(
978 &public_key_hash,
979 &shard_commitments_root,
980 sector_index,
981 history_size,
982 );
983 let expiration_history_size = sector_id
984 .derive_expiration_history_size(
985 history_size,
986 §or_expiration_check_segment_root,
987 min_sector_lifetime,
988 )
989 .expect(
990 "Farmers internally stores correct history size in sector \
991 metadata; qed",
992 );
993
994 let expires_at = expiration_history_size.segment_index();
995
996 trace!(
997 %sector_index,
998 %history_size,
999 sector_expire_at = %expires_at,
1000 "Determined sector expiration segment index"
1001 );
1002 if expires_at <= (segment_index + SegmentIndex::ONE) {
1005 debug!(
1006 %sector_index,
1007 %history_size,
1008 %expires_at,
1009 "Sector expires soon #2, scheduling replotting"
1010 );
1011
1012 let expiration_details = if expires_at <= segment_index {
1013 if let Some(metrics) = metrics {
1014 metrics.update_sector_state(SectorState::Expired);
1015 }
1016 SectorExpirationDetails::Expired
1017 } else {
1018 if let Some(metrics) = metrics {
1019 metrics.update_sector_state(SectorState::AboutToExpire);
1020 }
1021 SectorExpirationDetails::AboutToExpire
1022 };
1023 handlers.sector_update.call_simple(&(
1024 sector_index,
1025 SectorUpdate::Expiration(expiration_details),
1026 ));
1027
1028 sectors_to_replot.push(SectorToReplot {
1030 sector_index,
1031 expires_at,
1032 });
1033 } else {
1034 trace!(
1035 %sector_index,
1036 %history_size,
1037 sector_expire_at = %expires_at,
1038 "Sector expires later, remembering sector expiration"
1039 );
1040
1041 handlers.sector_update.call_simple(&(
1042 sector_index,
1043 SectorUpdate::Expiration(SectorExpirationDetails::Determined {
1044 expires_at,
1045 }),
1046 ));
1047
1048 if let Some(expires_at_entry) =
1050 sectors_expire_at.get_mut(usize::from(sector_index))
1051 {
1052 expires_at_entry.replace(expires_at);
1053 }
1054 }
1055 }
1056 }
1057 }
1058 drop(sectors_metadata);
1059
1060 let sectors_queued = sectors_to_replot.len();
1061 sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1062 for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1063 {
1064 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1065 if let Err(error) = sectors_to_plot_sender
1066 .send(SectorToPlot {
1067 sector_index,
1068 progress: index as f32 / sectors_queued as f32 * 100.0,
1069 last_queued: index + 1 == sectors_queued,
1070 acknowledgement_sender,
1071 })
1072 .await
1073 {
1074 warn!(%error, "Failed to send sector index for replotting");
1075 return Ok(());
1076 }
1077
1078 let _ = acknowledgement_receiver.await;
1080
1081 if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1082 expires_at_entry.take();
1083 }
1084 }
1085
1086 if archived_segments_receiver.changed().await.is_err() {
1087 break;
1088 }
1089 }
1090
1091 Ok(())
1092}