Skip to main content

ab_farmer/cluster/
plotter.rs

1//! Farming cluster plotter
2//!
3//! Plotter is responsible for plotting sectors in response to farmer requests.
4//!
5//! This module exposes some data structures for NATS communication, custom plotter
6//! implementation designed to work with cluster plotter and a service function to drive the backend
7//! part of the plotter.
8
9use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// Delay between consecutive free capacity checks performed by the free instance responder while
/// the local plotter reports that it has no free capacity
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings from plotter server to client
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Timeout after which a plotter that doesn't send pings (or any other response) is assumed to be
/// down
const PING_TIMEOUT: Duration = Duration::from_mins(1);
47
/// Type alias used for event handlers that receive three arguments by reference
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// Bag of [`HandlerFn3`] callbacks, callbacks are registered with `add()` and invoked with
/// `call_simple()`
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
/// An ephemeral identifier for a plotter
///
/// A new identifier is generated every time [`plotter_service()`] is started, its string
/// representation is what the plotter returns in response to free instance requests.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Plotter ID
    Ulid(Ulid),
}
58
59#[expect(
60    clippy::new_without_default,
61    reason = "Default has different semantics"
62)]
63impl ClusterPlotterId {
64    /// Creates new ID
65    pub fn new() -> Self {
66        Self::Ulid(Ulid::new())
67    }
68}
69
/// Request for free plotter instance
///
/// Carries no payload, the response identifies an instance that is able to accept a plot sector
/// request.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;
73
impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    /// Subject the request is published to
    const SUBJECT: &'static str = "ab.plotter.free-instance";
    /// String representation of the ID of a plotter with free capacity.
    ///
    /// Might be `None` if instance had to respond, but turned out it was fully occupied already
    type Response = Option<String>;
}
79
/// Plotting progress notification streamed back in response to
/// [`ClusterPlotterPlotSectorRequest`].
///
/// NOTE: This type is SCALE-encoded, so the order of variants is part of the wire format.
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter is already fully occupied with other work
    Occupied,
    /// Periodic ping indicating plotter is still busy
    Ping,
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces, contains the duration reported by the plotter
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces, contains the duration reported by the plotter
    Encoded(Duration),
    /// Finished plotting, followed by a series of sector chunks
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Sector chunk after finished plotting, `Err` carries an error message instead of the data
    SectorChunk(Result<Bytes, String>),
    /// Plotting failed
    Error {
        /// Error message
        error: String,
    },
}
109
/// Request to plot sector from plotter
///
/// All fields are passed as is to the plotter implementation on the receiving side.
///
/// NOTE: This type is SCALE-encoded, so the order of fields is part of the wire format.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    /// Public key of the farm the sector is plotted for
    public_key: Ed25519PublicKey,
    /// Root of shard commitments
    shard_commitments_root: ShardCommitmentHash,
    /// Index of the sector to plot
    sector_index: SectorIndex,
    /// Protocol information necessary for plotting
    farmer_protocol_info: FarmerProtocolInfo,
    /// Number of pieces in the sector
    pieces_in_sector: u16,
}
119
impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    /// Subject template for the request.
    ///
    /// NOTE(review): `*` is presumably substituted with the plotter instance passed to
    /// `stream_request()`/`stream_request_responder()` — confirm against `NatsClient`.
    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
    /// Stream of plotting progress notifications
    type Response = ClusterSectorPlottingProgress;
}
124
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Handlers called with public key, sector index and progress on every plotting progress
    /// update
    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
}
129
/// Cluster plotter
///
/// [`Plotter`] implementation that doesn't plot anything itself, but requests plotting from
/// plotter instances over NATS and forwards progress back to the caller.
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits the number of sectors that are being plotted through this instance concurrently
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Policy from which a fresh backoff is built for every plotted sector
    retry_backoff_policy: ExponentialBuilder,
    /// Client used to communicate with plotter instances
    nats_client: NatsClient,
    /// Registered event handlers
    handlers: Arc<Handlers>,
    /// Sender used to hand spawned plotting tasks over to the background task
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that owns and drives spawned plotting tasks, kept here so that it lives as
    /// long as the plotter
    _background_tasks: AsyncJoinOnDrop<()>,
}
140
impl Drop for ClusterPlotter {
    /// Closes the plotting tasks channel when the plotter is dropped
    #[inline]
    fn drop(&mut self) {
        // With the channel closed the background task receives `None` from the receiving side and
        // exits its loop
        self.tasks_sender.close_channel();
    }
}
147
148#[async_trait]
149impl Plotter for ClusterPlotter {
150    async fn has_free_capacity(&self) -> Result<bool, String> {
151        Ok(self.sector_encoding_semaphore.available_permits() > 0
152            && self
153                .nats_client
154                .request(&ClusterPlotterFreeInstanceRequest, None)
155                .await
156                .map_err(|error| error.to_string())?
157                .is_some())
158    }
159
160    async fn plot_sector(
161        &self,
162        public_key: Ed25519PublicKey,
163        shard_commitments_root: ShardCommitmentHash,
164        sector_index: SectorIndex,
165        farmer_protocol_info: FarmerProtocolInfo,
166        pieces_in_sector: u16,
167        _replotting: bool,
168        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
169    ) {
170        let start = Instant::now();
171
172        // Done outside the future below as a backpressure, ensuring that it is not possible to
173        // schedule unbounded number of plotting tasks
174        let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
175            .acquire_owned()
176            .await
177        {
178            Ok(sector_encoding_permit) => sector_encoding_permit,
179            Err(error) => {
180                warn!(%error, "Failed to acquire sector encoding permit");
181
182                let progress_updater = ProgressUpdater {
183                    public_key,
184                    sector_index,
185                    handlers: Arc::clone(&self.handlers),
186                };
187
188                progress_updater
189                    .update_progress_and_events(
190                        &mut progress_sender,
191                        SectorPlottingProgress::Error {
192                            error: format!("Failed to acquire sector encoding permit: {error}"),
193                        },
194                    )
195                    .await;
196
197                return;
198            }
199        };
200
201        self.plot_sector_internal(
202            start,
203            sector_encoding_permit,
204            public_key,
205            shard_commitments_root,
206            sector_index,
207            farmer_protocol_info,
208            pieces_in_sector,
209            progress_sender,
210        )
211        .await;
212    }
213
214    async fn try_plot_sector(
215        &self,
216        public_key: Ed25519PublicKey,
217        shard_commitments_root: ShardCommitmentHash,
218        sector_index: SectorIndex,
219        farmer_protocol_info: FarmerProtocolInfo,
220        pieces_in_sector: u16,
221        _replotting: bool,
222        progress_sender: mpsc::Sender<SectorPlottingProgress>,
223    ) -> bool {
224        let start = Instant::now();
225
226        let Ok(sector_encoding_permit) =
227            Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
228        else {
229            return false;
230        };
231
232        self.plot_sector_internal(
233            start,
234            sector_encoding_permit,
235            public_key,
236            shard_commitments_root,
237            sector_index,
238            farmer_protocol_info,
239            pieces_in_sector,
240            progress_sender,
241        )
242        .await;
243
244        true
245    }
246}
247
248impl ClusterPlotter {
249    /// Create a new instance
250    pub fn new(
251        nats_client: NatsClient,
252        sector_encoding_concurrency: NonZeroUsize,
253        retry_backoff_policy: ExponentialBuilder,
254    ) -> Self {
255        let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
256
257        let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
258
259        // Basically runs plotting tasks in the background and allows to abort on drop
260        let background_tasks = AsyncJoinOnDrop::new(
261            tokio::spawn(async move {
262                let background_tasks = FuturesUnordered::new();
263                let mut background_tasks = pin!(background_tasks);
264                // Just so that `FuturesUnordered` will never end
265                background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
266
267                loop {
268                    select! {
269                        maybe_background_task = tasks_receiver.next().fuse() => {
270                            let Some(background_task) = maybe_background_task else {
271                                break;
272                            };
273
274                            background_tasks.push(background_task);
275                        },
276                        _ = background_tasks.select_next_some() => {
277                            // Nothing to do
278                        }
279                    }
280                }
281            }),
282            true,
283        );
284
285        Self {
286            sector_encoding_semaphore,
287            retry_backoff_policy,
288            nats_client,
289            handlers: Arc::default(),
290            tasks_sender,
291            _background_tasks: background_tasks,
292        }
293    }
294
295    /// Subscribe to plotting progress notifications
296    pub fn on_plotting_progress(
297        &self,
298        callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
299    ) -> HandlerId {
300        self.handlers.plotting_progress.add(callback)
301    }
302
303    #[expect(clippy::too_many_arguments, reason = "Internal API")]
304    async fn plot_sector_internal<PS>(
305        &self,
306        start: Instant,
307        sector_encoding_permit: OwnedSemaphorePermit,
308        public_key: Ed25519PublicKey,
309        shard_commitments_root: ShardCommitmentHash,
310        sector_index: SectorIndex,
311        farmer_protocol_info: FarmerProtocolInfo,
312        pieces_in_sector: u16,
313        mut progress_sender: PS,
314    ) where
315        PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
316        PS::Error: Error,
317    {
318        trace!("Starting plotting, getting plotting permit");
319
320        let progress_updater = ProgressUpdater {
321            public_key,
322            sector_index,
323            handlers: Arc::clone(&self.handlers),
324        };
325
326        let mut retry_backoff_policy = self.retry_backoff_policy.build();
327
328        // Try to get plotter instance here first as a backpressure measure
329        let free_plotter_instance_fut = get_free_plotter_instance(
330            &self.nats_client,
331            &progress_updater,
332            &mut progress_sender,
333            &mut retry_backoff_policy,
334        );
335        let mut maybe_free_instance = free_plotter_instance_fut.await;
336        if maybe_free_instance.is_none() {
337            return;
338        }
339
340        trace!("Got plotting permit #1");
341
342        let nats_client = self.nats_client.clone();
343
344        let plotting_fut = async move {
345            'outer: loop {
346                // Take free instance that was found earlier if available or try to find a new one
347                let free_instance = if let Some(free_instance) = maybe_free_instance.take() {
348                    free_instance
349                } else {
350                    let free_plotter_instance_fut = get_free_plotter_instance(
351                        &nats_client,
352                        &progress_updater,
353                        &mut progress_sender,
354                        &mut retry_backoff_policy,
355                    );
356                    let Some(free_instance) = free_plotter_instance_fut.await else {
357                        break;
358                    };
359                    trace!("Got plotting permit #2");
360                    free_instance
361                };
362
363                let response_stream_result = nats_client
364                    .stream_request(
365                        &ClusterPlotterPlotSectorRequest {
366                            public_key,
367                            shard_commitments_root,
368                            sector_index,
369                            farmer_protocol_info,
370                            pieces_in_sector,
371                        },
372                        Some(&free_instance),
373                    )
374                    .await;
375                trace!("Subscribed to plotting notifications");
376
377                let mut response_stream = match response_stream_result {
378                    Ok(response_stream) => response_stream,
379                    Err(error) => {
380                        progress_updater
381                            .update_progress_and_events(
382                                &mut progress_sender,
383                                SectorPlottingProgress::Error {
384                                    error: format!("Failed make stream request: {error}"),
385                                },
386                            )
387                            .await;
388
389                        break;
390                    }
391                };
392
393                // Allow to buffer up to the whole sector in memory to not block plotter on the
394                // other side
395                let (mut sector_sender, sector_receiver) = mpsc::channel(
396                    (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
397                        .max(1),
398                );
399                let mut maybe_sector_receiver = Some(sector_receiver);
400                loop {
401                    match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
402                        Ok(Some(response)) => {
403                            match process_response_notification(
404                                &start,
405                                &free_instance,
406                                &progress_updater,
407                                &mut progress_sender,
408                                &mut retry_backoff_policy,
409                                response,
410                                &mut sector_sender,
411                                &mut maybe_sector_receiver,
412                            )
413                            .await
414                            {
415                                ResponseProcessingResult::Retry => {
416                                    debug!("Retrying");
417                                    continue 'outer;
418                                }
419                                ResponseProcessingResult::Abort => {
420                                    debug!("Aborting");
421                                    break 'outer;
422                                }
423                                ResponseProcessingResult::Continue => {
424                                    trace!("Continue");
425                                    // Nothing to do
426                                }
427                            }
428                        }
429                        Ok(None) => {
430                            trace!("Plotting done");
431                            break;
432                        }
433                        Err(_error) => {
434                            progress_updater
435                                .update_progress_and_events(
436                                    &mut progress_sender,
437                                    SectorPlottingProgress::Error {
438                                        error: "Timed out without ping from plotter".to_string(),
439                                    },
440                                )
441                                .await;
442                            break;
443                        }
444                    }
445                }
446
447                break;
448            }
449
450            drop(sector_encoding_permit);
451        };
452
453        let plotting_task =
454            AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
455        if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
456            warn!(%error, "Failed to send plotting task");
457
458            let progress = SectorPlottingProgress::Error {
459                error: format!("Failed to send plotting task: {error}"),
460            };
461
462            self.handlers
463                .plotting_progress
464                .call_simple(&public_key, &sector_index, &progress);
465        }
466    }
467}
468
469// Try to get free plotter instance and return `None` if it is not possible
470async fn get_free_plotter_instance<PS>(
471    nats_client: &NatsClient,
472    progress_updater: &ProgressUpdater,
473    progress_sender: &mut PS,
474    retry_backoff_policy: &mut ExponentialBackoff,
475) -> Option<String>
476where
477    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
478    PS::Error: Error,
479{
480    loop {
481        match nats_client
482            .request(&ClusterPlotterFreeInstanceRequest, None)
483            .await
484        {
485            Ok(Some(free_instance)) => {
486                return Some(free_instance);
487            }
488            Ok(None) => {
489                let Some(delay) = retry_backoff_policy.next() else {
490                    progress_updater
491                        .update_progress_and_events(
492                            progress_sender,
493                            SectorPlottingProgress::Error {
494                                error: "Instance was occupied, exiting #1".to_string(),
495                            },
496                        )
497                        .await;
498                    return None;
499                };
500
501                debug!("Instance was occupied, retrying #1");
502
503                tokio::time::sleep(delay).await;
504            }
505            Err(error) => match error.kind() {
506                RequestErrorKind::TimedOut => {
507                    let Some(delay) = retry_backoff_policy.next() else {
508                        progress_updater
509                            .update_progress_and_events(
510                                progress_sender,
511                                SectorPlottingProgress::Error {
512                                    error: "Plotter request timed out, exiting".to_string(),
513                                },
514                            )
515                            .await;
516                        return None;
517                    };
518
519                    debug!("Plotter request timed out, retrying");
520
521                    tokio::time::sleep(delay).await;
522                }
523                RequestErrorKind::NoResponders => {
524                    let Some(delay) = retry_backoff_policy.next() else {
525                        progress_updater
526                            .update_progress_and_events(
527                                progress_sender,
528                                SectorPlottingProgress::Error {
529                                    error: "No plotters, exiting".to_string(),
530                                },
531                            )
532                            .await;
533                        return None;
534                    };
535
536                    debug!("No plotters, retrying");
537
538                    tokio::time::sleep(delay).await;
539                }
540                RequestErrorKind::InvalidSubject
541                | RequestErrorKind::MaxPayloadExceeded
542                | RequestErrorKind::Other => {
543                    progress_updater
544                        .update_progress_and_events(
545                            progress_sender,
546                            SectorPlottingProgress::Error {
547                                error: format!("Failed to get free plotter instance: {error}"),
548                            },
549                        )
550                        .await;
551                    return None;
552                }
553            },
554        }
555    }
556}
557
/// Outcome of processing a single plotting response notification
enum ResponseProcessingResult {
    /// Plotting request needs to be retried from scratch with a new plotter instance
    Retry,
    /// Plotting can't continue and must be aborted
    Abort,
    /// Notification was processed, continue with the next one
    Continue,
}
563
564#[expect(clippy::too_many_arguments, reason = "Internal API")]
565async fn process_response_notification<PS>(
566    start: &Instant,
567    free_instance: &str,
568    progress_updater: &ProgressUpdater,
569    progress_sender: &mut PS,
570    retry_backoff_policy: &mut ExponentialBackoff,
571    response: ClusterSectorPlottingProgress,
572    sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
573    maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
574) -> ResponseProcessingResult
575where
576    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
577    PS::Error: Error,
578{
579    if matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
580        trace!("Processing plotting response notification (sector chunk)");
581    } else {
582        trace!(?response, "Processing plotting response notification");
583    }
584
585    match response {
586        ClusterSectorPlottingProgress::Occupied => {
587            debug!(%free_instance, "Instance was occupied, retrying #2");
588
589            return if let Some(delay) = retry_backoff_policy.next() {
590                debug!("Instance was occupied, retrying #2");
591
592                tokio::time::sleep(delay).await;
593                ResponseProcessingResult::Retry
594            } else {
595                debug!("Instance was occupied, exiting #2");
596                ResponseProcessingResult::Abort
597            };
598        }
599        ClusterSectorPlottingProgress::Ping => {
600            // Expected
601        }
602        ClusterSectorPlottingProgress::Downloading => {
603            if !progress_updater
604                .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
605                .await
606            {
607                return ResponseProcessingResult::Abort;
608            }
609        }
610        ClusterSectorPlottingProgress::Downloaded(time) => {
611            if !progress_updater
612                .update_progress_and_events(
613                    progress_sender,
614                    SectorPlottingProgress::Downloaded(time),
615                )
616                .await
617            {
618                return ResponseProcessingResult::Abort;
619            }
620        }
621        ClusterSectorPlottingProgress::Encoding => {
622            if !progress_updater
623                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
624                .await
625            {
626                return ResponseProcessingResult::Abort;
627            }
628        }
629        ClusterSectorPlottingProgress::Encoded(time) => {
630            if !progress_updater
631                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
632                .await
633            {
634                return ResponseProcessingResult::Abort;
635            }
636        }
637        ClusterSectorPlottingProgress::Finished {
638            plotted_sector,
639            time: _,
640        } => {
641            let Some(sector_receiver) = maybe_sector_receiver.take() else {
642                debug!("Unexpected duplicated sector plotting progress Finished");
643
644                progress_updater
645                    .update_progress_and_events(
646                        progress_sender,
647                        SectorPlottingProgress::Error {
648                            error: "Unexpected duplicated sector plotting progress Finished"
649                                .to_string(),
650                        },
651                    )
652                    .await;
653                return ResponseProcessingResult::Abort;
654            };
655
656            let progress = SectorPlottingProgress::Finished {
657                plotted_sector,
658                // Use local time instead of reported by remote plotter
659                time: start.elapsed(),
660                sector: Box::pin(sector_receiver),
661            };
662            if !progress_updater
663                .update_progress_and_events(progress_sender, progress)
664                .await
665            {
666                return ResponseProcessingResult::Abort;
667            }
668
669            return ResponseProcessingResult::Continue;
670        }
671        // This variant must be sent after Finished and it handled above
672        ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
673            if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
674                warn!(%error, "Failed to send sector chunk");
675                return ResponseProcessingResult::Abort;
676            }
677            return ResponseProcessingResult::Continue;
678        }
679        ClusterSectorPlottingProgress::Error { error } => {
680            if !progress_updater
681                .update_progress_and_events(
682                    progress_sender,
683                    SectorPlottingProgress::Error { error },
684                )
685                .await
686            {
687                return ResponseProcessingResult::Abort;
688            }
689        }
690    }
691
692    ResponseProcessingResult::Continue
693}
694
/// Helper that reports plotting progress of a particular sector to both event handlers and a
/// progress sink
struct ProgressUpdater {
    /// Public key passed to event handlers
    public_key: Ed25519PublicKey,
    /// Sector index passed to event handlers
    sector_index: SectorIndex,
    /// Event handlers to notify
    handlers: Arc<Handlers>,
}
700
701impl ProgressUpdater {
702    /// Returns `true` on success and `false` if progress receiver channel is gone
703    async fn update_progress_and_events<PS>(
704        &self,
705        progress_sender: &mut PS,
706        progress: SectorPlottingProgress,
707    ) -> bool
708    where
709        PS: Sink<SectorPlottingProgress> + Unpin,
710        PS::Error: Error,
711    {
712        self.handlers.plotting_progress.call_simple(
713            &self.public_key,
714            &self.sector_index,
715            &progress,
716        );
717
718        if let Err(error) = progress_sender.send(progress).await {
719            warn!(%error, "Failed to send error progress update");
720
721            false
722        } else {
723            true
724        }
725    }
726}
727
728/// Create plotter service that will be processing incoming requests.
729///
730/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
731/// per controller instance in order to parallelize more work across threads if needed.
732pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
733where
734    P: Plotter + Sync,
735{
736    let plotter_id = ClusterPlotterId::new();
737
738    select! {
739        result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
740            result
741        }
742        result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
743            result
744        }
745    }
746}
747
748async fn free_instance_responder<P>(
749    plotter_id: &ClusterPlotterId,
750    nats_client: &NatsClient,
751    plotter: &P,
752) -> anyhow::Result<()>
753where
754    P: Plotter + Sync,
755{
756    loop {
757        while !plotter.has_free_capacity().await.unwrap_or_default() {
758            tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
759        }
760
761        let mut subscription = nats_client
762            .queue_subscribe(
763                ClusterPlotterFreeInstanceRequest::SUBJECT,
764                "ab.plotter".to_string(),
765            )
766            .await
767            .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
768        debug!(?subscription, "Free instance subscription");
769
770        while let Some(message) = subscription.next().await {
771            let Some(reply_subject) = message.reply else {
772                continue;
773            };
774
775            debug!(%reply_subject, "Free instance request");
776
777            let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
778            let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
779                has_free_capacity.then(|| plotter_id.to_string());
780
781            if let Err(error) = nats_client
782                .publish(reply_subject, response.encode().into())
783                .await
784            {
785                warn!(%error, "Failed to send free instance response");
786            }
787
788            if !has_free_capacity {
789                subscription.unsubscribe().await.map_err(|error| {
790                    anyhow!("Failed to unsubscribe from free instance requests: {error}")
791                })?;
792            }
793        }
794    }
795}
796
797async fn plot_sector_responder<P>(
798    plotter_id: &ClusterPlotterId,
799    nats_client: &NatsClient,
800    plotter: &P,
801) -> anyhow::Result<()>
802where
803    P: Plotter + Sync,
804{
805    let plotter_id_string = plotter_id.to_string();
806
807    nats_client
808        .stream_request_responder(
809            Some(&plotter_id_string),
810            Some(plotter_id_string.clone()),
811            |request| async move {
812                let (progress_sender, mut progress_receiver) = mpsc::channel(10);
813
814                let fut =
815                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
816                let mut fut = Box::pin(fut.fuse());
817
818                Some(
819                    // Drive above future and stream back any pieces that were downloaded so far
820                    stream::poll_fn(move |cx| {
821                        if !fut.is_terminated() {
822                            // Result doesn't matter, we'll need to poll stream below anyway
823                            let _: Poll<()> = fut.poll_unpin(cx);
824                        }
825
826                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
827                            return Poll::Ready(maybe_result);
828                        }
829
830                        // Exit will be done by the stream above
831                        Poll::Pending
832                    }),
833                )
834            },
835        )
836        .await
837}
838
839async fn process_plot_sector_request<P>(
840    nats_client: &NatsClient,
841    plotter: &P,
842    request: ClusterPlotterPlotSectorRequest,
843    mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
844) where
845    P: Plotter,
846{
847    let ClusterPlotterPlotSectorRequest {
848        public_key,
849        shard_commitments_root,
850        sector_index,
851        farmer_protocol_info,
852        pieces_in_sector,
853    } = request;
854
855    // Wrapper future just for instrumentation below
856    let inner_fut = async {
857        info!("Plot sector request");
858
859        let (progress_sender, mut progress_receiver) = mpsc::channel(1);
860
861        if !plotter
862            .try_plot_sector(
863                public_key,
864                shard_commitments_root,
865                sector_index,
866                farmer_protocol_info,
867                pieces_in_sector,
868                false,
869                progress_sender,
870            )
871            .await
872        {
873            debug!("Plotter is currently occupied and can't plot more sectors");
874
875            if let Err(error) = response_proxy_sender
876                .send(ClusterSectorPlottingProgress::Occupied)
877                .await
878            {
879                warn!(%error, "Failed to send plotting progress");
880                return;
881            }
882            return;
883        }
884
885        let progress_proxy_fut = {
886            let mut response_proxy_sender = response_proxy_sender.clone();
887            let approximate_max_message_size = nats_client.approximate_max_message_size();
888
889            async move {
890                while let Some(progress) = progress_receiver.next().await {
891                    send_publish_progress(
892                        &mut response_proxy_sender,
893                        progress,
894                        approximate_max_message_size,
895                    )
896                    .await;
897                }
898            }
899        };
900
901        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
902        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
903        let ping_fut = async {
904            loop {
905                ping_interval.tick().await;
906                if let Err(error) = response_proxy_sender
907                    .send(ClusterSectorPlottingProgress::Ping)
908                    .await
909                {
910                    warn!(%error, "Failed to send plotting ping");
911                    return;
912                }
913            }
914        };
915
916        select! {
917            () = progress_proxy_fut.fuse() => {
918                // Done
919            }
920            () = ping_fut.fuse() => {
921                unreachable!("Ping loop never ends");
922            }
923        }
924
925        info!("Finished plotting sector successfully");
926    };
927
928    inner_fut
929        .instrument(info_span!("", %public_key, %sector_index))
930        .await;
931}
932
933async fn send_publish_progress(
934    response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
935    progress: SectorPlottingProgress,
936    approximate_max_message_size: usize,
937) {
938    // Finished response is large and needs special care
939    let cluster_progress = match progress {
940        SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
941        SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
942        SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
943        SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
944        SectorPlottingProgress::Finished {
945            plotted_sector,
946            time,
947            mut sector,
948        } => {
949            if let Err(error) = response_sender
950                .send(ClusterSectorPlottingProgress::Finished {
951                    plotted_sector,
952                    time,
953                })
954                .await
955            {
956                warn!(%error, "Failed to send plotting progress");
957                return;
958            }
959
960            while let Some(maybe_sector_chunk) = sector.next().await {
961                match maybe_sector_chunk {
962                    Ok(sector_chunk) => {
963                        // Slice large chunks into smaller ones before publishing
964                        for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
965                        {
966                            if let Err(error) = response_sender
967                                .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
968                                    sector_chunk.slice_ref(small_sector_chunk)
969                                )))
970                                .await
971                            {
972                                warn!(%error, "Failed to send plotting progress");
973                                return;
974                            }
975                        }
976                    }
977                    Err(error) => {
978                        if let Err(error) = response_sender
979                            .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
980                            .await
981                        {
982                            warn!(%error, "Failed to send plotting progress");
983                            return;
984                        }
985                    }
986                }
987            }
988
989            return;
990        }
991        SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
992    };
993
994    if let Err(error) = response_sender.send(cluster_progress).await {
995        warn!(%error, "Failed to send plotting progress");
996    }
997}