1use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// How often the plotter service re-checks local capacity while it has none to offer
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings sent from the plotter service to the requesting client while plotting
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How long the client waits for any message from a plotter before assuming it is gone
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
48pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
50type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
52#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
54pub enum ClusterPlotterId {
55 Ulid(Ulid),
57}
58
59#[expect(
60 clippy::new_without_default,
61 reason = "Default has different semantics"
62)]
63impl ClusterPlotterId {
64 pub fn new() -> Self {
66 Self::Ulid(Ulid::new())
67 }
68}
69
70#[derive(Debug, Clone, Encode, Decode)]
72struct ClusterPlotterFreeInstanceRequest;
73
74impl GenericRequest for ClusterPlotterFreeInstanceRequest {
75 const SUBJECT: &'static str = "ab.plotter.free-instance";
76 type Response = Option<String>;
78}
79
80#[derive(Debug, Encode, Decode)]
81enum ClusterSectorPlottingProgress {
82 Occupied,
84 Ping,
86 Downloading,
88 Downloaded(Duration),
90 Encoding,
92 Encoded(Duration),
94 Finished {
96 plotted_sector: PlottedSector,
98 time: Duration,
100 },
101 SectorChunk(Result<Bytes, String>),
103 Error {
105 error: String,
107 },
108}
109
110#[derive(Debug, Clone, Encode, Decode)]
112struct ClusterPlotterPlotSectorRequest {
113 public_key: Ed25519PublicKey,
114 shard_commitments_root: ShardCommitmentHash,
115 sector_index: SectorIndex,
116 farmer_protocol_info: FarmerProtocolInfo,
117 pieces_in_sector: u16,
118}
119
120impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
121 const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
122 type Response = ClusterSectorPlottingProgress;
123}
124
125#[derive(Default, Debug)]
126struct Handlers {
127 plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
128}
129
130#[derive(Debug)]
132pub struct ClusterPlotter {
133 sector_encoding_semaphore: Arc<Semaphore>,
134 retry_backoff_policy: ExponentialBuilder,
135 nats_client: NatsClient,
136 handlers: Arc<Handlers>,
137 tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
138 _background_tasks: AsyncJoinOnDrop<()>,
139}
140
141impl Drop for ClusterPlotter {
142 #[inline]
143 fn drop(&mut self) {
144 self.tasks_sender.close_channel();
145 }
146}
147
148#[async_trait]
149impl Plotter for ClusterPlotter {
150 async fn has_free_capacity(&self) -> Result<bool, String> {
151 Ok(self.sector_encoding_semaphore.available_permits() > 0
152 && self
153 .nats_client
154 .request(&ClusterPlotterFreeInstanceRequest, None)
155 .await
156 .map_err(|error| error.to_string())?
157 .is_some())
158 }
159
160 async fn plot_sector(
161 &self,
162 public_key: Ed25519PublicKey,
163 shard_commitments_root: ShardCommitmentHash,
164 sector_index: SectorIndex,
165 farmer_protocol_info: FarmerProtocolInfo,
166 pieces_in_sector: u16,
167 _replotting: bool,
168 mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
169 ) {
170 let start = Instant::now();
171
172 let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
175 .acquire_owned()
176 .await
177 {
178 Ok(sector_encoding_permit) => sector_encoding_permit,
179 Err(error) => {
180 warn!(%error, "Failed to acquire sector encoding permit");
181
182 let progress_updater = ProgressUpdater {
183 public_key,
184 sector_index,
185 handlers: Arc::clone(&self.handlers),
186 };
187
188 progress_updater
189 .update_progress_and_events(
190 &mut progress_sender,
191 SectorPlottingProgress::Error {
192 error: format!("Failed to acquire sector encoding permit: {error}"),
193 },
194 )
195 .await;
196
197 return;
198 }
199 };
200
201 self.plot_sector_internal(
202 start,
203 sector_encoding_permit,
204 public_key,
205 shard_commitments_root,
206 sector_index,
207 farmer_protocol_info,
208 pieces_in_sector,
209 progress_sender,
210 )
211 .await;
212 }
213
214 async fn try_plot_sector(
215 &self,
216 public_key: Ed25519PublicKey,
217 shard_commitments_root: ShardCommitmentHash,
218 sector_index: SectorIndex,
219 farmer_protocol_info: FarmerProtocolInfo,
220 pieces_in_sector: u16,
221 _replotting: bool,
222 progress_sender: mpsc::Sender<SectorPlottingProgress>,
223 ) -> bool {
224 let start = Instant::now();
225
226 let Ok(sector_encoding_permit) =
227 Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
228 else {
229 return false;
230 };
231
232 self.plot_sector_internal(
233 start,
234 sector_encoding_permit,
235 public_key,
236 shard_commitments_root,
237 sector_index,
238 farmer_protocol_info,
239 pieces_in_sector,
240 progress_sender,
241 )
242 .await;
243
244 true
245 }
246}
247
248impl ClusterPlotter {
249 pub fn new(
251 nats_client: NatsClient,
252 sector_encoding_concurrency: NonZeroUsize,
253 retry_backoff_policy: ExponentialBuilder,
254 ) -> Self {
255 let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
256
257 let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
258
259 let background_tasks = AsyncJoinOnDrop::new(
261 tokio::spawn(async move {
262 let background_tasks = FuturesUnordered::new();
263 let mut background_tasks = pin!(background_tasks);
264 background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
266
267 loop {
268 select! {
269 maybe_background_task = tasks_receiver.next().fuse() => {
270 let Some(background_task) = maybe_background_task else {
271 break;
272 };
273
274 background_tasks.push(background_task);
275 },
276 _ = background_tasks.select_next_some() => {
277 }
279 }
280 }
281 }),
282 true,
283 );
284
285 Self {
286 sector_encoding_semaphore,
287 retry_backoff_policy,
288 nats_client,
289 handlers: Arc::default(),
290 tasks_sender,
291 _background_tasks: background_tasks,
292 }
293 }
294
295 pub fn on_plotting_progress(
297 &self,
298 callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
299 ) -> HandlerId {
300 self.handlers.plotting_progress.add(callback)
301 }
302
303 #[expect(clippy::too_many_arguments, reason = "Internal API")]
304 async fn plot_sector_internal<PS>(
305 &self,
306 start: Instant,
307 sector_encoding_permit: OwnedSemaphorePermit,
308 public_key: Ed25519PublicKey,
309 shard_commitments_root: ShardCommitmentHash,
310 sector_index: SectorIndex,
311 farmer_protocol_info: FarmerProtocolInfo,
312 pieces_in_sector: u16,
313 mut progress_sender: PS,
314 ) where
315 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
316 PS::Error: Error,
317 {
318 trace!("Starting plotting, getting plotting permit");
319
320 let progress_updater = ProgressUpdater {
321 public_key,
322 sector_index,
323 handlers: Arc::clone(&self.handlers),
324 };
325
326 let mut retry_backoff_policy = self.retry_backoff_policy.build();
327
328 let free_plotter_instance_fut = get_free_plotter_instance(
330 &self.nats_client,
331 &progress_updater,
332 &mut progress_sender,
333 &mut retry_backoff_policy,
334 );
335 let mut maybe_free_instance = free_plotter_instance_fut.await;
336 if maybe_free_instance.is_none() {
337 return;
338 }
339
340 trace!("Got plotting permit #1");
341
342 let nats_client = self.nats_client.clone();
343
344 let plotting_fut = async move {
345 'outer: loop {
346 let free_instance = if let Some(free_instance) = maybe_free_instance.take() {
348 free_instance
349 } else {
350 let free_plotter_instance_fut = get_free_plotter_instance(
351 &nats_client,
352 &progress_updater,
353 &mut progress_sender,
354 &mut retry_backoff_policy,
355 );
356 let Some(free_instance) = free_plotter_instance_fut.await else {
357 break;
358 };
359 trace!("Got plotting permit #2");
360 free_instance
361 };
362
363 let response_stream_result = nats_client
364 .stream_request(
365 &ClusterPlotterPlotSectorRequest {
366 public_key,
367 shard_commitments_root,
368 sector_index,
369 farmer_protocol_info,
370 pieces_in_sector,
371 },
372 Some(&free_instance),
373 )
374 .await;
375 trace!("Subscribed to plotting notifications");
376
377 let mut response_stream = match response_stream_result {
378 Ok(response_stream) => response_stream,
379 Err(error) => {
380 progress_updater
381 .update_progress_and_events(
382 &mut progress_sender,
383 SectorPlottingProgress::Error {
384 error: format!("Failed make stream request: {error}"),
385 },
386 )
387 .await;
388
389 break;
390 }
391 };
392
393 let (mut sector_sender, sector_receiver) = mpsc::channel(
396 (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
397 .max(1),
398 );
399 let mut maybe_sector_receiver = Some(sector_receiver);
400 loop {
401 match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
402 Ok(Some(response)) => {
403 match process_response_notification(
404 &start,
405 &free_instance,
406 &progress_updater,
407 &mut progress_sender,
408 &mut retry_backoff_policy,
409 response,
410 &mut sector_sender,
411 &mut maybe_sector_receiver,
412 )
413 .await
414 {
415 ResponseProcessingResult::Retry => {
416 debug!("Retrying");
417 continue 'outer;
418 }
419 ResponseProcessingResult::Abort => {
420 debug!("Aborting");
421 break 'outer;
422 }
423 ResponseProcessingResult::Continue => {
424 trace!("Continue");
425 }
427 }
428 }
429 Ok(None) => {
430 trace!("Plotting done");
431 break;
432 }
433 Err(_error) => {
434 progress_updater
435 .update_progress_and_events(
436 &mut progress_sender,
437 SectorPlottingProgress::Error {
438 error: "Timed out without ping from plotter".to_string(),
439 },
440 )
441 .await;
442 break;
443 }
444 }
445 }
446
447 break;
448 }
449
450 drop(sector_encoding_permit);
451 };
452
453 let plotting_task =
454 AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
455 if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
456 warn!(%error, "Failed to send plotting task");
457
458 let progress = SectorPlottingProgress::Error {
459 error: format!("Failed to send plotting task: {error}"),
460 };
461
462 self.handlers
463 .plotting_progress
464 .call_simple(&public_key, §or_index, &progress);
465 }
466 }
467}
468
469async fn get_free_plotter_instance<PS>(
471 nats_client: &NatsClient,
472 progress_updater: &ProgressUpdater,
473 progress_sender: &mut PS,
474 retry_backoff_policy: &mut ExponentialBackoff,
475) -> Option<String>
476where
477 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
478 PS::Error: Error,
479{
480 loop {
481 match nats_client
482 .request(&ClusterPlotterFreeInstanceRequest, None)
483 .await
484 {
485 Ok(Some(free_instance)) => {
486 return Some(free_instance);
487 }
488 Ok(None) => {
489 let Some(delay) = retry_backoff_policy.next() else {
490 progress_updater
491 .update_progress_and_events(
492 progress_sender,
493 SectorPlottingProgress::Error {
494 error: "Instance was occupied, exiting #1".to_string(),
495 },
496 )
497 .await;
498 return None;
499 };
500
501 debug!("Instance was occupied, retrying #1");
502
503 tokio::time::sleep(delay).await;
504 }
505 Err(error) => match error.kind() {
506 RequestErrorKind::TimedOut => {
507 let Some(delay) = retry_backoff_policy.next() else {
508 progress_updater
509 .update_progress_and_events(
510 progress_sender,
511 SectorPlottingProgress::Error {
512 error: "Plotter request timed out, exiting".to_string(),
513 },
514 )
515 .await;
516 return None;
517 };
518
519 debug!("Plotter request timed out, retrying");
520
521 tokio::time::sleep(delay).await;
522 }
523 RequestErrorKind::NoResponders => {
524 let Some(delay) = retry_backoff_policy.next() else {
525 progress_updater
526 .update_progress_and_events(
527 progress_sender,
528 SectorPlottingProgress::Error {
529 error: "No plotters, exiting".to_string(),
530 },
531 )
532 .await;
533 return None;
534 };
535
536 debug!("No plotters, retrying");
537
538 tokio::time::sleep(delay).await;
539 }
540 RequestErrorKind::InvalidSubject
541 | RequestErrorKind::MaxPayloadExceeded
542 | RequestErrorKind::Other => {
543 progress_updater
544 .update_progress_and_events(
545 progress_sender,
546 SectorPlottingProgress::Error {
547 error: format!("Failed to get free plotter instance: {error}"),
548 },
549 )
550 .await;
551 return None;
552 }
553 },
554 }
555 }
556}
557
/// What the plotting loop should do after processing a single response notification
enum ResponseProcessingResult {
    /// Look up a new free plotter instance and start over
    Retry,
    /// Stop plotting this sector
    Abort,
    /// Keep reading notifications from the current instance
    Continue,
}
563
564#[expect(clippy::too_many_arguments, reason = "Internal API")]
565async fn process_response_notification<PS>(
566 start: &Instant,
567 free_instance: &str,
568 progress_updater: &ProgressUpdater,
569 progress_sender: &mut PS,
570 retry_backoff_policy: &mut ExponentialBackoff,
571 response: ClusterSectorPlottingProgress,
572 sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
573 maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
574) -> ResponseProcessingResult
575where
576 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
577 PS::Error: Error,
578{
579 if matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
580 trace!("Processing plotting response notification (sector chunk)");
581 } else {
582 trace!(?response, "Processing plotting response notification");
583 }
584
585 match response {
586 ClusterSectorPlottingProgress::Occupied => {
587 debug!(%free_instance, "Instance was occupied, retrying #2");
588
589 return if let Some(delay) = retry_backoff_policy.next() {
590 debug!("Instance was occupied, retrying #2");
591
592 tokio::time::sleep(delay).await;
593 ResponseProcessingResult::Retry
594 } else {
595 debug!("Instance was occupied, exiting #2");
596 ResponseProcessingResult::Abort
597 };
598 }
599 ClusterSectorPlottingProgress::Ping => {
600 }
602 ClusterSectorPlottingProgress::Downloading => {
603 if !progress_updater
604 .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
605 .await
606 {
607 return ResponseProcessingResult::Abort;
608 }
609 }
610 ClusterSectorPlottingProgress::Downloaded(time) => {
611 if !progress_updater
612 .update_progress_and_events(
613 progress_sender,
614 SectorPlottingProgress::Downloaded(time),
615 )
616 .await
617 {
618 return ResponseProcessingResult::Abort;
619 }
620 }
621 ClusterSectorPlottingProgress::Encoding => {
622 if !progress_updater
623 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
624 .await
625 {
626 return ResponseProcessingResult::Abort;
627 }
628 }
629 ClusterSectorPlottingProgress::Encoded(time) => {
630 if !progress_updater
631 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
632 .await
633 {
634 return ResponseProcessingResult::Abort;
635 }
636 }
637 ClusterSectorPlottingProgress::Finished {
638 plotted_sector,
639 time: _,
640 } => {
641 let Some(sector_receiver) = maybe_sector_receiver.take() else {
642 debug!("Unexpected duplicated sector plotting progress Finished");
643
644 progress_updater
645 .update_progress_and_events(
646 progress_sender,
647 SectorPlottingProgress::Error {
648 error: "Unexpected duplicated sector plotting progress Finished"
649 .to_string(),
650 },
651 )
652 .await;
653 return ResponseProcessingResult::Abort;
654 };
655
656 let progress = SectorPlottingProgress::Finished {
657 plotted_sector,
658 time: start.elapsed(),
660 sector: Box::pin(sector_receiver),
661 };
662 if !progress_updater
663 .update_progress_and_events(progress_sender, progress)
664 .await
665 {
666 return ResponseProcessingResult::Abort;
667 }
668
669 return ResponseProcessingResult::Continue;
670 }
671 ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
673 if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
674 warn!(%error, "Failed to send sector chunk");
675 return ResponseProcessingResult::Abort;
676 }
677 return ResponseProcessingResult::Continue;
678 }
679 ClusterSectorPlottingProgress::Error { error } => {
680 if !progress_updater
681 .update_progress_and_events(
682 progress_sender,
683 SectorPlottingProgress::Error { error },
684 )
685 .await
686 {
687 return ResponseProcessingResult::Abort;
688 }
689 }
690 }
691
692 ResponseProcessingResult::Continue
693}
694
695struct ProgressUpdater {
696 public_key: Ed25519PublicKey,
697 sector_index: SectorIndex,
698 handlers: Arc<Handlers>,
699}
700
701impl ProgressUpdater {
702 async fn update_progress_and_events<PS>(
704 &self,
705 progress_sender: &mut PS,
706 progress: SectorPlottingProgress,
707 ) -> bool
708 where
709 PS: Sink<SectorPlottingProgress> + Unpin,
710 PS::Error: Error,
711 {
712 self.handlers.plotting_progress.call_simple(
713 &self.public_key,
714 &self.sector_index,
715 &progress,
716 );
717
718 if let Err(error) = progress_sender.send(progress).await {
719 warn!(%error, "Failed to send error progress update");
720
721 false
722 } else {
723 true
724 }
725 }
726}
727
728pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
733where
734 P: Plotter + Sync,
735{
736 let plotter_id = ClusterPlotterId::new();
737
738 select! {
739 result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
740 result
741 }
742 result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
743 result
744 }
745 }
746}
747
748async fn free_instance_responder<P>(
749 plotter_id: &ClusterPlotterId,
750 nats_client: &NatsClient,
751 plotter: &P,
752) -> anyhow::Result<()>
753where
754 P: Plotter + Sync,
755{
756 loop {
757 while !plotter.has_free_capacity().await.unwrap_or_default() {
758 tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
759 }
760
761 let mut subscription = nats_client
762 .queue_subscribe(
763 ClusterPlotterFreeInstanceRequest::SUBJECT,
764 "ab.plotter".to_string(),
765 )
766 .await
767 .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
768 debug!(?subscription, "Free instance subscription");
769
770 while let Some(message) = subscription.next().await {
771 let Some(reply_subject) = message.reply else {
772 continue;
773 };
774
775 debug!(%reply_subject, "Free instance request");
776
777 let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
778 let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
779 has_free_capacity.then(|| plotter_id.to_string());
780
781 if let Err(error) = nats_client
782 .publish(reply_subject, response.encode().into())
783 .await
784 {
785 warn!(%error, "Failed to send free instance response");
786 }
787
788 if !has_free_capacity {
789 subscription.unsubscribe().await.map_err(|error| {
790 anyhow!("Failed to unsubscribe from free instance requests: {error}")
791 })?;
792 }
793 }
794 }
795}
796
797async fn plot_sector_responder<P>(
798 plotter_id: &ClusterPlotterId,
799 nats_client: &NatsClient,
800 plotter: &P,
801) -> anyhow::Result<()>
802where
803 P: Plotter + Sync,
804{
805 let plotter_id_string = plotter_id.to_string();
806
807 nats_client
808 .stream_request_responder(
809 Some(&plotter_id_string),
810 Some(plotter_id_string.clone()),
811 |request| async move {
812 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
813
814 let fut =
815 process_plot_sector_request(nats_client, plotter, request, progress_sender);
816 let mut fut = Box::pin(fut.fuse());
817
818 Some(
819 stream::poll_fn(move |cx| {
821 if !fut.is_terminated() {
822 let _: Poll<()> = fut.poll_unpin(cx);
824 }
825
826 if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
827 return Poll::Ready(maybe_result);
828 }
829
830 Poll::Pending
832 }),
833 )
834 },
835 )
836 .await
837}
838
839async fn process_plot_sector_request<P>(
840 nats_client: &NatsClient,
841 plotter: &P,
842 request: ClusterPlotterPlotSectorRequest,
843 mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
844) where
845 P: Plotter,
846{
847 let ClusterPlotterPlotSectorRequest {
848 public_key,
849 shard_commitments_root,
850 sector_index,
851 farmer_protocol_info,
852 pieces_in_sector,
853 } = request;
854
855 let inner_fut = async {
857 info!("Plot sector request");
858
859 let (progress_sender, mut progress_receiver) = mpsc::channel(1);
860
861 if !plotter
862 .try_plot_sector(
863 public_key,
864 shard_commitments_root,
865 sector_index,
866 farmer_protocol_info,
867 pieces_in_sector,
868 false,
869 progress_sender,
870 )
871 .await
872 {
873 debug!("Plotter is currently occupied and can't plot more sectors");
874
875 if let Err(error) = response_proxy_sender
876 .send(ClusterSectorPlottingProgress::Occupied)
877 .await
878 {
879 warn!(%error, "Failed to send plotting progress");
880 return;
881 }
882 return;
883 }
884
885 let progress_proxy_fut = {
886 let mut response_proxy_sender = response_proxy_sender.clone();
887 let approximate_max_message_size = nats_client.approximate_max_message_size();
888
889 async move {
890 while let Some(progress) = progress_receiver.next().await {
891 send_publish_progress(
892 &mut response_proxy_sender,
893 progress,
894 approximate_max_message_size,
895 )
896 .await;
897 }
898 }
899 };
900
901 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
902 ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
903 let ping_fut = async {
904 loop {
905 ping_interval.tick().await;
906 if let Err(error) = response_proxy_sender
907 .send(ClusterSectorPlottingProgress::Ping)
908 .await
909 {
910 warn!(%error, "Failed to send plotting ping");
911 return;
912 }
913 }
914 };
915
916 select! {
917 () = progress_proxy_fut.fuse() => {
918 }
920 () = ping_fut.fuse() => {
921 unreachable!("Ping loop never ends");
922 }
923 }
924
925 info!("Finished plotting sector successfully");
926 };
927
928 inner_fut
929 .instrument(info_span!("", %public_key, %sector_index))
930 .await;
931}
932
933async fn send_publish_progress(
934 response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
935 progress: SectorPlottingProgress,
936 approximate_max_message_size: usize,
937) {
938 let cluster_progress = match progress {
940 SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
941 SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
942 SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
943 SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
944 SectorPlottingProgress::Finished {
945 plotted_sector,
946 time,
947 mut sector,
948 } => {
949 if let Err(error) = response_sender
950 .send(ClusterSectorPlottingProgress::Finished {
951 plotted_sector,
952 time,
953 })
954 .await
955 {
956 warn!(%error, "Failed to send plotting progress");
957 return;
958 }
959
960 while let Some(maybe_sector_chunk) = sector.next().await {
961 match maybe_sector_chunk {
962 Ok(sector_chunk) => {
963 for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
965 {
966 if let Err(error) = response_sender
967 .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
968 sector_chunk.slice_ref(small_sector_chunk)
969 )))
970 .await
971 {
972 warn!(%error, "Failed to send plotting progress");
973 return;
974 }
975 }
976 }
977 Err(error) => {
978 if let Err(error) = response_sender
979 .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
980 .await
981 {
982 warn!(%error, "Failed to send plotting progress");
983 return;
984 }
985 }
986 }
987 }
988
989 return;
990 }
991 SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
992 };
993
994 if let Err(error) = response_sender.send(cluster_progress).await {
995 warn!(%error, "Failed to send plotting progress");
996 }
997}