1use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// Delay between local capacity checks while waiting before (re)subscribing to free instance
/// requests
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// How often a plotter sends a ping to the farmer while proxying plotting progress
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How long the farmer waits for the next notification (pings included) from a plotter before
/// giving up on it
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
/// Shared callback that receives three borrowed arguments
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// Collection of [`HandlerFn3`] callbacks (registered with `add`, invoked with `call_simple`)
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
/// Identifier of a plotter instance in the cluster.
///
/// Its `Display` form is what a plotter returns in free instance responses and what it uses to
/// receive plot sector requests addressed to it.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Plotter ID backed by a ULID
    Ulid(Ulid),
}

#[expect(clippy::new_without_default)]
impl ClusterPlotterId {
    /// Create a new plotter ID from a freshly generated ULID
    pub fn new() -> Self {
        Self::Ulid(Ulid::new())
    }
}
66
/// Request sent by the farmer side to find a plotter instance with free capacity; plotters receive
/// it through a queue subscription
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;

impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    const SUBJECT: &'static str = "ab.plotter.free-instance";
    /// ID of the responding plotter instance if it has free capacity, `None` otherwise
    type Response = Option<String>;
}
76
/// Notification streamed from a plotter instance back to the farmer in response to
/// [`ClusterPlotterPlotSectorRequest`].
///
/// NOTE: this type is SCALE-encoded on the wire, do not reorder variants.
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter instance was occupied and couldn't start plotting the sector
    Occupied,
    /// Keep-alive sent periodically while progress is being proxied
    Ping,
    /// Mirrors [`SectorPlottingProgress::Downloading`]
    Downloading,
    /// Mirrors [`SectorPlottingProgress::Downloaded`]
    Downloaded(Duration),
    /// Mirrors [`SectorPlottingProgress::Encoding`]
    Encoding,
    /// Mirrors [`SectorPlottingProgress::Encoded`]
    Encoded(Duration),
    /// Sector is plotted, its contents follow as [`Self::SectorChunk`] notifications
    Finished {
        /// Plotted sector metadata
        plotted_sector: PlottedSector,
        /// Time reported by the plotter (the farmer side reports its own measurement instead)
        time: Duration,
    },
    /// Chunk of sector contents or an error produced while reading them
    SectorChunk(Result<Bytes, String>),
    /// Plotting failed
    Error {
        /// Error message
        error: String,
    },
}
106
/// Request to plot a sector, sent by the farmer side to a specific free plotter instance.
///
/// NOTE: this type is SCALE-encoded on the wire, do not reorder fields.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    public_key: Ed25519PublicKey,
    shard_commitments_root: ShardCommitmentHash,
    sector_index: SectorIndex,
    farmer_protocol_info: FarmerProtocolInfo,
    pieces_in_sector: u16,
}

impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the target plotter instance ID by
    // `NatsClient` — confirm
    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
    /// The plotter responds with a stream of progress notifications
    type Response = ClusterSectorPlottingProgress;
}
121
/// Event handlers shared between [`ClusterPlotter`] and its plotting tasks
#[derive(Default, Debug)]
struct Handlers {
    /// Called with the public key and sector index on every plotting progress update
    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
}
126
/// Plotter that delegates sector plotting to remote plotter instances over NATS
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits how many sectors are plotted through this plotter concurrently; a permit is held
    /// until the plotting task of a sector is done
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Used to build a fresh backoff for every sector
    retry_backoff_policy: ExponentialBuilder,
    nats_client: NatsClient,
    handlers: Arc<Handlers>,
    /// Hands spawned plotting tasks over to the background task that holds them
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task holding all spawned plotting tasks
    _background_tasks: AsyncJoinOnDrop<()>,
}

impl Drop for ClusterPlotter {
    #[inline]
    fn drop(&mut self) {
        // Closing the channel makes the background task leave its loop, which drops the plotting
        // tasks it holds
        self.tasks_sender.close_channel();
    }
}
144
145#[async_trait]
146impl Plotter for ClusterPlotter {
147 async fn has_free_capacity(&self) -> Result<bool, String> {
148 Ok(self.sector_encoding_semaphore.available_permits() > 0
149 && self
150 .nats_client
151 .request(&ClusterPlotterFreeInstanceRequest, None)
152 .await
153 .map_err(|error| error.to_string())?
154 .is_some())
155 }
156
157 async fn plot_sector(
158 &self,
159 public_key: Ed25519PublicKey,
160 shard_commitments_root: ShardCommitmentHash,
161 sector_index: SectorIndex,
162 farmer_protocol_info: FarmerProtocolInfo,
163 pieces_in_sector: u16,
164 _replotting: bool,
165 mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
166 ) {
167 let start = Instant::now();
168
169 let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
172 .acquire_owned()
173 .await
174 {
175 Ok(sector_encoding_permit) => sector_encoding_permit,
176 Err(error) => {
177 warn!(%error, "Failed to acquire sector encoding permit");
178
179 let progress_updater = ProgressUpdater {
180 public_key,
181 sector_index,
182 handlers: Arc::clone(&self.handlers),
183 };
184
185 progress_updater
186 .update_progress_and_events(
187 &mut progress_sender,
188 SectorPlottingProgress::Error {
189 error: format!("Failed to acquire sector encoding permit: {error}"),
190 },
191 )
192 .await;
193
194 return;
195 }
196 };
197
198 self.plot_sector_internal(
199 start,
200 sector_encoding_permit,
201 public_key,
202 shard_commitments_root,
203 sector_index,
204 farmer_protocol_info,
205 pieces_in_sector,
206 progress_sender,
207 )
208 .await
209 }
210
211 async fn try_plot_sector(
212 &self,
213 public_key: Ed25519PublicKey,
214 shard_commitments_root: ShardCommitmentHash,
215 sector_index: SectorIndex,
216 farmer_protocol_info: FarmerProtocolInfo,
217 pieces_in_sector: u16,
218 _replotting: bool,
219 progress_sender: mpsc::Sender<SectorPlottingProgress>,
220 ) -> bool {
221 let start = Instant::now();
222
223 let Ok(sector_encoding_permit) =
224 Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
225 else {
226 return false;
227 };
228
229 self.plot_sector_internal(
230 start,
231 sector_encoding_permit,
232 public_key,
233 shard_commitments_root,
234 sector_index,
235 farmer_protocol_info,
236 pieces_in_sector,
237 progress_sender,
238 )
239 .await;
240
241 true
242 }
243}
244
245impl ClusterPlotter {
246 pub fn new(
248 nats_client: NatsClient,
249 sector_encoding_concurrency: NonZeroUsize,
250 retry_backoff_policy: ExponentialBuilder,
251 ) -> Self {
252 let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
253
254 let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
255
256 let background_tasks = AsyncJoinOnDrop::new(
258 tokio::spawn(async move {
259 let background_tasks = FuturesUnordered::new();
260 let mut background_tasks = pin!(background_tasks);
261 background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
263
264 loop {
265 select! {
266 maybe_background_task = tasks_receiver.next().fuse() => {
267 let Some(background_task) = maybe_background_task else {
268 break;
269 };
270
271 background_tasks.push(background_task);
272 },
273 _ = background_tasks.select_next_some() => {
274 }
276 }
277 }
278 }),
279 true,
280 );
281
282 Self {
283 sector_encoding_semaphore,
284 retry_backoff_policy,
285 nats_client,
286 handlers: Arc::default(),
287 tasks_sender,
288 _background_tasks: background_tasks,
289 }
290 }
291
292 pub fn on_plotting_progress(
294 &self,
295 callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
296 ) -> HandlerId {
297 self.handlers.plotting_progress.add(callback)
298 }
299
300 #[expect(clippy::too_many_arguments)]
301 async fn plot_sector_internal<PS>(
302 &self,
303 start: Instant,
304 sector_encoding_permit: OwnedSemaphorePermit,
305 public_key: Ed25519PublicKey,
306 shard_commitments_root: ShardCommitmentHash,
307 sector_index: SectorIndex,
308 farmer_protocol_info: FarmerProtocolInfo,
309 pieces_in_sector: u16,
310 mut progress_sender: PS,
311 ) where
312 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
313 PS::Error: Error,
314 {
315 trace!("Starting plotting, getting plotting permit");
316
317 let progress_updater = ProgressUpdater {
318 public_key,
319 sector_index,
320 handlers: Arc::clone(&self.handlers),
321 };
322
323 let mut retry_backoff_policy = self.retry_backoff_policy.build();
324
325 let free_plotter_instance_fut = get_free_plotter_instance(
327 &self.nats_client,
328 &progress_updater,
329 &mut progress_sender,
330 &mut retry_backoff_policy,
331 );
332 let mut maybe_free_instance = free_plotter_instance_fut.await;
333 if maybe_free_instance.is_none() {
334 return;
335 }
336
337 trace!("Got plotting permit #1");
338
339 let nats_client = self.nats_client.clone();
340
341 let plotting_fut = async move {
342 'outer: loop {
343 let free_instance = match maybe_free_instance.take() {
345 Some(free_instance) => free_instance,
346 None => {
347 let free_plotter_instance_fut = get_free_plotter_instance(
348 &nats_client,
349 &progress_updater,
350 &mut progress_sender,
351 &mut retry_backoff_policy,
352 );
353 let Some(free_instance) = free_plotter_instance_fut.await else {
354 break;
355 };
356 trace!("Got plotting permit #2");
357 free_instance
358 }
359 };
360
361 let response_stream_result = nats_client
362 .stream_request(
363 &ClusterPlotterPlotSectorRequest {
364 public_key,
365 shard_commitments_root,
366 sector_index,
367 farmer_protocol_info,
368 pieces_in_sector,
369 },
370 Some(&free_instance),
371 )
372 .await;
373 trace!("Subscribed to plotting notifications");
374
375 let mut response_stream = match response_stream_result {
376 Ok(response_stream) => response_stream,
377 Err(error) => {
378 progress_updater
379 .update_progress_and_events(
380 &mut progress_sender,
381 SectorPlottingProgress::Error {
382 error: format!("Failed make stream request: {error}"),
383 },
384 )
385 .await;
386
387 break;
388 }
389 };
390
391 let (mut sector_sender, sector_receiver) = mpsc::channel(
394 (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
395 .max(1),
396 );
397 let mut maybe_sector_receiver = Some(sector_receiver);
398 loop {
399 match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
400 Ok(Some(response)) => {
401 match process_response_notification(
402 &start,
403 &free_instance,
404 &progress_updater,
405 &mut progress_sender,
406 &mut retry_backoff_policy,
407 response,
408 &mut sector_sender,
409 &mut maybe_sector_receiver,
410 )
411 .await
412 {
413 ResponseProcessingResult::Retry => {
414 debug!("Retrying");
415 continue 'outer;
416 }
417 ResponseProcessingResult::Abort => {
418 debug!("Aborting");
419 break 'outer;
420 }
421 ResponseProcessingResult::Continue => {
422 trace!("Continue");
423 }
425 }
426 }
427 Ok(None) => {
428 trace!("Plotting done");
429 break;
430 }
431 Err(_error) => {
432 progress_updater
433 .update_progress_and_events(
434 &mut progress_sender,
435 SectorPlottingProgress::Error {
436 error: "Timed out without ping from plotter".to_string(),
437 },
438 )
439 .await;
440 break;
441 }
442 }
443 }
444
445 break;
446 }
447
448 drop(sector_encoding_permit);
449 };
450
451 let plotting_task =
452 AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
453 if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
454 warn!(%error, "Failed to send plotting task");
455
456 let progress = SectorPlottingProgress::Error {
457 error: format!("Failed to send plotting task: {error}"),
458 };
459
460 self.handlers
461 .plotting_progress
462 .call_simple(&public_key, §or_index, &progress);
463 }
464 }
465}
466
467async fn get_free_plotter_instance<PS>(
469 nats_client: &NatsClient,
470 progress_updater: &ProgressUpdater,
471 progress_sender: &mut PS,
472 retry_backoff_policy: &mut ExponentialBackoff,
473) -> Option<String>
474where
475 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
476 PS::Error: Error,
477{
478 loop {
479 match nats_client
480 .request(&ClusterPlotterFreeInstanceRequest, None)
481 .await
482 {
483 Ok(Some(free_instance)) => {
484 return Some(free_instance);
485 }
486 Ok(None) => {
487 if let Some(delay) = retry_backoff_policy.next() {
488 debug!("Instance was occupied, retrying #1");
489
490 tokio::time::sleep(delay).await;
491 continue;
492 } else {
493 progress_updater
494 .update_progress_and_events(
495 progress_sender,
496 SectorPlottingProgress::Error {
497 error: "Instance was occupied, exiting #1".to_string(),
498 },
499 )
500 .await;
501 return None;
502 }
503 }
504 Err(error) => match error.kind() {
505 RequestErrorKind::TimedOut => {
506 if let Some(delay) = retry_backoff_policy.next() {
507 debug!("Plotter request timed out, retrying");
508
509 tokio::time::sleep(delay).await;
510 continue;
511 } else {
512 progress_updater
513 .update_progress_and_events(
514 progress_sender,
515 SectorPlottingProgress::Error {
516 error: "Plotter request timed out, exiting".to_string(),
517 },
518 )
519 .await;
520 return None;
521 }
522 }
523 RequestErrorKind::NoResponders => {
524 if let Some(delay) = retry_backoff_policy.next() {
525 debug!("No plotters, retrying");
526
527 tokio::time::sleep(delay).await;
528 continue;
529 } else {
530 progress_updater
531 .update_progress_and_events(
532 progress_sender,
533 SectorPlottingProgress::Error {
534 error: "No plotters, exiting".to_string(),
535 },
536 )
537 .await;
538 return None;
539 }
540 }
541 RequestErrorKind::InvalidSubject | RequestErrorKind::Other => {
542 progress_updater
543 .update_progress_and_events(
544 progress_sender,
545 SectorPlottingProgress::Error {
546 error: format!("Failed to get free plotter instance: {error}"),
547 },
548 )
549 .await;
550 return None;
551 }
552 },
553 };
554 }
555}
556
/// What to do after processing a single notification from a plotter instance
enum ResponseProcessingResult {
    /// Keep reading notifications from the same response stream
    Continue,
    /// Start over by requesting a free plotter instance again
    Retry,
    /// Stop plotting this sector
    Abort,
}
562
563#[expect(clippy::too_many_arguments)]
564async fn process_response_notification<PS>(
565 start: &Instant,
566 free_instance: &str,
567 progress_updater: &ProgressUpdater,
568 progress_sender: &mut PS,
569 retry_backoff_policy: &mut ExponentialBackoff,
570 response: ClusterSectorPlottingProgress,
571 sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
572 maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
573) -> ResponseProcessingResult
574where
575 PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
576 PS::Error: Error,
577{
578 if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
579 trace!(?response, "Processing plotting response notification");
580 } else {
581 trace!("Processing plotting response notification (sector chunk)");
582 }
583
584 match response {
585 ClusterSectorPlottingProgress::Occupied => {
586 debug!(%free_instance, "Instance was occupied, retrying #2");
587
588 if let Some(delay) = retry_backoff_policy.next() {
589 debug!("Instance was occupied, retrying #2");
590
591 tokio::time::sleep(delay).await;
592 return ResponseProcessingResult::Retry;
593 } else {
594 debug!("Instance was occupied, exiting #2");
595 return ResponseProcessingResult::Abort;
596 }
597 }
598 ClusterSectorPlottingProgress::Ping => {
599 }
601 ClusterSectorPlottingProgress::Downloading => {
602 if !progress_updater
603 .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
604 .await
605 {
606 return ResponseProcessingResult::Abort;
607 }
608 }
609 ClusterSectorPlottingProgress::Downloaded(time) => {
610 if !progress_updater
611 .update_progress_and_events(
612 progress_sender,
613 SectorPlottingProgress::Downloaded(time),
614 )
615 .await
616 {
617 return ResponseProcessingResult::Abort;
618 }
619 }
620 ClusterSectorPlottingProgress::Encoding => {
621 if !progress_updater
622 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
623 .await
624 {
625 return ResponseProcessingResult::Abort;
626 }
627 }
628 ClusterSectorPlottingProgress::Encoded(time) => {
629 if !progress_updater
630 .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
631 .await
632 {
633 return ResponseProcessingResult::Abort;
634 }
635 }
636 ClusterSectorPlottingProgress::Finished {
637 plotted_sector,
638 time: _,
639 } => {
640 let Some(sector_receiver) = maybe_sector_receiver.take() else {
641 debug!("Unexpected duplicated sector plotting progress Finished");
642
643 progress_updater
644 .update_progress_and_events(
645 progress_sender,
646 SectorPlottingProgress::Error {
647 error: "Unexpected duplicated sector plotting progress Finished"
648 .to_string(),
649 },
650 )
651 .await;
652 return ResponseProcessingResult::Abort;
653 };
654
655 let progress = SectorPlottingProgress::Finished {
656 plotted_sector,
657 time: start.elapsed(),
659 sector: Box::pin(sector_receiver),
660 };
661 if !progress_updater
662 .update_progress_and_events(progress_sender, progress)
663 .await
664 {
665 return ResponseProcessingResult::Abort;
666 }
667
668 return ResponseProcessingResult::Continue;
669 }
670 ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
672 if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
673 warn!(%error, "Failed to send sector chunk");
674 return ResponseProcessingResult::Abort;
675 }
676 return ResponseProcessingResult::Continue;
677 }
678 ClusterSectorPlottingProgress::Error { error } => {
679 if !progress_updater
680 .update_progress_and_events(
681 progress_sender,
682 SectorPlottingProgress::Error { error },
683 )
684 .await
685 {
686 return ResponseProcessingResult::Abort;
687 }
688 }
689 }
690
691 ResponseProcessingResult::Continue
692}
693
694struct ProgressUpdater {
695 public_key: Ed25519PublicKey,
696 sector_index: SectorIndex,
697 handlers: Arc<Handlers>,
698}
699
700impl ProgressUpdater {
701 async fn update_progress_and_events<PS>(
703 &self,
704 progress_sender: &mut PS,
705 progress: SectorPlottingProgress,
706 ) -> bool
707 where
708 PS: Sink<SectorPlottingProgress> + Unpin,
709 PS::Error: Error,
710 {
711 self.handlers.plotting_progress.call_simple(
712 &self.public_key,
713 &self.sector_index,
714 &progress,
715 );
716
717 if let Err(error) = progress_sender.send(progress).await {
718 warn!(%error, "Failed to send error progress update");
719
720 false
721 } else {
722 true
723 }
724 }
725}
726
727pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
732where
733 P: Plotter + Sync,
734{
735 let plotter_id = ClusterPlotterId::new();
736
737 select! {
738 result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
739 result
740 }
741 result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
742 result
743 }
744 }
745}
746
747async fn free_instance_responder<P>(
748 plotter_id: &ClusterPlotterId,
749 nats_client: &NatsClient,
750 plotter: &P,
751) -> anyhow::Result<()>
752where
753 P: Plotter + Sync,
754{
755 loop {
756 while !plotter.has_free_capacity().await.unwrap_or_default() {
757 tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
758 }
759
760 let mut subscription = nats_client
761 .queue_subscribe(
762 ClusterPlotterFreeInstanceRequest::SUBJECT,
763 "ab.plotter".to_string(),
764 )
765 .await
766 .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
767 debug!(?subscription, "Free instance subscription");
768
769 while let Some(message) = subscription.next().await {
770 let Some(reply_subject) = message.reply else {
771 continue;
772 };
773
774 debug!(%reply_subject, "Free instance request");
775
776 let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
777 let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
778 has_free_capacity.then(|| plotter_id.to_string());
779
780 if let Err(error) = nats_client
781 .publish(reply_subject, response.encode().into())
782 .await
783 {
784 warn!(%error, "Failed to send free instance response");
785 }
786
787 if !has_free_capacity {
788 subscription.unsubscribe().await.map_err(|error| {
789 anyhow!("Failed to unsubscribe from free instance requests: {error}")
790 })?;
791 }
792 }
793 }
794}
795
796async fn plot_sector_responder<P>(
797 plotter_id: &ClusterPlotterId,
798 nats_client: &NatsClient,
799 plotter: &P,
800) -> anyhow::Result<()>
801where
802 P: Plotter + Sync,
803{
804 let plotter_id_string = plotter_id.to_string();
805
806 nats_client
807 .stream_request_responder(
808 Some(&plotter_id_string),
809 Some(plotter_id_string.clone()),
810 |request| async move {
811 let (progress_sender, mut progress_receiver) = mpsc::channel(10);
812
813 let fut =
814 process_plot_sector_request(nats_client, plotter, request, progress_sender);
815 let mut fut = Box::pin(fut.fuse());
816
817 Some(
818 stream::poll_fn(move |cx| {
820 if !fut.is_terminated() {
821 let _ = fut.poll_unpin(cx);
823 }
824
825 if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
826 return Poll::Ready(maybe_result);
827 }
828
829 Poll::Pending
831 }),
832 )
833 },
834 )
835 .await
836}
837
838async fn process_plot_sector_request<P>(
839 nats_client: &NatsClient,
840 plotter: &P,
841 request: ClusterPlotterPlotSectorRequest,
842 mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
843) where
844 P: Plotter,
845{
846 let ClusterPlotterPlotSectorRequest {
847 public_key,
848 shard_commitments_root,
849 sector_index,
850 farmer_protocol_info,
851 pieces_in_sector,
852 } = request;
853
854 let inner_fut = async {
856 info!("Plot sector request");
857
858 let (progress_sender, mut progress_receiver) = mpsc::channel(1);
859
860 if !plotter
861 .try_plot_sector(
862 public_key,
863 shard_commitments_root,
864 sector_index,
865 farmer_protocol_info,
866 pieces_in_sector,
867 false,
868 progress_sender,
869 )
870 .await
871 {
872 debug!("Plotter is currently occupied and can't plot more sectors");
873
874 if let Err(error) = response_proxy_sender
875 .send(ClusterSectorPlottingProgress::Occupied)
876 .await
877 {
878 warn!(%error, "Failed to send plotting progress");
879 return;
880 }
881 return;
882 }
883
884 let progress_proxy_fut = {
885 let mut response_proxy_sender = response_proxy_sender.clone();
886 let approximate_max_message_size = nats_client.approximate_max_message_size();
887
888 async move {
889 while let Some(progress) = progress_receiver.next().await {
890 send_publish_progress(
891 &mut response_proxy_sender,
892 progress,
893 approximate_max_message_size,
894 )
895 .await;
896 }
897 }
898 };
899
900 let mut ping_interval = tokio::time::interval(PING_INTERVAL);
901 ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
902 let ping_fut = async {
903 loop {
904 ping_interval.tick().await;
905 if let Err(error) = response_proxy_sender
906 .send(ClusterSectorPlottingProgress::Ping)
907 .await
908 {
909 warn!(%error, "Failed to send plotting ping");
910 return;
911 }
912 }
913 };
914
915 select! {
916 _ = progress_proxy_fut.fuse() => {
917 }
919 _ = ping_fut.fuse() => {
920 unreachable!("Ping loop never ends");
921 }
922 }
923
924 info!("Finished plotting sector successfully");
925 };
926
927 inner_fut
928 .instrument(info_span!("", %public_key, %sector_index))
929 .await
930}
931
932async fn send_publish_progress(
933 response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
934 progress: SectorPlottingProgress,
935 approximate_max_message_size: usize,
936) {
937 let cluster_progress = match progress {
939 SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
940 SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
941 SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
942 SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
943 SectorPlottingProgress::Finished {
944 plotted_sector,
945 time,
946 mut sector,
947 } => {
948 if let Err(error) = response_sender
949 .send(ClusterSectorPlottingProgress::Finished {
950 plotted_sector,
951 time,
952 })
953 .await
954 {
955 warn!(%error, "Failed to send plotting progress");
956 return;
957 }
958
959 while let Some(maybe_sector_chunk) = sector.next().await {
960 match maybe_sector_chunk {
961 Ok(sector_chunk) => {
962 for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
964 {
965 if let Err(error) = response_sender
966 .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
967 sector_chunk.slice_ref(small_sector_chunk)
968 )))
969 .await
970 {
971 warn!(%error, "Failed to send plotting progress");
972 return;
973 }
974 }
975 }
976 Err(error) => {
977 if let Err(error) = response_sender
978 .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
979 .await
980 {
981 warn!(%error, "Failed to send plotting progress");
982 return;
983 }
984 }
985 }
986 }
987
988 return;
989 }
990 SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
991 };
992
993 if let Err(error) = response_sender.send(cluster_progress).await {
994 warn!(%error, "Failed to send plotting progress");
995 }
996}