1use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate};
2use crate::node_client::NodeClient;
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use crate::single_disk_farm::direct_io_file_wrapper::DirectIoFileWrapper;
5use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics};
6use crate::single_disk_farm::{
7 BackgroundTaskError, Handlers, PlotMetadataHeader, RESERVED_PLOT_METADATA,
8};
9use ab_core_primitives::ed25519::Ed25519PublicKey;
10use ab_core_primitives::hashes::Blake3Hash;
11use ab_core_primitives::pieces::PieceOffset;
12use ab_core_primitives::sectors::{SectorId, SectorIndex};
13use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
14use ab_farmer_components::file_ext::FileExt;
15use ab_farmer_components::plotting::PlottedSector;
16use ab_farmer_components::sector::SectorMetadataChecksummed;
17use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore, SemaphoreGuard};
18use futures::channel::{mpsc, oneshot};
19use futures::stream::FuturesOrdered;
20use futures::{FutureExt, SinkExt, StreamExt, select};
21use parity_scale_codec::Encode;
22use rand::prelude::*;
23use std::collections::HashSet;
24use std::future::Future;
25use std::io;
26use std::num::NonZeroUsize;
27use std::ops::Range;
28use std::pin::pin;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use thiserror::Error;
32use tokio::sync::watch;
33use tokio::task;
34use tracing::{Instrument, debug, info, info_span, trace, warn};
35
36const FARMER_APP_INFO_RETRY_INTERVAL: Duration = Duration::from_millis(500);
37const PLOTTING_RETRY_DELAY: Duration = Duration::from_secs(1);
38
39pub(super) struct SectorToPlot {
40 sector_index: SectorIndex,
41 progress: f32,
43 last_queued: bool,
45 acknowledgement_sender: oneshot::Sender<()>,
46}
47
48#[derive(Debug, Error)]
50pub enum PlottingError {
51 #[error("Failed to retrieve farmer info: {error}")]
53 FailedToGetFarmerInfo {
54 error: anyhow::Error,
56 },
57 #[error("Failed to get segment header: {error}")]
59 FailedToGetSegmentHeader {
60 error: anyhow::Error,
62 },
63 #[error("Missing archived segment header: {segment_index}")]
65 MissingArchivedSegmentHeader {
66 segment_index: SegmentIndex,
68 },
69 #[error("Failed to subscribe to archived segments: {error}")]
71 FailedToSubscribeArchivedSegments {
72 error: anyhow::Error,
74 },
75 #[error("Low-level plotting error: {0}")]
77 LowLevel(String),
78 #[error("Plotting I/O error: {0}")]
80 Io(#[from] io::Error),
81 #[error("Background downloading panicked")]
83 BackgroundDownloadingPanicked,
84}
85
86pub(super) struct SectorPlottingOptions<'a, NC> {
87 pub(super) public_key: Ed25519PublicKey,
88 pub(super) node_client: &'a NC,
89 pub(super) pieces_in_sector: u16,
90 pub(super) sector_size: usize,
91 pub(super) plot_file: Arc<DirectIoFileWrapper>,
92 pub(super) metadata_file: Arc<DirectIoFileWrapper>,
93 pub(super) handlers: &'a Handlers,
94 pub(super) global_mutex: &'a AsyncMutex<()>,
95 pub(super) plotter: Arc<dyn Plotter>,
96 pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
97}
98
99pub(super) struct PlottingOptions<'a, NC> {
100 pub(super) metadata_header: PlotMetadataHeader,
101 pub(super) sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
102 pub(super) sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
103 pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
104 pub(super) sector_plotting_options: SectorPlottingOptions<'a, NC>,
105 pub(super) max_plotting_sectors_per_farm: NonZeroUsize,
106}
107
108pub(super) async fn plotting<NC>(
113 plotting_options: PlottingOptions<'_, NC>,
114) -> Result<(), PlottingError>
115where
116 NC: NodeClient,
117{
118 let PlottingOptions {
119 mut metadata_header,
120 sectors_metadata,
121 sectors_being_modified,
122 mut sectors_to_plot_receiver,
123 sector_plotting_options,
124 max_plotting_sectors_per_farm,
125 } = plotting_options;
126
127 let sector_plotting_options = §or_plotting_options;
128 let plotting_semaphore = Semaphore::new(max_plotting_sectors_per_farm.get());
129 let mut sectors_being_plotted = FuturesOrdered::new();
130 let (sector_plotting_result_sender, mut sector_plotting_result_receiver) = mpsc::unbounded();
133 let process_plotting_result_fut = async move {
134 while let Some(sector_plotting_result) = sector_plotting_result_receiver.next().await {
135 process_plotting_result(
136 sector_plotting_result,
137 sectors_metadata,
138 sectors_being_modified,
139 &mut metadata_header,
140 Arc::clone(§or_plotting_options.metadata_file),
141 )
142 .await?;
143 }
144
145 unreachable!(
146 "Stream will not end before the rest of the plotting process is shutting down"
147 );
148 };
149 let process_plotting_result_fut = process_plotting_result_fut.fuse();
150 let mut process_plotting_result_fut = pin!(process_plotting_result_fut);
151
152 loop {
155 select! {
156 maybe_sector_to_plot = sectors_to_plot_receiver.next() => {
157 let Some(sector_to_plot) = maybe_sector_to_plot else {
158 break;
159 };
160
161 let sector_index = sector_to_plot.sector_index;
162 let sector_plotting_init_fut = plot_single_sector(
163 sector_to_plot,
164 sector_plotting_options,
165 sectors_metadata,
166 sectors_being_modified,
167 &plotting_semaphore,
168 )
169 .instrument(info_span!("", %sector_index))
170 .fuse();
171 let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut);
172
173 loop {
177 select! {
178 sector_plotting_init_result = sector_plotting_init_fut => {
179 let sector_plotting_fut = match sector_plotting_init_result {
180 PlotSingleSectorResult::Scheduled(future) => future,
181 PlotSingleSectorResult::Skipped => {
182 break;
183 }
184 PlotSingleSectorResult::FatalError(error) => {
185 return Err(error);
186 }
187 };
188 sectors_being_plotted.push_back(
189 sector_plotting_fut.instrument(info_span!("", %sector_index))
190 );
191 break;
192 }
193 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
194 sector_plotting_result_sender
195 .unbounded_send(maybe_sector_plotting_result?)
196 .expect("Sending means receiver is not dropped yet; qed");
197 }
198 result = process_plotting_result_fut => {
199 return result;
200 }
201 }
202 }
203 }
204 maybe_sector_plotting_result = sectors_being_plotted.select_next_some() => {
205 sector_plotting_result_sender
206 .unbounded_send(maybe_sector_plotting_result?)
207 .expect("Sending means receiver is not dropped yet; qed");
208 }
209 result = process_plotting_result_fut => {
210 return result;
211 }
212 }
213 }
214
215 Ok(())
216}
217
218async fn process_plotting_result(
219 sector_plotting_result: SectorPlottingResult<'_>,
220 sectors_metadata: &AsyncRwLock<Vec<SectorMetadataChecksummed>>,
221 sectors_being_modified: &AsyncRwLock<HashSet<SectorIndex>>,
222 metadata_header: &mut PlotMetadataHeader,
223 metadata_file: Arc<DirectIoFileWrapper>,
224) -> Result<(), PlottingError> {
225 let SectorPlottingResult {
226 sector_metadata,
227 replotting,
228 last_queued,
229 plotting_permit,
230 } = sector_plotting_result;
231
232 let sector_index = sector_metadata.sector_index;
233
234 {
235 let mut sectors_metadata = sectors_metadata.write().await;
236 if let Some(existing_sector_metadata) = sectors_metadata.get_mut(usize::from(sector_index))
238 {
239 *existing_sector_metadata = sector_metadata;
240 } else {
241 sectors_metadata.push(sector_metadata);
242 }
243 }
244
245 sectors_being_modified.write().await.remove(§or_index);
247
248 if u16::from(sector_index) + 1 > metadata_header.plotted_sector_count {
249 metadata_header.plotted_sector_count = u16::from(sector_index) + 1;
250
251 let encoded_metadata_header = metadata_header.encode();
252 let write_fut =
253 task::spawn_blocking(move || metadata_file.write_all_at(&encoded_metadata_header, 0));
254 write_fut.await.map_err(|error| {
255 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
256 })??;
257 }
258
259 if last_queued {
260 if replotting {
261 info!("Replotting complete");
262 } else {
263 info!("Initial plotting complete");
264 }
265 }
266
267 drop(plotting_permit);
268
269 Ok(())
270}
271
272enum PlotSingleSectorResult<F> {
273 Scheduled(F),
274 Skipped,
275 FatalError(PlottingError),
276}
277
278struct SectorPlottingResult<'a> {
279 sector_metadata: SectorMetadataChecksummed,
280 replotting: bool,
281 last_queued: bool,
282 plotting_permit: SemaphoreGuard<'a>,
283}
284
285async fn plot_single_sector<'a, NC>(
286 sector_to_plot: SectorToPlot,
287 sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
288 sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
289 sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
290 plotting_semaphore: &'a Semaphore,
291) -> PlotSingleSectorResult<
292 impl Future<Output = Result<SectorPlottingResult<'a>, PlottingError>> + 'a,
293>
294where
295 NC: NodeClient,
296{
297 let SectorPlottingOptions {
298 public_key,
299 node_client,
300 pieces_in_sector,
301 sector_size,
302 plot_file,
303 metadata_file,
304 handlers,
305 global_mutex,
306 plotter,
307 metrics,
308 } = sector_plotting_options;
309
310 let SectorToPlot {
311 sector_index,
312 progress,
313 last_queued,
314 acknowledgement_sender: _acknowledgement_sender,
315 } = sector_to_plot;
316 trace!("Preparing to plot sector");
317
318 {
320 let mut sectors_being_modified = sectors_being_modified.write().await;
321 if !sectors_being_modified.insert(sector_index) {
322 debug!("Skipped sector plotting, it is already in progress");
323 return PlotSingleSectorResult::Skipped;
324 }
325 }
326
327 let plotting_permit = plotting_semaphore.acquire().await;
328
329 let maybe_old_sector_metadata = sectors_metadata
330 .read()
331 .await
332 .get(usize::from(sector_index))
333 .cloned();
334 let replotting = maybe_old_sector_metadata.is_some();
335
336 if let Some(metrics) = metrics {
337 metrics.sector_plotting.inc();
338 }
339 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Starting {
340 progress,
341 replotting,
342 last_queued,
343 });
344 handlers
345 .sector_update
346 .call_simple(&(sector_index, sector_state));
347
348 let start = Instant::now();
349
350 let farmer_app_info = loop {
356 let farmer_app_info = match node_client.farmer_app_info().await {
357 Ok(farmer_app_info) => farmer_app_info,
358 Err(error) => {
359 return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
360 error,
361 });
362 }
363 };
364
365 if let Some(old_sector_metadata) = &maybe_old_sector_metadata
366 && farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size
367 {
368 if farmer_app_info.protocol_info.min_sector_lifetime == HistorySize::ONE {
369 debug!(
370 current_history_size = %farmer_app_info.protocol_info.history_size,
371 old_sector_history_size = %old_sector_metadata.history_size,
372 "Latest protocol history size is not yet newer than old sector history \
373 size, wait for a bit and try again"
374 );
375 tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
376 continue;
377 } else {
378 debug!(
379 current_history_size = %farmer_app_info.protocol_info.history_size,
380 old_sector_history_size = %old_sector_metadata.history_size,
381 "Skipped sector plotting, likely redundant due to redundant archived \
382 segment notification"
383 );
384 return PlotSingleSectorResult::Skipped;
385 }
386 }
387
388 break farmer_app_info;
389 };
390
391 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
392
393 plotter
395 .plot_sector(
396 *public_key,
397 sector_index,
398 farmer_app_info.protocol_info,
399 *pieces_in_sector,
400 replotting,
401 progress_sender,
402 )
403 .await;
404
405 if replotting {
406 info!("Replotting sector ({progress:.2}% complete)");
407 } else {
408 info!("Plotting sector ({progress:.2}% complete)");
409 }
410
411 PlotSingleSectorResult::Scheduled(async move {
412 let plotted_sector = loop {
413 match plot_single_sector_internal(
414 sector_index,
415 *sector_size,
416 plot_file,
417 metadata_file,
418 handlers,
419 global_mutex,
420 progress_receiver,
421 metrics,
422 )
423 .await?
424 {
425 Ok(plotted_sector) => {
426 break plotted_sector;
427 }
428 Err(error) => {
429 warn!(
430 %error,
431 "Failed to plot sector, retrying in {PLOTTING_RETRY_DELAY:?}"
432 );
433
434 tokio::time::sleep(PLOTTING_RETRY_DELAY).await;
435 }
436 }
437
438 let (retry_progress_sender, retry_progress_receiver) = mpsc::channel(10);
439 progress_receiver = retry_progress_receiver;
440
441 plotter
443 .plot_sector(
444 *public_key,
445 sector_index,
446 farmer_app_info.protocol_info,
447 *pieces_in_sector,
448 replotting,
449 retry_progress_sender,
450 )
451 .await;
452
453 if replotting {
454 info!("Replotting sector retry");
455 } else {
456 info!("Plotting sector retry");
457 }
458 };
459
460 let maybe_old_plotted_sector = maybe_old_sector_metadata.map(|old_sector_metadata| {
461 let old_history_size = old_sector_metadata.history_size;
462
463 PlottedSector {
464 sector_id: plotted_sector.sector_id,
465 sector_index: plotted_sector.sector_index,
466 sector_metadata: old_sector_metadata,
467 piece_indexes: {
468 let mut piece_indexes = Vec::with_capacity(usize::from(*pieces_in_sector));
469 (PieceOffset::ZERO..)
470 .take(usize::from(*pieces_in_sector))
471 .map(|piece_offset| {
472 plotted_sector.sector_id.derive_piece_index(
473 piece_offset,
474 old_history_size,
475 farmer_app_info.protocol_info.max_pieces_in_sector,
476 farmer_app_info.protocol_info.recent_segments,
477 farmer_app_info.protocol_info.recent_history_fraction,
478 )
479 })
480 .collect_into(&mut piece_indexes);
481 piece_indexes
482 },
483 }
484 });
485
486 if replotting {
487 debug!("Sector replotted successfully");
488 } else {
489 debug!("Sector plotted successfully");
490 }
491
492 let sector_metadata = plotted_sector.sector_metadata.clone();
493
494 let time = start.elapsed();
495 if let Some(metrics) = metrics {
496 metrics.sector_plotting_time.observe(time.as_secs_f64());
497 metrics.sector_plotted.inc();
498 metrics.update_sector_state(SectorState::Plotted);
499 }
500 let sector_state = SectorUpdate::Plotting(SectorPlottingDetails::Finished {
501 plotted_sector,
502 old_plotted_sector: maybe_old_plotted_sector,
503 time,
504 });
505 handlers
506 .sector_update
507 .call_simple(&(sector_index, sector_state));
508
509 Ok(SectorPlottingResult {
510 sector_metadata,
511 replotting,
512 last_queued,
513 plotting_permit,
514 })
515 })
516}
517
518#[allow(clippy::too_many_arguments)]
521async fn plot_single_sector_internal(
522 sector_index: SectorIndex,
523 sector_size: usize,
524 plot_file: &Arc<DirectIoFileWrapper>,
525 metadata_file: &Arc<DirectIoFileWrapper>,
526 handlers: &Handlers,
527 global_mutex: &AsyncMutex<()>,
528 mut progress_receiver: mpsc::Receiver<SectorPlottingProgress>,
529 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
530) -> Result<Result<PlottedSector, PlottingError>, PlottingError> {
531 let progress_processor_fut = async {
533 while let Some(progress) = progress_receiver.next().await {
534 match progress {
535 SectorPlottingProgress::Downloading => {
536 if let Some(metrics) = metrics {
537 metrics.sector_downloading.inc();
538 }
539 handlers.sector_update.call_simple(&(
540 sector_index,
541 SectorUpdate::Plotting(SectorPlottingDetails::Downloading),
542 ));
543 }
544 SectorPlottingProgress::Downloaded(time) => {
545 if let Some(metrics) = metrics {
546 metrics.sector_downloading_time.observe(time.as_secs_f64());
547 metrics.sector_downloaded.inc();
548 }
549 handlers.sector_update.call_simple(&(
550 sector_index,
551 SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)),
552 ));
553 }
554 SectorPlottingProgress::Encoding => {
555 if let Some(metrics) = metrics {
556 metrics.sector_encoding.inc();
557 }
558 handlers.sector_update.call_simple(&(
559 sector_index,
560 SectorUpdate::Plotting(SectorPlottingDetails::Encoding),
561 ));
562 }
563 SectorPlottingProgress::Encoded(time) => {
564 if let Some(metrics) = metrics {
565 metrics.sector_encoding_time.observe(time.as_secs_f64());
566 metrics.sector_encoded.inc();
567 }
568 handlers.sector_update.call_simple(&(
569 sector_index,
570 SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)),
571 ));
572 }
573 SectorPlottingProgress::Finished {
574 plotted_sector,
575 time: _,
576 sector,
577 } => {
578 return Ok((plotted_sector, sector));
579 }
580 SectorPlottingProgress::Error { error } => {
581 if let Some(metrics) = metrics {
582 metrics.sector_plotting_error.inc();
583 }
584 handlers.sector_update.call_simple(&(
585 sector_index,
586 SectorUpdate::Plotting(SectorPlottingDetails::Error(error.clone())),
587 ));
588 return Err(error);
589 }
590 }
591 }
592
593 Err("Plotting progress stream ended before plotting finished".to_string())
594 };
595
596 let (plotted_sector, mut sector) = match progress_processor_fut.await {
597 Ok(result) => result,
598 Err(error) => {
599 return Ok(Err(PlottingError::LowLevel(error)));
600 }
601 };
602
603 {
604 global_mutex.lock().await;
606
607 if let Some(metrics) = metrics {
608 metrics.sector_writing.inc();
609 }
610 handlers.sector_update.call_simple(&(
611 sector_index,
612 SectorUpdate::Plotting(SectorPlottingDetails::Writing),
613 ));
614
615 let start = Instant::now();
616
617 {
618 let sector_write_base_offset = u64::from(sector_index) * sector_size as u64;
619 let mut total_received = 0;
620 let mut sector_write_offset = sector_write_base_offset;
621 while let Some(maybe_sector_chunk) = sector.next().await {
622 let sector_chunk = match maybe_sector_chunk {
623 Ok(sector_chunk) => sector_chunk,
624 Err(error) => {
625 return Ok(Err(PlottingError::LowLevel(format!(
626 "Sector chunk receive error: {error}"
627 ))));
628 }
629 };
630
631 total_received += sector_chunk.len();
632
633 if total_received > sector_size {
634 return Ok(Err(PlottingError::LowLevel(format!(
635 "Received too many bytes {total_received} instead of expected \
636 {sector_size} bytes"
637 ))));
638 }
639
640 let sector_chunk_size = sector_chunk.len() as u64;
641
642 trace!(sector_chunk_size, "Writing sector chunk to disk");
643 let write_fut = task::spawn_blocking({
644 let plot_file = Arc::clone(plot_file);
645
646 move || plot_file.write_all_at(§or_chunk, sector_write_offset)
647 });
648 write_fut.await.map_err(|error| {
649 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
650 })??;
651
652 sector_write_offset += sector_chunk_size;
653 }
654 drop(sector);
655
656 if total_received != sector_size {
657 return Ok(Err(PlottingError::LowLevel(format!(
658 "Received only {total_received} sector bytes out of {sector_size} \
659 expected bytes"
660 ))));
661 }
662 }
663 {
664 let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
665 let write_fut = task::spawn_blocking({
666 let metadata_file = Arc::clone(metadata_file);
667
668 move || {
669 metadata_file.write_all_at(
670 &encoded_sector_metadata,
671 RESERVED_PLOT_METADATA
672 + (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
673 )
674 }
675 });
676 write_fut.await.map_err(|error| {
677 PlottingError::LowLevel(format!("Failed to spawn blocking tokio task: {error}"))
678 })??;
679 }
680
681 let time = start.elapsed();
682 if let Some(metrics) = metrics {
683 metrics.sector_writing_time.observe(time.as_secs_f64());
684 metrics.sector_written.inc();
685 }
686 handlers.sector_update.call_simple(&(
687 sector_index,
688 SectorUpdate::Plotting(SectorPlottingDetails::Written(time)),
689 ));
690 }
691
692 Ok(Ok(plotted_sector))
693}
694
695pub(super) struct PlottingSchedulerOptions<NC> {
696 pub(super) public_key_hash: Blake3Hash,
697 pub(super) sectors_indices_left_to_plot: Range<SectorIndex>,
698 pub(super) target_sector_count: u16,
699 pub(super) last_archived_segment_index: SegmentIndex,
700 pub(super) min_sector_lifetime: HistorySize,
701 pub(super) node_client: NC,
702 pub(super) handlers: Arc<Handlers>,
703 pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
704 pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
705 pub(super) new_segment_processing_delay: Duration,
708 pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
709}
710
711pub(super) async fn plotting_scheduler<NC>(
712 plotting_scheduler_options: PlottingSchedulerOptions<NC>,
713) -> Result<(), BackgroundTaskError>
714where
715 NC: NodeClient,
716{
717 let PlottingSchedulerOptions {
718 public_key_hash,
719 sectors_indices_left_to_plot,
720 target_sector_count,
721 last_archived_segment_index,
722 min_sector_lifetime,
723 node_client,
724 handlers,
725 sectors_metadata,
726 sectors_to_plot_sender,
727 new_segment_processing_delay,
728 metrics,
729 } = plotting_scheduler_options;
730
731 let last_archived_segment = node_client
735 .segment_headers(vec![last_archived_segment_index])
736 .await
737 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
738 .into_iter()
739 .next()
740 .flatten()
741 .ok_or(PlottingError::MissingArchivedSegmentHeader {
742 segment_index: last_archived_segment_index,
743 })?;
744
745 let (archived_segments_sender, archived_segments_receiver) =
746 watch::channel(last_archived_segment);
747
748 let read_archived_segments_notifications_fut = read_archived_segments_notifications(
749 &node_client,
750 archived_segments_sender,
751 new_segment_processing_delay,
752 );
753
754 let send_plotting_notifications_fut = send_plotting_notifications(
755 public_key_hash,
756 sectors_indices_left_to_plot,
757 target_sector_count,
758 min_sector_lifetime,
759 &node_client,
760 &handlers,
761 sectors_metadata,
762 archived_segments_receiver,
763 sectors_to_plot_sender,
764 &metrics,
765 );
766
767 select! {
768 result = read_archived_segments_notifications_fut.fuse() => {
769 result
770 }
771 result = send_plotting_notifications_fut.fuse() => {
772 result
773 }
774 }
775}
776
777async fn read_archived_segments_notifications<NC>(
778 node_client: &NC,
779 archived_segments_sender: watch::Sender<SegmentHeader>,
780 new_segment_processing_delay: Duration,
781) -> Result<(), BackgroundTaskError>
782where
783 NC: NodeClient,
784{
785 info!("Subscribing to archived segments");
786
787 let mut archived_segments_notifications = node_client
788 .subscribe_archived_segment_headers()
789 .await
790 .map_err(|error| PlottingError::FailedToSubscribeArchivedSegments { error })?;
791
792 while let Some(segment_header) = archived_segments_notifications.next().await {
793 debug!(?segment_header, "New archived segment");
794 if let Err(error) = node_client
795 .acknowledge_archived_segment_header(segment_header.segment_index())
796 .await
797 {
798 debug!(%error, "Failed to acknowledge segment header");
799 }
800
801 let delay = Duration::from_secs(rand::rng().random_range(
804 new_segment_processing_delay.as_secs() / 10..=new_segment_processing_delay.as_secs(),
805 ));
806 tokio::time::sleep(delay).await;
807
808 if archived_segments_sender.send(segment_header).is_err() {
809 break;
810 }
811 }
812
813 Ok(())
814}
815
816struct SectorToReplot {
817 sector_index: SectorIndex,
818 expires_at: SegmentIndex,
819}
820
821#[allow(clippy::too_many_arguments)]
822async fn send_plotting_notifications<NC>(
823 public_key_hash: Blake3Hash,
824 sectors_indices_left_to_plot: Range<SectorIndex>,
825 target_sector_count: u16,
826 min_sector_lifetime: HistorySize,
827 node_client: &NC,
828 handlers: &Handlers,
829 sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
830 mut archived_segments_receiver: watch::Receiver<SegmentHeader>,
831 mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
832 metrics: &Option<Arc<SingleDiskFarmMetrics>>,
833) -> Result<(), BackgroundTaskError>
834where
835 NC: NodeClient,
836{
837 for sector_index in sectors_indices_left_to_plot {
839 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
840 if let Err(error) = sectors_to_plot_sender
841 .send(SectorToPlot {
842 sector_index,
843 progress: u16::from(sector_index) as f32 / target_sector_count as f32 * 100.0,
844 last_queued: u16::from(sector_index) + 1 == target_sector_count,
845 acknowledgement_sender,
846 })
847 .await
848 {
849 warn!(%error, "Failed to send sector index for initial plotting");
850 return Ok(());
851 }
852
853 let _ = acknowledgement_receiver.await;
855 }
856
857 let mut sectors_expire_at = vec![None::<SegmentIndex>; usize::from(target_sector_count)];
858 let mut sectors_to_replot = Vec::with_capacity(usize::from(target_sector_count) / 10);
860
861 loop {
862 let segment_index = archived_segments_receiver
863 .borrow_and_update()
864 .segment_index();
865 trace!(%segment_index, "New archived segment received");
866
867 let sectors_metadata = sectors_metadata.read().await;
868 let sectors_to_check = sectors_metadata
869 .iter()
870 .map(|sector_metadata| (sector_metadata.sector_index, sector_metadata.history_size));
871 for (sector_index, history_size) in sectors_to_check {
872 if let Some(Some(expires_at)) =
873 sectors_expire_at.get(usize::from(sector_index)).copied()
874 {
875 trace!(
876 %sector_index,
877 %history_size,
878 %expires_at,
879 "Checking sector for expiration"
880 );
881 if expires_at <= (segment_index + SegmentIndex::ONE) {
884 debug!(
885 %sector_index,
886 %history_size,
887 %expires_at,
888 "Sector expires soon #1, scheduling replotting"
889 );
890
891 let expiration_details = if expires_at <= segment_index {
892 if let Some(metrics) = metrics {
893 metrics.update_sector_state(SectorState::Expired);
894 }
895 SectorExpirationDetails::Expired
896 } else {
897 if let Some(metrics) = metrics {
898 metrics.update_sector_state(SectorState::AboutToExpire);
899 }
900 SectorExpirationDetails::AboutToExpire
901 };
902 handlers
903 .sector_update
904 .call_simple(&(sector_index, SectorUpdate::Expiration(expiration_details)));
905
906 sectors_to_replot.push(SectorToReplot {
908 sector_index,
909 expires_at,
910 });
911 }
912 continue;
913 }
914
915 if let Some(expiration_check_segment_index) = history_size
916 .sector_expiration_check(min_sector_lifetime)
917 .map(|expiration_check_history_size| expiration_check_history_size.segment_index())
918 {
919 trace!(
920 %sector_index,
921 %history_size,
922 %expiration_check_segment_index,
923 "Determined sector expiration check segment index"
924 );
925 let maybe_sector_expiration_check_segment_root = node_client
926 .segment_headers(vec![expiration_check_segment_index])
927 .await
928 .map_err(|error| PlottingError::FailedToGetSegmentHeader { error })?
929 .into_iter()
930 .next()
931 .flatten()
932 .map(|segment_header| segment_header.segment_root);
933
934 if let Some(sector_expiration_check_segment_root) =
935 maybe_sector_expiration_check_segment_root
936 {
937 let sector_id = SectorId::new(&public_key_hash, sector_index, history_size);
938 let expiration_history_size = sector_id
939 .derive_expiration_history_size(
940 history_size,
941 §or_expiration_check_segment_root,
942 min_sector_lifetime,
943 )
944 .expect(
945 "Farmers internally stores correct history size in sector \
946 metadata; qed",
947 );
948
949 let expires_at = expiration_history_size.segment_index();
950
951 trace!(
952 %sector_index,
953 %history_size,
954 sector_expire_at = %expires_at,
955 "Determined sector expiration segment index"
956 );
957 if expires_at <= (segment_index + SegmentIndex::ONE) {
960 debug!(
961 %sector_index,
962 %history_size,
963 %expires_at,
964 "Sector expires soon #2, scheduling replotting"
965 );
966
967 let expiration_details = if expires_at <= segment_index {
968 if let Some(metrics) = metrics {
969 metrics.update_sector_state(SectorState::Expired);
970 }
971 SectorExpirationDetails::Expired
972 } else {
973 if let Some(metrics) = metrics {
974 metrics.update_sector_state(SectorState::AboutToExpire);
975 }
976 SectorExpirationDetails::AboutToExpire
977 };
978 handlers.sector_update.call_simple(&(
979 sector_index,
980 SectorUpdate::Expiration(expiration_details),
981 ));
982
983 sectors_to_replot.push(SectorToReplot {
985 sector_index,
986 expires_at,
987 });
988 } else {
989 trace!(
990 %sector_index,
991 %history_size,
992 sector_expire_at = %expires_at,
993 "Sector expires later, remembering sector expiration"
994 );
995
996 handlers.sector_update.call_simple(&(
997 sector_index,
998 SectorUpdate::Expiration(SectorExpirationDetails::Determined {
999 expires_at,
1000 }),
1001 ));
1002
1003 if let Some(expires_at_entry) =
1005 sectors_expire_at.get_mut(usize::from(sector_index))
1006 {
1007 expires_at_entry.replace(expires_at);
1008 }
1009 }
1010 }
1011 }
1012 }
1013 drop(sectors_metadata);
1014
1015 let sectors_queued = sectors_to_replot.len();
1016 sectors_to_replot.sort_by_key(|sector_to_replot| sector_to_replot.expires_at);
1017 for (index, SectorToReplot { sector_index, .. }) in sectors_to_replot.drain(..).enumerate()
1018 {
1019 let (acknowledgement_sender, acknowledgement_receiver) = oneshot::channel();
1020 if let Err(error) = sectors_to_plot_sender
1021 .send(SectorToPlot {
1022 sector_index,
1023 progress: index as f32 / sectors_queued as f32 * 100.0,
1024 last_queued: index + 1 == sectors_queued,
1025 acknowledgement_sender,
1026 })
1027 .await
1028 {
1029 warn!(%error, "Failed to send sector index for replotting");
1030 return Ok(());
1031 }
1032
1033 let _ = acknowledgement_receiver.await;
1035
1036 if let Some(expires_at_entry) = sectors_expire_at.get_mut(usize::from(sector_index)) {
1037 expires_at_entry.take();
1038 }
1039 }
1040
1041 if archived_segments_receiver.changed().await.is_err() {
1042 break;
1043 }
1044 }
1045
1046 Ok(())
1047}