Skip to main content

ab_farmer/cluster/
plotter.rs

1//! Farming cluster plotter
2//!
3//! Plotter is responsible for plotting sectors in response to farmer requests.
4//!
5//! This module exposes some data structures for NATS communication, custom plotter
6//! implementation designed to work with cluster plotter and a service function to drive the backend
7//! part of the plotter.
8
9use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// How long a plotter without free capacity waits before checking its capacity again
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings sent from plotter server to client while a sector is being plotted
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Timeout after which a plotter that doesn't send pings is assumed to be down
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
48/// Type alias used for event handlers
49pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
50type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
52/// An ephemeral identifier for a plotter
53#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
54pub enum ClusterPlotterId {
55    /// Plotter ID
56    Ulid(Ulid),
57}
58
59#[expect(clippy::new_without_default)]
60impl ClusterPlotterId {
61    /// Creates new ID
62    pub fn new() -> Self {
63        Self::Ulid(Ulid::new())
64    }
65}
66
67/// Request for free plotter instance
68#[derive(Debug, Clone, Encode, Decode)]
69struct ClusterPlotterFreeInstanceRequest;
70
71impl GenericRequest for ClusterPlotterFreeInstanceRequest {
72    const SUBJECT: &'static str = "ab.plotter.free-instance";
73    /// Might be `None` if instance had to respond, but turned out it was fully occupied already
74    type Response = Option<String>;
75}
76
77#[derive(Debug, Encode, Decode)]
78enum ClusterSectorPlottingProgress {
79    /// Plotter is already fully occupied with other work
80    Occupied,
81    /// Periodic ping indicating plotter is still busy
82    Ping,
83    /// Downloading sector pieces
84    Downloading,
85    /// Downloaded sector pieces
86    Downloaded(Duration),
87    /// Encoding sector pieces
88    Encoding,
89    /// Encoded sector pieces
90    Encoded(Duration),
91    /// Finished plotting, followed by a series of sector chunks
92    Finished {
93        /// Information about plotted sector
94        plotted_sector: PlottedSector,
95        /// How much time it took to plot a sector
96        time: Duration,
97    },
98    /// Sector chunk after finished plotting
99    SectorChunk(Result<Bytes, String>),
100    /// Plotting failed
101    Error {
102        /// Error message
103        error: String,
104    },
105}
106
107/// Request to plot sector from plotter
108#[derive(Debug, Clone, Encode, Decode)]
109struct ClusterPlotterPlotSectorRequest {
110    public_key: Ed25519PublicKey,
111    shard_commitments_root: ShardCommitmentHash,
112    sector_index: SectorIndex,
113    farmer_protocol_info: FarmerProtocolInfo,
114    pieces_in_sector: u16,
115}
116
117impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
118    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
119    type Response = ClusterSectorPlottingProgress;
120}
121
122#[derive(Default, Debug)]
123struct Handlers {
124    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
125}
126
127/// Cluster plotter
128#[derive(Debug)]
129pub struct ClusterPlotter {
130    sector_encoding_semaphore: Arc<Semaphore>,
131    retry_backoff_policy: ExponentialBuilder,
132    nats_client: NatsClient,
133    handlers: Arc<Handlers>,
134    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
135    _background_tasks: AsyncJoinOnDrop<()>,
136}
137
138impl Drop for ClusterPlotter {
139    #[inline]
140    fn drop(&mut self) {
141        self.tasks_sender.close_channel();
142    }
143}
144
145#[async_trait]
146impl Plotter for ClusterPlotter {
147    async fn has_free_capacity(&self) -> Result<bool, String> {
148        Ok(self.sector_encoding_semaphore.available_permits() > 0
149            && self
150                .nats_client
151                .request(&ClusterPlotterFreeInstanceRequest, None)
152                .await
153                .map_err(|error| error.to_string())?
154                .is_some())
155    }
156
157    async fn plot_sector(
158        &self,
159        public_key: Ed25519PublicKey,
160        shard_commitments_root: ShardCommitmentHash,
161        sector_index: SectorIndex,
162        farmer_protocol_info: FarmerProtocolInfo,
163        pieces_in_sector: u16,
164        _replotting: bool,
165        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
166    ) {
167        let start = Instant::now();
168
169        // Done outside the future below as a backpressure, ensuring that it is not possible to
170        // schedule unbounded number of plotting tasks
171        let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
172            .acquire_owned()
173            .await
174        {
175            Ok(sector_encoding_permit) => sector_encoding_permit,
176            Err(error) => {
177                warn!(%error, "Failed to acquire sector encoding permit");
178
179                let progress_updater = ProgressUpdater {
180                    public_key,
181                    sector_index,
182                    handlers: Arc::clone(&self.handlers),
183                };
184
185                progress_updater
186                    .update_progress_and_events(
187                        &mut progress_sender,
188                        SectorPlottingProgress::Error {
189                            error: format!("Failed to acquire sector encoding permit: {error}"),
190                        },
191                    )
192                    .await;
193
194                return;
195            }
196        };
197
198        self.plot_sector_internal(
199            start,
200            sector_encoding_permit,
201            public_key,
202            shard_commitments_root,
203            sector_index,
204            farmer_protocol_info,
205            pieces_in_sector,
206            progress_sender,
207        )
208        .await;
209    }
210
211    async fn try_plot_sector(
212        &self,
213        public_key: Ed25519PublicKey,
214        shard_commitments_root: ShardCommitmentHash,
215        sector_index: SectorIndex,
216        farmer_protocol_info: FarmerProtocolInfo,
217        pieces_in_sector: u16,
218        _replotting: bool,
219        progress_sender: mpsc::Sender<SectorPlottingProgress>,
220    ) -> bool {
221        let start = Instant::now();
222
223        let Ok(sector_encoding_permit) =
224            Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
225        else {
226            return false;
227        };
228
229        self.plot_sector_internal(
230            start,
231            sector_encoding_permit,
232            public_key,
233            shard_commitments_root,
234            sector_index,
235            farmer_protocol_info,
236            pieces_in_sector,
237            progress_sender,
238        )
239        .await;
240
241        true
242    }
243}
244
245impl ClusterPlotter {
246    /// Create a new instance
247    pub fn new(
248        nats_client: NatsClient,
249        sector_encoding_concurrency: NonZeroUsize,
250        retry_backoff_policy: ExponentialBuilder,
251    ) -> Self {
252        let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
253
254        let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
255
256        // Basically runs plotting tasks in the background and allows to abort on drop
257        let background_tasks = AsyncJoinOnDrop::new(
258            tokio::spawn(async move {
259                let background_tasks = FuturesUnordered::new();
260                let mut background_tasks = pin!(background_tasks);
261                // Just so that `FuturesUnordered` will never end
262                background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
263
264                loop {
265                    select! {
266                        maybe_background_task = tasks_receiver.next().fuse() => {
267                            let Some(background_task) = maybe_background_task else {
268                                break;
269                            };
270
271                            background_tasks.push(background_task);
272                        },
273                        _ = background_tasks.select_next_some() => {
274                            // Nothing to do
275                        }
276                    }
277                }
278            }),
279            true,
280        );
281
282        Self {
283            sector_encoding_semaphore,
284            retry_backoff_policy,
285            nats_client,
286            handlers: Arc::default(),
287            tasks_sender,
288            _background_tasks: background_tasks,
289        }
290    }
291
292    /// Subscribe to plotting progress notifications
293    pub fn on_plotting_progress(
294        &self,
295        callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
296    ) -> HandlerId {
297        self.handlers.plotting_progress.add(callback)
298    }
299
300    #[expect(clippy::too_many_arguments)]
301    async fn plot_sector_internal<PS>(
302        &self,
303        start: Instant,
304        sector_encoding_permit: OwnedSemaphorePermit,
305        public_key: Ed25519PublicKey,
306        shard_commitments_root: ShardCommitmentHash,
307        sector_index: SectorIndex,
308        farmer_protocol_info: FarmerProtocolInfo,
309        pieces_in_sector: u16,
310        mut progress_sender: PS,
311    ) where
312        PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
313        PS::Error: Error,
314    {
315        trace!("Starting plotting, getting plotting permit");
316
317        let progress_updater = ProgressUpdater {
318            public_key,
319            sector_index,
320            handlers: Arc::clone(&self.handlers),
321        };
322
323        let mut retry_backoff_policy = self.retry_backoff_policy.build();
324
325        // Try to get plotter instance here first as a backpressure measure
326        let free_plotter_instance_fut = get_free_plotter_instance(
327            &self.nats_client,
328            &progress_updater,
329            &mut progress_sender,
330            &mut retry_backoff_policy,
331        );
332        let mut maybe_free_instance = free_plotter_instance_fut.await;
333        if maybe_free_instance.is_none() {
334            return;
335        }
336
337        trace!("Got plotting permit #1");
338
339        let nats_client = self.nats_client.clone();
340
341        let plotting_fut = async move {
342            'outer: loop {
343                // Take free instance that was found earlier if available or try to find a new one
344                let free_instance = if let Some(free_instance) = maybe_free_instance.take() {
345                    free_instance
346                } else {
347                    let free_plotter_instance_fut = get_free_plotter_instance(
348                        &nats_client,
349                        &progress_updater,
350                        &mut progress_sender,
351                        &mut retry_backoff_policy,
352                    );
353                    let Some(free_instance) = free_plotter_instance_fut.await else {
354                        break;
355                    };
356                    trace!("Got plotting permit #2");
357                    free_instance
358                };
359
360                let response_stream_result = nats_client
361                    .stream_request(
362                        &ClusterPlotterPlotSectorRequest {
363                            public_key,
364                            shard_commitments_root,
365                            sector_index,
366                            farmer_protocol_info,
367                            pieces_in_sector,
368                        },
369                        Some(&free_instance),
370                    )
371                    .await;
372                trace!("Subscribed to plotting notifications");
373
374                let mut response_stream = match response_stream_result {
375                    Ok(response_stream) => response_stream,
376                    Err(error) => {
377                        progress_updater
378                            .update_progress_and_events(
379                                &mut progress_sender,
380                                SectorPlottingProgress::Error {
381                                    error: format!("Failed make stream request: {error}"),
382                                },
383                            )
384                            .await;
385
386                        break;
387                    }
388                };
389
390                // Allow to buffer up to the whole sector in memory to not block plotter on the
391                // other side
392                let (mut sector_sender, sector_receiver) = mpsc::channel(
393                    (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
394                        .max(1),
395                );
396                let mut maybe_sector_receiver = Some(sector_receiver);
397                loop {
398                    match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
399                        Ok(Some(response)) => {
400                            match process_response_notification(
401                                &start,
402                                &free_instance,
403                                &progress_updater,
404                                &mut progress_sender,
405                                &mut retry_backoff_policy,
406                                response,
407                                &mut sector_sender,
408                                &mut maybe_sector_receiver,
409                            )
410                            .await
411                            {
412                                ResponseProcessingResult::Retry => {
413                                    debug!("Retrying");
414                                    continue 'outer;
415                                }
416                                ResponseProcessingResult::Abort => {
417                                    debug!("Aborting");
418                                    break 'outer;
419                                }
420                                ResponseProcessingResult::Continue => {
421                                    trace!("Continue");
422                                    // Nothing to do
423                                }
424                            }
425                        }
426                        Ok(None) => {
427                            trace!("Plotting done");
428                            break;
429                        }
430                        Err(_error) => {
431                            progress_updater
432                                .update_progress_and_events(
433                                    &mut progress_sender,
434                                    SectorPlottingProgress::Error {
435                                        error: "Timed out without ping from plotter".to_string(),
436                                    },
437                                )
438                                .await;
439                            break;
440                        }
441                    }
442                }
443
444                break;
445            }
446
447            drop(sector_encoding_permit);
448        };
449
450        let plotting_task =
451            AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
452        if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
453            warn!(%error, "Failed to send plotting task");
454
455            let progress = SectorPlottingProgress::Error {
456                error: format!("Failed to send plotting task: {error}"),
457            };
458
459            self.handlers
460                .plotting_progress
461                .call_simple(&public_key, &sector_index, &progress);
462        }
463    }
464}
465
466// Try to get free plotter instance and return `None` if it is not possible
467async fn get_free_plotter_instance<PS>(
468    nats_client: &NatsClient,
469    progress_updater: &ProgressUpdater,
470    progress_sender: &mut PS,
471    retry_backoff_policy: &mut ExponentialBackoff,
472) -> Option<String>
473where
474    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
475    PS::Error: Error,
476{
477    loop {
478        match nats_client
479            .request(&ClusterPlotterFreeInstanceRequest, None)
480            .await
481        {
482            Ok(Some(free_instance)) => {
483                return Some(free_instance);
484            }
485            Ok(None) => {
486                let Some(delay) = retry_backoff_policy.next() else {
487                    progress_updater
488                        .update_progress_and_events(
489                            progress_sender,
490                            SectorPlottingProgress::Error {
491                                error: "Instance was occupied, exiting #1".to_string(),
492                            },
493                        )
494                        .await;
495                    return None;
496                };
497
498                debug!("Instance was occupied, retrying #1");
499
500                tokio::time::sleep(delay).await;
501            }
502            Err(error) => match error.kind() {
503                RequestErrorKind::TimedOut => {
504                    let Some(delay) = retry_backoff_policy.next() else {
505                        progress_updater
506                            .update_progress_and_events(
507                                progress_sender,
508                                SectorPlottingProgress::Error {
509                                    error: "Plotter request timed out, exiting".to_string(),
510                                },
511                            )
512                            .await;
513                        return None;
514                    };
515
516                    debug!("Plotter request timed out, retrying");
517
518                    tokio::time::sleep(delay).await;
519                }
520                RequestErrorKind::NoResponders => {
521                    let Some(delay) = retry_backoff_policy.next() else {
522                        progress_updater
523                            .update_progress_and_events(
524                                progress_sender,
525                                SectorPlottingProgress::Error {
526                                    error: "No plotters, exiting".to_string(),
527                                },
528                            )
529                            .await;
530                        return None;
531                    };
532
533                    debug!("No plotters, retrying");
534
535                    tokio::time::sleep(delay).await;
536                }
537                RequestErrorKind::InvalidSubject
538                | RequestErrorKind::MaxPayloadExceeded
539                | RequestErrorKind::Other => {
540                    progress_updater
541                        .update_progress_and_events(
542                            progress_sender,
543                            SectorPlottingProgress::Error {
544                                error: format!("Failed to get free plotter instance: {error}"),
545                            },
546                        )
547                        .await;
548                    return None;
549                }
550            },
551        }
552    }
553}
554
/// What the plotting loop should do after a single response notification was processed
enum ResponseProcessingResult {
    /// Start over with a (possibly different) free plotter instance
    Retry,
    /// Stop plotting this sector
    Abort,
    /// Keep reading notifications from the current response stream
    Continue,
}
560
561#[expect(clippy::too_many_arguments)]
562async fn process_response_notification<PS>(
563    start: &Instant,
564    free_instance: &str,
565    progress_updater: &ProgressUpdater,
566    progress_sender: &mut PS,
567    retry_backoff_policy: &mut ExponentialBackoff,
568    response: ClusterSectorPlottingProgress,
569    sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
570    maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
571) -> ResponseProcessingResult
572where
573    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
574    PS::Error: Error,
575{
576    if matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
577        trace!("Processing plotting response notification (sector chunk)");
578    } else {
579        trace!(?response, "Processing plotting response notification");
580    }
581
582    match response {
583        ClusterSectorPlottingProgress::Occupied => {
584            debug!(%free_instance, "Instance was occupied, retrying #2");
585
586            return if let Some(delay) = retry_backoff_policy.next() {
587                debug!("Instance was occupied, retrying #2");
588
589                tokio::time::sleep(delay).await;
590                ResponseProcessingResult::Retry
591            } else {
592                debug!("Instance was occupied, exiting #2");
593                ResponseProcessingResult::Abort
594            };
595        }
596        ClusterSectorPlottingProgress::Ping => {
597            // Expected
598        }
599        ClusterSectorPlottingProgress::Downloading => {
600            if !progress_updater
601                .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
602                .await
603            {
604                return ResponseProcessingResult::Abort;
605            }
606        }
607        ClusterSectorPlottingProgress::Downloaded(time) => {
608            if !progress_updater
609                .update_progress_and_events(
610                    progress_sender,
611                    SectorPlottingProgress::Downloaded(time),
612                )
613                .await
614            {
615                return ResponseProcessingResult::Abort;
616            }
617        }
618        ClusterSectorPlottingProgress::Encoding => {
619            if !progress_updater
620                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
621                .await
622            {
623                return ResponseProcessingResult::Abort;
624            }
625        }
626        ClusterSectorPlottingProgress::Encoded(time) => {
627            if !progress_updater
628                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
629                .await
630            {
631                return ResponseProcessingResult::Abort;
632            }
633        }
634        ClusterSectorPlottingProgress::Finished {
635            plotted_sector,
636            time: _,
637        } => {
638            let Some(sector_receiver) = maybe_sector_receiver.take() else {
639                debug!("Unexpected duplicated sector plotting progress Finished");
640
641                progress_updater
642                    .update_progress_and_events(
643                        progress_sender,
644                        SectorPlottingProgress::Error {
645                            error: "Unexpected duplicated sector plotting progress Finished"
646                                .to_string(),
647                        },
648                    )
649                    .await;
650                return ResponseProcessingResult::Abort;
651            };
652
653            let progress = SectorPlottingProgress::Finished {
654                plotted_sector,
655                // Use local time instead of reported by remote plotter
656                time: start.elapsed(),
657                sector: Box::pin(sector_receiver),
658            };
659            if !progress_updater
660                .update_progress_and_events(progress_sender, progress)
661                .await
662            {
663                return ResponseProcessingResult::Abort;
664            }
665
666            return ResponseProcessingResult::Continue;
667        }
668        // This variant must be sent after Finished and it handled above
669        ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
670            if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
671                warn!(%error, "Failed to send sector chunk");
672                return ResponseProcessingResult::Abort;
673            }
674            return ResponseProcessingResult::Continue;
675        }
676        ClusterSectorPlottingProgress::Error { error } => {
677            if !progress_updater
678                .update_progress_and_events(
679                    progress_sender,
680                    SectorPlottingProgress::Error { error },
681                )
682                .await
683            {
684                return ResponseProcessingResult::Abort;
685            }
686        }
687    }
688
689    ResponseProcessingResult::Continue
690}
691
692struct ProgressUpdater {
693    public_key: Ed25519PublicKey,
694    sector_index: SectorIndex,
695    handlers: Arc<Handlers>,
696}
697
698impl ProgressUpdater {
699    /// Returns `true` on success and `false` if progress receiver channel is gone
700    async fn update_progress_and_events<PS>(
701        &self,
702        progress_sender: &mut PS,
703        progress: SectorPlottingProgress,
704    ) -> bool
705    where
706        PS: Sink<SectorPlottingProgress> + Unpin,
707        PS::Error: Error,
708    {
709        self.handlers.plotting_progress.call_simple(
710            &self.public_key,
711            &self.sector_index,
712            &progress,
713        );
714
715        if let Err(error) = progress_sender.send(progress).await {
716            warn!(%error, "Failed to send error progress update");
717
718            false
719        } else {
720            true
721        }
722    }
723}
724
725/// Create plotter service that will be processing incoming requests.
726///
727/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
728/// per controller instance in order to parallelize more work across threads if needed.
729pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
730where
731    P: Plotter + Sync,
732{
733    let plotter_id = ClusterPlotterId::new();
734
735    select! {
736        result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
737            result
738        }
739        result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
740            result
741        }
742    }
743}
744
745async fn free_instance_responder<P>(
746    plotter_id: &ClusterPlotterId,
747    nats_client: &NatsClient,
748    plotter: &P,
749) -> anyhow::Result<()>
750where
751    P: Plotter + Sync,
752{
753    loop {
754        while !plotter.has_free_capacity().await.unwrap_or_default() {
755            tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
756        }
757
758        let mut subscription = nats_client
759            .queue_subscribe(
760                ClusterPlotterFreeInstanceRequest::SUBJECT,
761                "ab.plotter".to_string(),
762            )
763            .await
764            .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
765        debug!(?subscription, "Free instance subscription");
766
767        while let Some(message) = subscription.next().await {
768            let Some(reply_subject) = message.reply else {
769                continue;
770            };
771
772            debug!(%reply_subject, "Free instance request");
773
774            let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
775            let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
776                has_free_capacity.then(|| plotter_id.to_string());
777
778            if let Err(error) = nats_client
779                .publish(reply_subject, response.encode().into())
780                .await
781            {
782                warn!(%error, "Failed to send free instance response");
783            }
784
785            if !has_free_capacity {
786                subscription.unsubscribe().await.map_err(|error| {
787                    anyhow!("Failed to unsubscribe from free instance requests: {error}")
788                })?;
789            }
790        }
791    }
792}
793
794async fn plot_sector_responder<P>(
795    plotter_id: &ClusterPlotterId,
796    nats_client: &NatsClient,
797    plotter: &P,
798) -> anyhow::Result<()>
799where
800    P: Plotter + Sync,
801{
802    let plotter_id_string = plotter_id.to_string();
803
804    nats_client
805        .stream_request_responder(
806            Some(&plotter_id_string),
807            Some(plotter_id_string.clone()),
808            |request| async move {
809                let (progress_sender, mut progress_receiver) = mpsc::channel(10);
810
811                let fut =
812                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
813                let mut fut = Box::pin(fut.fuse());
814
815                Some(
816                    // Drive above future and stream back any pieces that were downloaded so far
817                    stream::poll_fn(move |cx| {
818                        if !fut.is_terminated() {
819                            // Result doesn't matter, we'll need to poll stream below anyway
820                            let _: Poll<()> = fut.poll_unpin(cx);
821                        }
822
823                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
824                            return Poll::Ready(maybe_result);
825                        }
826
827                        // Exit will be done by the stream above
828                        Poll::Pending
829                    }),
830                )
831            },
832        )
833        .await
834}
835
836async fn process_plot_sector_request<P>(
837    nats_client: &NatsClient,
838    plotter: &P,
839    request: ClusterPlotterPlotSectorRequest,
840    mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
841) where
842    P: Plotter,
843{
844    let ClusterPlotterPlotSectorRequest {
845        public_key,
846        shard_commitments_root,
847        sector_index,
848        farmer_protocol_info,
849        pieces_in_sector,
850    } = request;
851
852    // Wrapper future just for instrumentation below
853    let inner_fut = async {
854        info!("Plot sector request");
855
856        let (progress_sender, mut progress_receiver) = mpsc::channel(1);
857
858        if !plotter
859            .try_plot_sector(
860                public_key,
861                shard_commitments_root,
862                sector_index,
863                farmer_protocol_info,
864                pieces_in_sector,
865                false,
866                progress_sender,
867            )
868            .await
869        {
870            debug!("Plotter is currently occupied and can't plot more sectors");
871
872            if let Err(error) = response_proxy_sender
873                .send(ClusterSectorPlottingProgress::Occupied)
874                .await
875            {
876                warn!(%error, "Failed to send plotting progress");
877                return;
878            }
879            return;
880        }
881
882        let progress_proxy_fut = {
883            let mut response_proxy_sender = response_proxy_sender.clone();
884            let approximate_max_message_size = nats_client.approximate_max_message_size();
885
886            async move {
887                while let Some(progress) = progress_receiver.next().await {
888                    send_publish_progress(
889                        &mut response_proxy_sender,
890                        progress,
891                        approximate_max_message_size,
892                    )
893                    .await;
894                }
895            }
896        };
897
898        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
899        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
900        let ping_fut = async {
901            loop {
902                ping_interval.tick().await;
903                if let Err(error) = response_proxy_sender
904                    .send(ClusterSectorPlottingProgress::Ping)
905                    .await
906                {
907                    warn!(%error, "Failed to send plotting ping");
908                    return;
909                }
910            }
911        };
912
913        select! {
914            _ = progress_proxy_fut.fuse() => {
915                // Done
916            }
917            _ = ping_fut.fuse() => {
918                unreachable!("Ping loop never ends");
919            }
920        }
921
922        info!("Finished plotting sector successfully");
923    };
924
925    inner_fut
926        .instrument(info_span!("", %public_key, %sector_index))
927        .await;
928}
929
930async fn send_publish_progress(
931    response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
932    progress: SectorPlottingProgress,
933    approximate_max_message_size: usize,
934) {
935    // Finished response is large and needs special care
936    let cluster_progress = match progress {
937        SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
938        SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
939        SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
940        SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
941        SectorPlottingProgress::Finished {
942            plotted_sector,
943            time,
944            mut sector,
945        } => {
946            if let Err(error) = response_sender
947                .send(ClusterSectorPlottingProgress::Finished {
948                    plotted_sector,
949                    time,
950                })
951                .await
952            {
953                warn!(%error, "Failed to send plotting progress");
954                return;
955            }
956
957            while let Some(maybe_sector_chunk) = sector.next().await {
958                match maybe_sector_chunk {
959                    Ok(sector_chunk) => {
960                        // Slice large chunks into smaller ones before publishing
961                        for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
962                        {
963                            if let Err(error) = response_sender
964                                .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
965                                    sector_chunk.slice_ref(small_sector_chunk)
966                                )))
967                                .await
968                            {
969                                warn!(%error, "Failed to send plotting progress");
970                                return;
971                            }
972                        }
973                    }
974                    Err(error) => {
975                        if let Err(error) = response_sender
976                            .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
977                            .await
978                        {
979                            warn!(%error, "Failed to send plotting progress");
980                            return;
981                        }
982                    }
983                }
984            }
985
986            return;
987        }
988        SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
989    };
990
991    if let Err(error) = response_sender.send(cluster_progress).await {
992        warn!(%error, "Failed to send plotting progress");
993    }
994}