1use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// Delay between checks for free capacity while the local plotter reports that it has none
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings sent by the plotter service while a sector is being plotted
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How long the requesting side waits for the next message from the plotter before it gives up
/// with a timeout error
const PING_TIMEOUT: Duration = Duration::from_mins(1);
47
/// Type alias for a shared event handler callback that receives three arguments by reference
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// Collection ([`Bag`]) of [`HandlerFn3`] callbacks
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
/// Identifier of a cluster plotter instance
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Identifier backed by a ULID
    Ulid(Ulid),
}
58
#[expect(clippy::new_without_default)]
impl ClusterPlotterId {
    /// Creates a new identifier from a freshly generated ULID
    pub fn new() -> Self {
        Self::Ulid(Ulid::new())
    }
}
66
/// Request for a plotter instance that currently has free capacity
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;
70
impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    const SUBJECT: &'static str = "ab.plotter.free-instance";
    /// String form of the responding plotter's ID when it has free capacity, `None` otherwise
    type Response = Option<String>;
}
76
/// Messages streamed from the plotter service back to the requester while handling a
/// [`ClusterPlotterPlotSectorRequest`]
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter could not accept the sector because it is busy
    Occupied,
    /// Periodic keep-alive sent while plotting is in progress
    Ping,
    /// Downloading of sector pieces has started
    Downloading,
    /// Downloading finished; carries the duration reported by the plotter
    Downloaded(Duration),
    /// Encoding of the sector has started
    Encoding,
    /// Encoding finished; carries the duration reported by the plotter
    Encoded(Duration),
    /// Plotting finished; sector contents follow as [`Self::SectorChunk`] messages
    Finished {
        /// Information about the plotted sector
        plotted_sector: PlottedSector,
        /// Plotting time as measured by the plotter
        time: Duration,
    },
    /// Piece of the plotted sector, or an error produced while reading it
    SectorChunk(Result<Bytes, String>),
    /// Plotting failed
    Error {
        /// Human-readable description of the failure
        error: String,
    },
}
106
/// Request to plot a sector; its fields are passed on to [`Plotter::try_plot_sector`] by the
/// plotter service
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    public_key: Ed25519PublicKey,
    shard_commitments_root: ShardCommitmentHash,
    sector_index: SectorIndex,
    farmer_protocol_info: FarmerProtocolInfo,
    pieces_in_sector: u16,
}
116
impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the plotter instance given to
    // `stream_request`/`stream_request_responder` — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
    type Response = ClusterSectorPlottingProgress;
}
121
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Called with public key, sector index and progress for every plotting progress update
    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
}
126
/// [`Plotter`] implementation that delegates plotting to plotter instances reachable through NATS
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits how many sectors are being plotted through this instance at the same time
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Backoff policy used when no plotter instance is available or a request has to be retried
    retry_backoff_policy: ExponentialBuilder,
    nats_client: NatsClient,
    handlers: Arc<Handlers>,
    /// Sender used to hand spawned plotting tasks over to the background task collector
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that owns the spawned plotting tasks
    _background_tasks: AsyncJoinOnDrop<()>,
}
137
impl Drop for ClusterPlotter {
    #[inline]
    fn drop(&mut self) {
        // Closing the channel makes the background task collector see the end of the task stream
        // and leave its loop
        self.tasks_sender.close_channel();
    }
}
144
145#[async_trait]
146impl Plotter for ClusterPlotter {
147 async fn has_free_capacity(&self) -> Result<bool, String> {
148 Ok(self.sector_encoding_semaphore.available_permits() > 0
149 && self
150 .nats_client
151 .request(&ClusterPlotterFreeInstanceRequest, None)
152 .await
153 .map_err(|error| error.to_string())?
154 .is_some())
155 }
156
157 async fn plot_sector(
158 &self,
159 public_key: Ed25519PublicKey,
160 shard_commitments_root: ShardCommitmentHash,
161 sector_index: SectorIndex,
162 farmer_protocol_info: FarmerProtocolInfo,
163 pieces_in_sector: u16,
164 _replotting: bool,
165 mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
166 ) {
167 let start = Instant::now();
168
169 let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
172 .acquire_owned()
173 .await
174 {
175 Ok(sector_encoding_permit) => sector_encoding_permit,
176 Err(error) => {
177 warn!(%error, "Failed to acquire sector encoding permit");
178
179 let progress_updater = ProgressUpdater {
180 public_key,
181 sector_index,
182 handlers: Arc::clone(&self.handlers),
183 };
184
185 progress_updater
186 .update_progress_and_events(
187 &mut progress_sender,
188 SectorPlottingProgress::Error {
189 error: format!("Failed to acquire sector encoding permit: {error}"),
190 },
191 )
192 .await;
193
194 return;
195 }
196 };
197
198 self.plot_sector_internal(
199 start,
200 sector_encoding_permit,
201 public_key,
202 shard_commitments_root,
203 sector_index,
204 farmer_protocol_info,
205 pieces_in_sector,
206 progress_sender,
207 )
208 .await;
209 }
210
211 async fn try_plot_sector(
212 &self,
213 public_key: Ed25519PublicKey,
214 shard_commitments_root: ShardCommitmentHash,
215 sector_index: SectorIndex,
216 farmer_protocol_info: FarmerProtocolInfo,
217 pieces_in_sector: u16,
218 _replotting: bool,
219 progress_sender: mpsc::Sender<SectorPlottingProgress>,
220 ) -> bool {
221 let start = Instant::now();
222
223 let Ok(sector_encoding_permit) =
224 Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
225 else {
226 return false;
227 };
228
229 self.plot_sector_internal(
230 start,
231 sector_encoding_permit,
232 public_key,
233 shard_commitments_root,
234 sector_index,
235 farmer_protocol_info,
236 pieces_in_sector,
237 progress_sender,
238 )
239 .await;
240
241 true
242 }
243}
244
245impl ClusterPlotter {
246 pub fn new(
248 nats_client: NatsClient,
249 sector_encoding_concurrency: NonZeroUsize,
250 retry_backoff_policy: ExponentialBuilder,
251 ) -> Self {
252 let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
253
254 let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
255
256 let background_tasks = AsyncJoinOnDrop::new(
258 tokio::spawn(async move {
259 let background_tasks = FuturesUnordered::new();
260 let mut background_tasks = pin!(background_tasks);
261 background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
263
264 loop {
265 select! {
266 maybe_background_task = tasks_receiver.next().fuse() => {
267 let Some(background_task) = maybe_background_task else {
268 break;
269 };
270
271 background_tasks.push(background_task);
272 },
273 _ = background_tasks.select_next_some() => {
274 }
276 }
277 }
278 }),
279 true,
280 );
281
282 Self {
283 sector_encoding_semaphore,
284 retry_backoff_policy,
285 nats_client,
286 handlers: Arc::default(),
287 tasks_sender,
288 _background_tasks: background_tasks,
289 }
290 }
291
292 pub fn on_plotting_progress(
294 &self,
295 callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
296 ) -> HandlerId {
297 self.handlers.plotting_progress.add(callback)
298 }
299
300 #[expect(clippy::too_many_arguments)]
301 async fn plot_sector_internal<PS>(
302 &self,
303 start: Instant,
304 sector_encoding_permit: OwnedSemaphorePermit,
305 public_key: Ed25519PublicKey,
306 shard_commitments_root: ShardCommitmentHash,
307 sector_index: SectorIndex,
308 farmer_protocol_info: FarmerProtocolInfo,
309 pieces_in_sector: u16,
310 mut progress_sender: PS,
311 ) where
312 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
313 PS::Error: Error,
314 {
315 trace!("Starting plotting, getting plotting permit");
316
317 let progress_updater = ProgressUpdater {
318 public_key,
319 sector_index,
320 handlers: Arc::clone(&self.handlers),
321 };
322
323 let mut retry_backoff_policy = self.retry_backoff_policy.build();
324
325 let free_plotter_instance_fut = get_free_plotter_instance(
327 &self.nats_client,
328 &progress_updater,
329 &mut progress_sender,
330 &mut retry_backoff_policy,
331 );
332 let mut maybe_free_instance = free_plotter_instance_fut.await;
333 if maybe_free_instance.is_none() {
334 return;
335 }
336
337 trace!("Got plotting permit #1");
338
339 let nats_client = self.nats_client.clone();
340
341 let plotting_fut = async move {
342 'outer: loop {
343 let free_instance = if let Some(free_instance) = maybe_free_instance.take() {
345 free_instance
346 } else {
347 let free_plotter_instance_fut = get_free_plotter_instance(
348 &nats_client,
349 &progress_updater,
350 &mut progress_sender,
351 &mut retry_backoff_policy,
352 );
353 let Some(free_instance) = free_plotter_instance_fut.await else {
354 break;
355 };
356 trace!("Got plotting permit #2");
357 free_instance
358 };
359
360 let response_stream_result = nats_client
361 .stream_request(
362 &ClusterPlotterPlotSectorRequest {
363 public_key,
364 shard_commitments_root,
365 sector_index,
366 farmer_protocol_info,
367 pieces_in_sector,
368 },
369 Some(&free_instance),
370 )
371 .await;
372 trace!("Subscribed to plotting notifications");
373
374 let mut response_stream = match response_stream_result {
375 Ok(response_stream) => response_stream,
376 Err(error) => {
377 progress_updater
378 .update_progress_and_events(
379 &mut progress_sender,
380 SectorPlottingProgress::Error {
381 error: format!("Failed make stream request: {error}"),
382 },
383 )
384 .await;
385
386 break;
387 }
388 };
389
390 let (mut sector_sender, sector_receiver) = mpsc::channel(
393 (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
394 .max(1),
395 );
396 let mut maybe_sector_receiver = Some(sector_receiver);
397 loop {
398 match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
399 Ok(Some(response)) => {
400 match process_response_notification(
401 &start,
402 &free_instance,
403 &progress_updater,
404 &mut progress_sender,
405 &mut retry_backoff_policy,
406 response,
407 &mut sector_sender,
408 &mut maybe_sector_receiver,
409 )
410 .await
411 {
412 ResponseProcessingResult::Retry => {
413 debug!("Retrying");
414 continue 'outer;
415 }
416 ResponseProcessingResult::Abort => {
417 debug!("Aborting");
418 break 'outer;
419 }
420 ResponseProcessingResult::Continue => {
421 trace!("Continue");
422 }
424 }
425 }
426 Ok(None) => {
427 trace!("Plotting done");
428 break;
429 }
430 Err(_error) => {
431 progress_updater
432 .update_progress_and_events(
433 &mut progress_sender,
434 SectorPlottingProgress::Error {
435 error: "Timed out without ping from plotter".to_string(),
436 },
437 )
438 .await;
439 break;
440 }
441 }
442 }
443
444 break;
445 }
446
447 drop(sector_encoding_permit);
448 };
449
450 let plotting_task =
451 AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
452 if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
453 warn!(%error, "Failed to send plotting task");
454
455 let progress = SectorPlottingProgress::Error {
456 error: format!("Failed to send plotting task: {error}"),
457 };
458
459 self.handlers
460 .plotting_progress
461 .call_simple(&public_key, §or_index, &progress);
462 }
463 }
464}
465
466async fn get_free_plotter_instance<PS>(
468 nats_client: &NatsClient,
469 progress_updater: &ProgressUpdater,
470 progress_sender: &mut PS,
471 retry_backoff_policy: &mut ExponentialBackoff,
472) -> Option<String>
473where
474 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
475 PS::Error: Error,
476{
477 loop {
478 match nats_client
479 .request(&ClusterPlotterFreeInstanceRequest, None)
480 .await
481 {
482 Ok(Some(free_instance)) => {
483 return Some(free_instance);
484 }
485 Ok(None) => {
486 let Some(delay) = retry_backoff_policy.next() else {
487 progress_updater
488 .update_progress_and_events(
489 progress_sender,
490 SectorPlottingProgress::Error {
491 error: "Instance was occupied, exiting #1".to_string(),
492 },
493 )
494 .await;
495 return None;
496 };
497
498 debug!("Instance was occupied, retrying #1");
499
500 tokio::time::sleep(delay).await;
501 }
502 Err(error) => match error.kind() {
503 RequestErrorKind::TimedOut => {
504 let Some(delay) = retry_backoff_policy.next() else {
505 progress_updater
506 .update_progress_and_events(
507 progress_sender,
508 SectorPlottingProgress::Error {
509 error: "Plotter request timed out, exiting".to_string(),
510 },
511 )
512 .await;
513 return None;
514 };
515
516 debug!("Plotter request timed out, retrying");
517
518 tokio::time::sleep(delay).await;
519 }
520 RequestErrorKind::NoResponders => {
521 let Some(delay) = retry_backoff_policy.next() else {
522 progress_updater
523 .update_progress_and_events(
524 progress_sender,
525 SectorPlottingProgress::Error {
526 error: "No plotters, exiting".to_string(),
527 },
528 )
529 .await;
530 return None;
531 };
532
533 debug!("No plotters, retrying");
534
535 tokio::time::sleep(delay).await;
536 }
537 RequestErrorKind::InvalidSubject
538 | RequestErrorKind::MaxPayloadExceeded
539 | RequestErrorKind::Other => {
540 progress_updater
541 .update_progress_and_events(
542 progress_sender,
543 SectorPlottingProgress::Error {
544 error: format!("Failed to get free plotter instance: {error}"),
545 },
546 )
547 .await;
548 return None;
549 }
550 },
551 }
552 }
553}
554
/// Outcome of processing a single message from the plotter's response stream
enum ResponseProcessingResult {
    /// Start over with a (possibly different) free plotter instance
    Retry,
    /// Stop plotting of this sector
    Abort,
    /// Keep reading messages from the response stream
    Continue,
}
560
561#[expect(clippy::too_many_arguments)]
562async fn process_response_notification<PS>(
563 start: &Instant,
564 free_instance: &str,
565 progress_updater: &ProgressUpdater,
566 progress_sender: &mut PS,
567 retry_backoff_policy: &mut ExponentialBackoff,
568 response: ClusterSectorPlottingProgress,
569 sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
570 maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
571) -> ResponseProcessingResult
572where
573 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
574 PS::Error: Error,
575{
576 if matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
577 trace!("Processing plotting response notification (sector chunk)");
578 } else {
579 trace!(?response, "Processing plotting response notification");
580 }
581
582 match response {
583 ClusterSectorPlottingProgress::Occupied => {
584 debug!(%free_instance, "Instance was occupied, retrying #2");
585
586 return if let Some(delay) = retry_backoff_policy.next() {
587 debug!("Instance was occupied, retrying #2");
588
589 tokio::time::sleep(delay).await;
590 ResponseProcessingResult::Retry
591 } else {
592 debug!("Instance was occupied, exiting #2");
593 ResponseProcessingResult::Abort
594 };
595 }
596 ClusterSectorPlottingProgress::Ping => {
597 }
599 ClusterSectorPlottingProgress::Downloading => {
600 if !progress_updater
601 .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
602 .await
603 {
604 return ResponseProcessingResult::Abort;
605 }
606 }
607 ClusterSectorPlottingProgress::Downloaded(time) => {
608 if !progress_updater
609 .update_progress_and_events(
610 progress_sender,
611 SectorPlottingProgress::Downloaded(time),
612 )
613 .await
614 {
615 return ResponseProcessingResult::Abort;
616 }
617 }
618 ClusterSectorPlottingProgress::Encoding => {
619 if !progress_updater
620 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
621 .await
622 {
623 return ResponseProcessingResult::Abort;
624 }
625 }
626 ClusterSectorPlottingProgress::Encoded(time) => {
627 if !progress_updater
628 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
629 .await
630 {
631 return ResponseProcessingResult::Abort;
632 }
633 }
634 ClusterSectorPlottingProgress::Finished {
635 plotted_sector,
636 time: _,
637 } => {
638 let Some(sector_receiver) = maybe_sector_receiver.take() else {
639 debug!("Unexpected duplicated sector plotting progress Finished");
640
641 progress_updater
642 .update_progress_and_events(
643 progress_sender,
644 SectorPlottingProgress::Error {
645 error: "Unexpected duplicated sector plotting progress Finished"
646 .to_string(),
647 },
648 )
649 .await;
650 return ResponseProcessingResult::Abort;
651 };
652
653 let progress = SectorPlottingProgress::Finished {
654 plotted_sector,
655 time: start.elapsed(),
657 sector: Box::pin(sector_receiver),
658 };
659 if !progress_updater
660 .update_progress_and_events(progress_sender, progress)
661 .await
662 {
663 return ResponseProcessingResult::Abort;
664 }
665
666 return ResponseProcessingResult::Continue;
667 }
668 ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
670 if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
671 warn!(%error, "Failed to send sector chunk");
672 return ResponseProcessingResult::Abort;
673 }
674 return ResponseProcessingResult::Continue;
675 }
676 ClusterSectorPlottingProgress::Error { error } => {
677 if !progress_updater
678 .update_progress_and_events(
679 progress_sender,
680 SectorPlottingProgress::Error { error },
681 )
682 .await
683 {
684 return ResponseProcessingResult::Abort;
685 }
686 }
687 }
688
689 ResponseProcessingResult::Continue
690}
691
/// Helper that reports plotting progress of one sector both to event handlers and to a progress
/// sink
struct ProgressUpdater {
    public_key: Ed25519PublicKey,
    sector_index: SectorIndex,
    handlers: Arc<Handlers>,
}
697
698impl ProgressUpdater {
699 async fn update_progress_and_events<PS>(
701 &self,
702 progress_sender: &mut PS,
703 progress: SectorPlottingProgress,
704 ) -> bool
705 where
706 PS: Sink<SectorPlottingProgress> + Unpin,
707 PS::Error: Error,
708 {
709 self.handlers.plotting_progress.call_simple(
710 &self.public_key,
711 &self.sector_index,
712 &progress,
713 );
714
715 if let Err(error) = progress_sender.send(progress).await {
716 warn!(%error, "Failed to send error progress update");
717
718 false
719 } else {
720 true
721 }
722 }
723}
724
725pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
730where
731 P: Plotter + Sync,
732{
733 let plotter_id = ClusterPlotterId::new();
734
735 select! {
736 result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
737 result
738 }
739 result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
740 result
741 }
742 }
743}
744
745async fn free_instance_responder<P>(
746 plotter_id: &ClusterPlotterId,
747 nats_client: &NatsClient,
748 plotter: &P,
749) -> anyhow::Result<()>
750where
751 P: Plotter + Sync,
752{
753 loop {
754 while !plotter.has_free_capacity().await.unwrap_or_default() {
755 tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
756 }
757
758 let mut subscription = nats_client
759 .queue_subscribe(
760 ClusterPlotterFreeInstanceRequest::SUBJECT,
761 "ab.plotter".to_string(),
762 )
763 .await
764 .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
765 debug!(?subscription, "Free instance subscription");
766
767 while let Some(message) = subscription.next().await {
768 let Some(reply_subject) = message.reply else {
769 continue;
770 };
771
772 debug!(%reply_subject, "Free instance request");
773
774 let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
775 let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
776 has_free_capacity.then(|| plotter_id.to_string());
777
778 if let Err(error) = nats_client
779 .publish(reply_subject, response.encode().into())
780 .await
781 {
782 warn!(%error, "Failed to send free instance response");
783 }
784
785 if !has_free_capacity {
786 subscription.unsubscribe().await.map_err(|error| {
787 anyhow!("Failed to unsubscribe from free instance requests: {error}")
788 })?;
789 }
790 }
791 }
792}
793
794async fn plot_sector_responder<P>(
795 plotter_id: &ClusterPlotterId,
796 nats_client: &NatsClient,
797 plotter: &P,
798) -> anyhow::Result<()>
799where
800 P: Plotter + Sync,
801{
802 let plotter_id_string = plotter_id.to_string();
803
804 nats_client
805 .stream_request_responder(
806 Some(&plotter_id_string),
807 Some(plotter_id_string.clone()),
808 |request| async move {
809 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
810
811 let fut =
812 process_plot_sector_request(nats_client, plotter, request, progress_sender);
813 let mut fut = Box::pin(fut.fuse());
814
815 Some(
816 stream::poll_fn(move |cx| {
818 if !fut.is_terminated() {
819 let _: Poll<()> = fut.poll_unpin(cx);
821 }
822
823 if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
824 return Poll::Ready(maybe_result);
825 }
826
827 Poll::Pending
829 }),
830 )
831 },
832 )
833 .await
834}
835
836async fn process_plot_sector_request<P>(
837 nats_client: &NatsClient,
838 plotter: &P,
839 request: ClusterPlotterPlotSectorRequest,
840 mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
841) where
842 P: Plotter,
843{
844 let ClusterPlotterPlotSectorRequest {
845 public_key,
846 shard_commitments_root,
847 sector_index,
848 farmer_protocol_info,
849 pieces_in_sector,
850 } = request;
851
852 let inner_fut = async {
854 info!("Plot sector request");
855
856 let (progress_sender, mut progress_receiver) = mpsc::channel(1);
857
858 if !plotter
859 .try_plot_sector(
860 public_key,
861 shard_commitments_root,
862 sector_index,
863 farmer_protocol_info,
864 pieces_in_sector,
865 false,
866 progress_sender,
867 )
868 .await
869 {
870 debug!("Plotter is currently occupied and can't plot more sectors");
871
872 if let Err(error) = response_proxy_sender
873 .send(ClusterSectorPlottingProgress::Occupied)
874 .await
875 {
876 warn!(%error, "Failed to send plotting progress");
877 return;
878 }
879 return;
880 }
881
882 let progress_proxy_fut = {
883 let mut response_proxy_sender = response_proxy_sender.clone();
884 let approximate_max_message_size = nats_client.approximate_max_message_size();
885
886 async move {
887 while let Some(progress) = progress_receiver.next().await {
888 send_publish_progress(
889 &mut response_proxy_sender,
890 progress,
891 approximate_max_message_size,
892 )
893 .await;
894 }
895 }
896 };
897
898 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
899 ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
900 let ping_fut = async {
901 loop {
902 ping_interval.tick().await;
903 if let Err(error) = response_proxy_sender
904 .send(ClusterSectorPlottingProgress::Ping)
905 .await
906 {
907 warn!(%error, "Failed to send plotting ping");
908 return;
909 }
910 }
911 };
912
913 select! {
914 _ = progress_proxy_fut.fuse() => {
915 }
917 _ = ping_fut.fuse() => {
918 unreachable!("Ping loop never ends");
919 }
920 }
921
922 info!("Finished plotting sector successfully");
923 };
924
925 inner_fut
926 .instrument(info_span!("", %public_key, %sector_index))
927 .await;
928}
929
930async fn send_publish_progress(
931 response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
932 progress: SectorPlottingProgress,
933 approximate_max_message_size: usize,
934) {
935 let cluster_progress = match progress {
937 SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
938 SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
939 SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
940 SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
941 SectorPlottingProgress::Finished {
942 plotted_sector,
943 time,
944 mut sector,
945 } => {
946 if let Err(error) = response_sender
947 .send(ClusterSectorPlottingProgress::Finished {
948 plotted_sector,
949 time,
950 })
951 .await
952 {
953 warn!(%error, "Failed to send plotting progress");
954 return;
955 }
956
957 while let Some(maybe_sector_chunk) = sector.next().await {
958 match maybe_sector_chunk {
959 Ok(sector_chunk) => {
960 for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
962 {
963 if let Err(error) = response_sender
964 .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
965 sector_chunk.slice_ref(small_sector_chunk)
966 )))
967 .await
968 {
969 warn!(%error, "Failed to send plotting progress");
970 return;
971 }
972 }
973 }
974 Err(error) => {
975 if let Err(error) = response_sender
976 .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
977 .await
978 {
979 warn!(%error, "Failed to send plotting progress");
980 return;
981 }
982 }
983 }
984 }
985
986 return;
987 }
988 SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
989 };
990
991 if let Err(error) = response_sender.send(cluster_progress).await {
992 warn!(%error, "Failed to send plotting progress");
993 }
994}