1use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backoff::ExponentialBackoff;
22use backoff::backoff::Backoff;
23use bytes::Bytes;
24use derive_more::Display;
25use event_listener_primitives::{Bag, HandlerId};
26use futures::channel::mpsc;
27use futures::future::FusedFuture;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode};
31use std::error::Error;
32use std::future::pending;
33use std::num::NonZeroUsize;
34use std::pin::pin;
35use std::sync::Arc;
36use std::task::Poll;
37use std::time::{Duration, Instant};
38use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39use tokio::time::MissedTickBehavior;
40use tracing::{Instrument, debug, info, info_span, trace, warn};
41use ulid::Ulid;
42
/// Delay between two consecutive free capacity checks while the local plotter reports that it has
/// no free capacity
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval at which the plotter side sends `Ping` notifications while it is processing a plot
/// sector request
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum time the requesting side waits for the next notification of a plot sector request
/// before it reports a timeout
const PING_TIMEOUT: Duration = Duration::from_mins(1);
48
/// Shared, thread-safe callback that receives references to three arguments
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// `Bag` of [`HandlerFn3`] callbacks
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
52
/// Identifier of a cluster plotter.
///
/// Its string form is what plotters return in free instance responses.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Identifier backed by a ULID
    Ulid(Ulid),
}
59
60#[expect(clippy::new_without_default)]
61impl ClusterPlotterId {
62 pub fn new() -> Self {
64 Self::Ulid(Ulid::new())
65 }
66}
67
/// Request used to find a plotter instance that currently has free capacity
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;
71
impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    const SUBJECT: &'static str = "ab.plotter.free-instance";
    /// String form of the responding plotter's ID, or `None` if that plotter has no free capacity
    type Response = Option<String>;
}
77
/// Notifications sent by a plotter in response to [`ClusterPlotterPlotSectorRequest`].
///
/// Mostly mirrors [`SectorPlottingProgress`], with extra variants needed for transport over the
/// network.
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// The plotter is occupied and did not accept the request
    Occupied,
    /// Periodic notification, sent every [`PING_INTERVAL`] while the request is being processed
    Ping,
    /// Counterpart of [`SectorPlottingProgress::Downloading`]
    Downloading,
    /// Counterpart of [`SectorPlottingProgress::Downloaded`]
    Downloaded(Duration),
    /// Counterpart of [`SectorPlottingProgress::Encoding`]
    Encoding,
    /// Counterpart of [`SectorPlottingProgress::Encoded`]
    Encoded(Duration),
    /// Counterpart of [`SectorPlottingProgress::Finished`] without the sector contents, which
    /// follow as a series of [`Self::SectorChunk`] notifications
    Finished {
        /// Information about the plotted sector
        plotted_sector: PlottedSector,
        /// Time reported by the plotter
        time: Duration,
    },
    /// Piece of the plotted sector (or an error produced by the sector stream)
    SectorChunk(Result<Bytes, String>),
    /// Counterpart of [`SectorPlottingProgress::Error`]
    Error {
        /// Error message
        error: String,
    },
}
107
/// Request to plot a sector.
///
/// All fields are forwarded as is to `Plotter::try_plot_sector()` on the plotter side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    public_key: Ed25519PublicKey,
    shard_commitments_root: ShardCommitmentHash,
    sector_index: SectorIndex,
    farmer_protocol_info: FarmerProtocolInfo,
    pieces_in_sector: u16,
}
117
impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the plotter instance ID that is passed
    // alongside the request - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
    type Response = ClusterSectorPlottingProgress;
}
122
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Called with public key, sector index and progress for every plotting progress update
    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
}
127
/// [`Plotter`] implementation that forwards plotting requests to plotters in the cluster over NATS
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits the number of sectors that are in flight through this instance at the same time; a
    /// permit is held until the corresponding plotting task finishes
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Template of the backoff policy, cloned and reset for every sector
    retry_backoff_policy: ExponentialBackoff,
    nats_client: NatsClient,
    handlers: Arc<Handlers>,
    /// Used to hand spawned plotting tasks over to the background task
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that receives and drives plotting tasks
    _background_tasks: AsyncJoinOnDrop<()>,
}
138
impl Drop for ClusterPlotter {
    #[inline]
    fn drop(&mut self) {
        // Closing the channel makes the receiving loop of the background task observe the end of
        // the channel and exit
        self.tasks_sender.close_channel();
    }
}
145
146#[async_trait]
147impl Plotter for ClusterPlotter {
148 async fn has_free_capacity(&self) -> Result<bool, String> {
149 Ok(self.sector_encoding_semaphore.available_permits() > 0
150 && self
151 .nats_client
152 .request(&ClusterPlotterFreeInstanceRequest, None)
153 .await
154 .map_err(|error| error.to_string())?
155 .is_some())
156 }
157
158 async fn plot_sector(
159 &self,
160 public_key: Ed25519PublicKey,
161 shard_commitments_root: ShardCommitmentHash,
162 sector_index: SectorIndex,
163 farmer_protocol_info: FarmerProtocolInfo,
164 pieces_in_sector: u16,
165 _replotting: bool,
166 mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
167 ) {
168 let start = Instant::now();
169
170 let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
173 .acquire_owned()
174 .await
175 {
176 Ok(sector_encoding_permit) => sector_encoding_permit,
177 Err(error) => {
178 warn!(%error, "Failed to acquire sector encoding permit");
179
180 let progress_updater = ProgressUpdater {
181 public_key,
182 sector_index,
183 handlers: Arc::clone(&self.handlers),
184 };
185
186 progress_updater
187 .update_progress_and_events(
188 &mut progress_sender,
189 SectorPlottingProgress::Error {
190 error: format!("Failed to acquire sector encoding permit: {error}"),
191 },
192 )
193 .await;
194
195 return;
196 }
197 };
198
199 self.plot_sector_internal(
200 start,
201 sector_encoding_permit,
202 public_key,
203 shard_commitments_root,
204 sector_index,
205 farmer_protocol_info,
206 pieces_in_sector,
207 progress_sender,
208 )
209 .await
210 }
211
212 async fn try_plot_sector(
213 &self,
214 public_key: Ed25519PublicKey,
215 shard_commitments_root: ShardCommitmentHash,
216 sector_index: SectorIndex,
217 farmer_protocol_info: FarmerProtocolInfo,
218 pieces_in_sector: u16,
219 _replotting: bool,
220 progress_sender: mpsc::Sender<SectorPlottingProgress>,
221 ) -> bool {
222 let start = Instant::now();
223
224 let Ok(sector_encoding_permit) =
225 Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
226 else {
227 return false;
228 };
229
230 self.plot_sector_internal(
231 start,
232 sector_encoding_permit,
233 public_key,
234 shard_commitments_root,
235 sector_index,
236 farmer_protocol_info,
237 pieces_in_sector,
238 progress_sender,
239 )
240 .await;
241
242 true
243 }
244}
245
246impl ClusterPlotter {
247 pub fn new(
249 nats_client: NatsClient,
250 sector_encoding_concurrency: NonZeroUsize,
251 retry_backoff_policy: ExponentialBackoff,
252 ) -> Self {
253 let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
254
255 let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
256
257 let background_tasks = AsyncJoinOnDrop::new(
259 tokio::spawn(async move {
260 let background_tasks = FuturesUnordered::new();
261 let mut background_tasks = pin!(background_tasks);
262 background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
264
265 loop {
266 select! {
267 maybe_background_task = tasks_receiver.next().fuse() => {
268 let Some(background_task) = maybe_background_task else {
269 break;
270 };
271
272 background_tasks.push(background_task);
273 },
274 _ = background_tasks.select_next_some() => {
275 }
277 }
278 }
279 }),
280 true,
281 );
282
283 Self {
284 sector_encoding_semaphore,
285 retry_backoff_policy,
286 nats_client,
287 handlers: Arc::default(),
288 tasks_sender,
289 _background_tasks: background_tasks,
290 }
291 }
292
293 pub fn on_plotting_progress(
295 &self,
296 callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
297 ) -> HandlerId {
298 self.handlers.plotting_progress.add(callback)
299 }
300
301 #[expect(clippy::too_many_arguments)]
302 async fn plot_sector_internal<PS>(
303 &self,
304 start: Instant,
305 sector_encoding_permit: OwnedSemaphorePermit,
306 public_key: Ed25519PublicKey,
307 shard_commitments_root: ShardCommitmentHash,
308 sector_index: SectorIndex,
309 farmer_protocol_info: FarmerProtocolInfo,
310 pieces_in_sector: u16,
311 mut progress_sender: PS,
312 ) where
313 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
314 PS::Error: Error,
315 {
316 trace!("Starting plotting, getting plotting permit");
317
318 let progress_updater = ProgressUpdater {
319 public_key,
320 sector_index,
321 handlers: Arc::clone(&self.handlers),
322 };
323
324 let mut retry_backoff_policy = self.retry_backoff_policy.clone();
325 retry_backoff_policy.reset();
326
327 let free_plotter_instance_fut = get_free_plotter_instance(
329 &self.nats_client,
330 &progress_updater,
331 &mut progress_sender,
332 &mut retry_backoff_policy,
333 );
334 let mut maybe_free_instance = free_plotter_instance_fut.await;
335 if maybe_free_instance.is_none() {
336 return;
337 }
338
339 trace!("Got plotting permit #1");
340
341 let nats_client = self.nats_client.clone();
342
343 let plotting_fut = async move {
344 'outer: loop {
345 let free_instance = match maybe_free_instance.take() {
347 Some(free_instance) => free_instance,
348 None => {
349 let free_plotter_instance_fut = get_free_plotter_instance(
350 &nats_client,
351 &progress_updater,
352 &mut progress_sender,
353 &mut retry_backoff_policy,
354 );
355 let Some(free_instance) = free_plotter_instance_fut.await else {
356 break;
357 };
358 trace!("Got plotting permit #2");
359 free_instance
360 }
361 };
362
363 let response_stream_result = nats_client
364 .stream_request(
365 &ClusterPlotterPlotSectorRequest {
366 public_key,
367 shard_commitments_root,
368 sector_index,
369 farmer_protocol_info,
370 pieces_in_sector,
371 },
372 Some(&free_instance),
373 )
374 .await;
375 trace!("Subscribed to plotting notifications");
376
377 let mut response_stream = match response_stream_result {
378 Ok(response_stream) => response_stream,
379 Err(error) => {
380 progress_updater
381 .update_progress_and_events(
382 &mut progress_sender,
383 SectorPlottingProgress::Error {
384 error: format!("Failed make stream request: {error}"),
385 },
386 )
387 .await;
388
389 break;
390 }
391 };
392
393 let (mut sector_sender, sector_receiver) = mpsc::channel(
396 (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
397 .max(1),
398 );
399 let mut maybe_sector_receiver = Some(sector_receiver);
400 loop {
401 match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
402 Ok(Some(response)) => {
403 match process_response_notification(
404 &start,
405 &free_instance,
406 &progress_updater,
407 &mut progress_sender,
408 &mut retry_backoff_policy,
409 response,
410 &mut sector_sender,
411 &mut maybe_sector_receiver,
412 )
413 .await
414 {
415 ResponseProcessingResult::Retry => {
416 debug!("Retrying");
417 continue 'outer;
418 }
419 ResponseProcessingResult::Abort => {
420 debug!("Aborting");
421 break 'outer;
422 }
423 ResponseProcessingResult::Continue => {
424 trace!("Continue");
425 }
427 }
428 }
429 Ok(None) => {
430 trace!("Plotting done");
431 break;
432 }
433 Err(_error) => {
434 progress_updater
435 .update_progress_and_events(
436 &mut progress_sender,
437 SectorPlottingProgress::Error {
438 error: "Timed out without ping from plotter".to_string(),
439 },
440 )
441 .await;
442 break;
443 }
444 }
445 }
446
447 break;
448 }
449
450 drop(sector_encoding_permit);
451 };
452
453 let plotting_task =
454 AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
455 if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
456 warn!(%error, "Failed to send plotting task");
457
458 let progress = SectorPlottingProgress::Error {
459 error: format!("Failed to send plotting task: {error}"),
460 };
461
462 self.handlers
463 .plotting_progress
464 .call_simple(&public_key, §or_index, &progress);
465 }
466 }
467}
468
469async fn get_free_plotter_instance<PS>(
471 nats_client: &NatsClient,
472 progress_updater: &ProgressUpdater,
473 progress_sender: &mut PS,
474 retry_backoff_policy: &mut ExponentialBackoff,
475) -> Option<String>
476where
477 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
478 PS::Error: Error,
479{
480 loop {
481 match nats_client
482 .request(&ClusterPlotterFreeInstanceRequest, None)
483 .await
484 {
485 Ok(Some(free_instance)) => {
486 return Some(free_instance);
487 }
488 Ok(None) => {
489 if let Some(delay) = retry_backoff_policy.next_backoff() {
490 debug!("Instance was occupied, retrying #1");
491
492 tokio::time::sleep(delay).await;
493 continue;
494 } else {
495 progress_updater
496 .update_progress_and_events(
497 progress_sender,
498 SectorPlottingProgress::Error {
499 error: "Instance was occupied, exiting #1".to_string(),
500 },
501 )
502 .await;
503 return None;
504 }
505 }
506 Err(error) => match error.kind() {
507 RequestErrorKind::TimedOut => {
508 if let Some(delay) = retry_backoff_policy.next_backoff() {
509 debug!("Plotter request timed out, retrying");
510
511 tokio::time::sleep(delay).await;
512 continue;
513 } else {
514 progress_updater
515 .update_progress_and_events(
516 progress_sender,
517 SectorPlottingProgress::Error {
518 error: "Plotter request timed out, exiting".to_string(),
519 },
520 )
521 .await;
522 return None;
523 }
524 }
525 RequestErrorKind::NoResponders => {
526 if let Some(delay) = retry_backoff_policy.next_backoff() {
527 debug!("No plotters, retrying");
528
529 tokio::time::sleep(delay).await;
530 continue;
531 } else {
532 progress_updater
533 .update_progress_and_events(
534 progress_sender,
535 SectorPlottingProgress::Error {
536 error: "No plotters, exiting".to_string(),
537 },
538 )
539 .await;
540 return None;
541 }
542 }
543 RequestErrorKind::Other => {
544 progress_updater
545 .update_progress_and_events(
546 progress_sender,
547 SectorPlottingProgress::Error {
548 error: format!("Failed to get free plotter instance: {error}"),
549 },
550 )
551 .await;
552 return None;
553 }
554 },
555 };
556 }
557}
558
/// Outcome of processing a single notification from a plotter
enum ResponseProcessingResult {
    /// The instance was occupied, the whole request should be retried
    Retry,
    /// Processing of the plot sector request should stop
    Abort,
    /// Continue with the next notification
    Continue,
}
564
565#[expect(clippy::too_many_arguments)]
566async fn process_response_notification<PS>(
567 start: &Instant,
568 free_instance: &str,
569 progress_updater: &ProgressUpdater,
570 progress_sender: &mut PS,
571 retry_backoff_policy: &mut ExponentialBackoff,
572 response: ClusterSectorPlottingProgress,
573 sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
574 maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
575) -> ResponseProcessingResult
576where
577 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
578 PS::Error: Error,
579{
580 if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
581 trace!(?response, "Processing plotting response notification");
582 } else {
583 trace!("Processing plotting response notification (sector chunk)");
584 }
585
586 match response {
587 ClusterSectorPlottingProgress::Occupied => {
588 debug!(%free_instance, "Instance was occupied, retrying #2");
589
590 if let Some(delay) = retry_backoff_policy.next_backoff() {
591 debug!("Instance was occupied, retrying #2");
592
593 tokio::time::sleep(delay).await;
594 return ResponseProcessingResult::Retry;
595 } else {
596 debug!("Instance was occupied, exiting #2");
597 return ResponseProcessingResult::Abort;
598 }
599 }
600 ClusterSectorPlottingProgress::Ping => {
601 }
603 ClusterSectorPlottingProgress::Downloading => {
604 if !progress_updater
605 .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
606 .await
607 {
608 return ResponseProcessingResult::Abort;
609 }
610 }
611 ClusterSectorPlottingProgress::Downloaded(time) => {
612 if !progress_updater
613 .update_progress_and_events(
614 progress_sender,
615 SectorPlottingProgress::Downloaded(time),
616 )
617 .await
618 {
619 return ResponseProcessingResult::Abort;
620 }
621 }
622 ClusterSectorPlottingProgress::Encoding => {
623 if !progress_updater
624 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
625 .await
626 {
627 return ResponseProcessingResult::Abort;
628 }
629 }
630 ClusterSectorPlottingProgress::Encoded(time) => {
631 if !progress_updater
632 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
633 .await
634 {
635 return ResponseProcessingResult::Abort;
636 }
637 }
638 ClusterSectorPlottingProgress::Finished {
639 plotted_sector,
640 time: _,
641 } => {
642 let Some(sector_receiver) = maybe_sector_receiver.take() else {
643 debug!("Unexpected duplicated sector plotting progress Finished");
644
645 progress_updater
646 .update_progress_and_events(
647 progress_sender,
648 SectorPlottingProgress::Error {
649 error: "Unexpected duplicated sector plotting progress Finished"
650 .to_string(),
651 },
652 )
653 .await;
654 return ResponseProcessingResult::Abort;
655 };
656
657 let progress = SectorPlottingProgress::Finished {
658 plotted_sector,
659 time: start.elapsed(),
661 sector: Box::pin(sector_receiver),
662 };
663 if !progress_updater
664 .update_progress_and_events(progress_sender, progress)
665 .await
666 {
667 return ResponseProcessingResult::Abort;
668 }
669
670 return ResponseProcessingResult::Continue;
671 }
672 ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
674 if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
675 warn!(%error, "Failed to send sector chunk");
676 return ResponseProcessingResult::Abort;
677 }
678 return ResponseProcessingResult::Continue;
679 }
680 ClusterSectorPlottingProgress::Error { error } => {
681 if !progress_updater
682 .update_progress_and_events(
683 progress_sender,
684 SectorPlottingProgress::Error { error },
685 )
686 .await
687 {
688 return ResponseProcessingResult::Abort;
689 }
690 }
691 }
692
693 ResponseProcessingResult::Continue
694}
695
/// Helper that delivers progress updates of one sector to both event handlers and a progress
/// sender
struct ProgressUpdater {
    public_key: Ed25519PublicKey,
    sector_index: SectorIndex,
    handlers: Arc<Handlers>,
}
701
702impl ProgressUpdater {
703 async fn update_progress_and_events<PS>(
705 &self,
706 progress_sender: &mut PS,
707 progress: SectorPlottingProgress,
708 ) -> bool
709 where
710 PS: Sink<SectorPlottingProgress> + Unpin,
711 PS::Error: Error,
712 {
713 self.handlers.plotting_progress.call_simple(
714 &self.public_key,
715 &self.sector_index,
716 &progress,
717 );
718
719 if let Err(error) = progress_sender.send(progress).await {
720 warn!(%error, "Failed to send error progress update");
721
722 false
723 } else {
724 true
725 }
726 }
727}
728
729pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
734where
735 P: Plotter + Sync,
736{
737 let plotter_id = ClusterPlotterId::new();
738
739 select! {
740 result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
741 result
742 }
743 result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
744 result
745 }
746 }
747}
748
749async fn free_instance_responder<P>(
750 plotter_id: &ClusterPlotterId,
751 nats_client: &NatsClient,
752 plotter: &P,
753) -> anyhow::Result<()>
754where
755 P: Plotter + Sync,
756{
757 loop {
758 while !plotter.has_free_capacity().await.unwrap_or_default() {
759 tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
760 }
761
762 let mut subscription = nats_client
763 .queue_subscribe(
764 ClusterPlotterFreeInstanceRequest::SUBJECT,
765 "ab.plotter".to_string(),
766 )
767 .await
768 .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
769 debug!(?subscription, "Free instance subscription");
770
771 while let Some(message) = subscription.next().await {
772 let Some(reply_subject) = message.reply else {
773 continue;
774 };
775
776 debug!(%reply_subject, "Free instance request");
777
778 let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
779 let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
780 has_free_capacity.then(|| plotter_id.to_string());
781
782 if let Err(error) = nats_client
783 .publish(reply_subject, response.encode().into())
784 .await
785 {
786 warn!(%error, "Failed to send free instance response");
787 }
788
789 if !has_free_capacity {
790 subscription.unsubscribe().await.map_err(|error| {
791 anyhow!("Failed to unsubscribe from free instance requests: {error}")
792 })?;
793 }
794 }
795 }
796}
797
/// Answers plot sector requests on behalf of `plotter`.
///
/// For every request a response stream is created that drives
/// [`process_plot_sector_request()`] and yields the notifications it produces.
async fn plot_sector_responder<P>(
    plotter_id: &ClusterPlotterId,
    nats_client: &NatsClient,
    plotter: &P,
) -> anyhow::Result<()>
where
    P: Plotter + Sync,
{
    let plotter_id_string = plotter_id.to_string();

    nats_client
        .stream_request_responder(
            Some(&plotter_id_string),
            Some(plotter_id_string.clone()),
            |request| async move {
                let (progress_sender, mut progress_receiver) = mpsc::channel(10);

                let fut =
                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
                let mut fut = Box::pin(fut.fuse());

                Some(
                    // The stream drives request processing itself: every poll of the stream first
                    // polls the processing future and then the receiver of its notifications
                    stream::poll_fn(move |cx| {
                        // A finished future must not be polled again
                        if !fut.is_terminated() {
                            // The result is `()`, notifications arrive through the channel
                            let _ = fut.poll_unpin(cx);
                        }

                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
                            return Poll::Ready(maybe_result);
                        }

                        // Nothing is available yet
                        Poll::Pending
                    }),
                )
            },
        )
        .await
}
839
840async fn process_plot_sector_request<P>(
841 nats_client: &NatsClient,
842 plotter: &P,
843 request: ClusterPlotterPlotSectorRequest,
844 mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
845) where
846 P: Plotter,
847{
848 let ClusterPlotterPlotSectorRequest {
849 public_key,
850 shard_commitments_root,
851 sector_index,
852 farmer_protocol_info,
853 pieces_in_sector,
854 } = request;
855
856 let inner_fut = async {
858 info!("Plot sector request");
859
860 let (progress_sender, mut progress_receiver) = mpsc::channel(1);
861
862 if !plotter
863 .try_plot_sector(
864 public_key,
865 shard_commitments_root,
866 sector_index,
867 farmer_protocol_info,
868 pieces_in_sector,
869 false,
870 progress_sender,
871 )
872 .await
873 {
874 debug!("Plotter is currently occupied and can't plot more sectors");
875
876 if let Err(error) = response_proxy_sender
877 .send(ClusterSectorPlottingProgress::Occupied)
878 .await
879 {
880 warn!(%error, "Failed to send plotting progress");
881 return;
882 }
883 return;
884 }
885
886 let progress_proxy_fut = {
887 let mut response_proxy_sender = response_proxy_sender.clone();
888 let approximate_max_message_size = nats_client.approximate_max_message_size();
889
890 async move {
891 while let Some(progress) = progress_receiver.next().await {
892 send_publish_progress(
893 &mut response_proxy_sender,
894 progress,
895 approximate_max_message_size,
896 )
897 .await;
898 }
899 }
900 };
901
902 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
903 ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
904 let ping_fut = async {
905 loop {
906 ping_interval.tick().await;
907 if let Err(error) = response_proxy_sender
908 .send(ClusterSectorPlottingProgress::Ping)
909 .await
910 {
911 warn!(%error, "Failed to send plotting ping");
912 return;
913 }
914 }
915 };
916
917 select! {
918 _ = progress_proxy_fut.fuse() => {
919 }
921 _ = ping_fut.fuse() => {
922 unreachable!("Ping loop never ends");
923 }
924 }
925
926 info!("Finished plotting sector successfully");
927 };
928
929 inner_fut
930 .instrument(info_span!("", %public_key, %sector_index))
931 .await
932}
933
934async fn send_publish_progress(
935 response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
936 progress: SectorPlottingProgress,
937 approximate_max_message_size: usize,
938) {
939 let cluster_progress = match progress {
941 SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
942 SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
943 SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
944 SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
945 SectorPlottingProgress::Finished {
946 plotted_sector,
947 time,
948 mut sector,
949 } => {
950 if let Err(error) = response_sender
951 .send(ClusterSectorPlottingProgress::Finished {
952 plotted_sector,
953 time,
954 })
955 .await
956 {
957 warn!(%error, "Failed to send plotting progress");
958 return;
959 }
960
961 while let Some(maybe_sector_chunk) = sector.next().await {
962 match maybe_sector_chunk {
963 Ok(sector_chunk) => {
964 for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
966 {
967 if let Err(error) = response_sender
968 .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
969 sector_chunk.slice_ref(small_sector_chunk)
970 )))
971 .await
972 {
973 warn!(%error, "Failed to send plotting progress");
974 return;
975 }
976 }
977 }
978 Err(error) => {
979 if let Err(error) = response_sender
980 .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
981 .await
982 {
983 warn!(%error, "Failed to send plotting progress");
984 return;
985 }
986 }
987 }
988 }
989
990 return;
991 }
992 SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
993 };
994
995 if let Err(error) = response_sender.send(cluster_progress).await {
996 warn!(%error, "Failed to send plotting progress");
997 }
998}