Skip to main content

ab_farmer/cluster/
plotter.rs

//! Farming cluster plotter
//!
//! Plotter is responsible for plotting sectors in response to farmer requests.
//!
//! This module exposes data structures for NATS communication, a custom plotter implementation
//! designed to work with the cluster plotter, and a service function that drives the backend part
//! of the plotter.
8
9use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder};
22use bytes::Bytes;
23use derive_more::Display;
24use event_listener_primitives::{Bag, HandlerId};
25use futures::channel::mpsc;
26use futures::future::FusedFuture;
27use futures::stream::FuturesUnordered;
28use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
29use parity_scale_codec::{Decode, Encode};
30use std::error::Error;
31use std::future::pending;
32use std::num::NonZeroUsize;
33use std::pin::pin;
34use std::sync::Arc;
35use std::task::Poll;
36use std::time::{Duration, Instant};
37use tokio::sync::{OwnedSemaphorePermit, Semaphore};
38use tokio::time::MissedTickBehavior;
39use tracing::{Instrument, debug, info, info_span, trace, warn};
40use ulid::Ulid;
41
/// How often a plotter server re-checks its capacity while it is fully occupied and therefore not
/// answering free instance requests
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings sent from plotter server to client while a sector is being plotted
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// How long the client waits for the next message from a plotter (pings included) before assuming
/// the plotter is down
const PING_TIMEOUT: Duration = Duration::from_secs(60);
47
48/// Type alias used for event handlers
49pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
50type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
51
52/// An ephemeral identifier for a plotter
53#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
54pub enum ClusterPlotterId {
55    /// Plotter ID
56    Ulid(Ulid),
57}
58
59#[expect(clippy::new_without_default)]
60impl ClusterPlotterId {
61    /// Creates new ID
62    pub fn new() -> Self {
63        Self::Ulid(Ulid::new())
64    }
65}
66
67/// Request for free plotter instance
68#[derive(Debug, Clone, Encode, Decode)]
69struct ClusterPlotterFreeInstanceRequest;
70
71impl GenericRequest for ClusterPlotterFreeInstanceRequest {
72    const SUBJECT: &'static str = "ab.plotter.free-instance";
73    /// Might be `None` if instance had to respond, but turned out it was fully occupied already
74    type Response = Option<String>;
75}
76
77#[derive(Debug, Encode, Decode)]
78enum ClusterSectorPlottingProgress {
79    /// Plotter is already fully occupied with other work
80    Occupied,
81    /// Periodic ping indicating plotter is still busy
82    Ping,
83    /// Downloading sector pieces
84    Downloading,
85    /// Downloaded sector pieces
86    Downloaded(Duration),
87    /// Encoding sector pieces
88    Encoding,
89    /// Encoded sector pieces
90    Encoded(Duration),
91    /// Finished plotting, followed by a series of sector chunks
92    Finished {
93        /// Information about plotted sector
94        plotted_sector: PlottedSector,
95        /// How much time it took to plot a sector
96        time: Duration,
97    },
98    /// Sector chunk after finished plotting
99    SectorChunk(Result<Bytes, String>),
100    /// Plotting failed
101    Error {
102        /// Error message
103        error: String,
104    },
105}
106
107/// Request to plot sector from plotter
108#[derive(Debug, Clone, Encode, Decode)]
109struct ClusterPlotterPlotSectorRequest {
110    public_key: Ed25519PublicKey,
111    shard_commitments_root: ShardCommitmentHash,
112    sector_index: SectorIndex,
113    farmer_protocol_info: FarmerProtocolInfo,
114    pieces_in_sector: u16,
115}
116
117impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
118    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
119    type Response = ClusterSectorPlottingProgress;
120}
121
122#[derive(Default, Debug)]
123struct Handlers {
124    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
125}
126
127/// Cluster plotter
128#[derive(Debug)]
129pub struct ClusterPlotter {
130    sector_encoding_semaphore: Arc<Semaphore>,
131    retry_backoff_policy: ExponentialBuilder,
132    nats_client: NatsClient,
133    handlers: Arc<Handlers>,
134    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
135    _background_tasks: AsyncJoinOnDrop<()>,
136}
137
138impl Drop for ClusterPlotter {
139    #[inline]
140    fn drop(&mut self) {
141        self.tasks_sender.close_channel();
142    }
143}
144
145#[async_trait]
146impl Plotter for ClusterPlotter {
147    async fn has_free_capacity(&self) -> Result<bool, String> {
148        Ok(self.sector_encoding_semaphore.available_permits() > 0
149            && self
150                .nats_client
151                .request(&ClusterPlotterFreeInstanceRequest, None)
152                .await
153                .map_err(|error| error.to_string())?
154                .is_some())
155    }
156
157    async fn plot_sector(
158        &self,
159        public_key: Ed25519PublicKey,
160        shard_commitments_root: ShardCommitmentHash,
161        sector_index: SectorIndex,
162        farmer_protocol_info: FarmerProtocolInfo,
163        pieces_in_sector: u16,
164        _replotting: bool,
165        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
166    ) {
167        let start = Instant::now();
168
169        // Done outside the future below as a backpressure, ensuring that it is not possible to
170        // schedule unbounded number of plotting tasks
171        let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
172            .acquire_owned()
173            .await
174        {
175            Ok(sector_encoding_permit) => sector_encoding_permit,
176            Err(error) => {
177                warn!(%error, "Failed to acquire sector encoding permit");
178
179                let progress_updater = ProgressUpdater {
180                    public_key,
181                    sector_index,
182                    handlers: Arc::clone(&self.handlers),
183                };
184
185                progress_updater
186                    .update_progress_and_events(
187                        &mut progress_sender,
188                        SectorPlottingProgress::Error {
189                            error: format!("Failed to acquire sector encoding permit: {error}"),
190                        },
191                    )
192                    .await;
193
194                return;
195            }
196        };
197
198        self.plot_sector_internal(
199            start,
200            sector_encoding_permit,
201            public_key,
202            shard_commitments_root,
203            sector_index,
204            farmer_protocol_info,
205            pieces_in_sector,
206            progress_sender,
207        )
208        .await
209    }
210
211    async fn try_plot_sector(
212        &self,
213        public_key: Ed25519PublicKey,
214        shard_commitments_root: ShardCommitmentHash,
215        sector_index: SectorIndex,
216        farmer_protocol_info: FarmerProtocolInfo,
217        pieces_in_sector: u16,
218        _replotting: bool,
219        progress_sender: mpsc::Sender<SectorPlottingProgress>,
220    ) -> bool {
221        let start = Instant::now();
222
223        let Ok(sector_encoding_permit) =
224            Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
225        else {
226            return false;
227        };
228
229        self.plot_sector_internal(
230            start,
231            sector_encoding_permit,
232            public_key,
233            shard_commitments_root,
234            sector_index,
235            farmer_protocol_info,
236            pieces_in_sector,
237            progress_sender,
238        )
239        .await;
240
241        true
242    }
243}
244
245impl ClusterPlotter {
246    /// Create a new instance
247    pub fn new(
248        nats_client: NatsClient,
249        sector_encoding_concurrency: NonZeroUsize,
250        retry_backoff_policy: ExponentialBuilder,
251    ) -> Self {
252        let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
253
254        let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
255
256        // Basically runs plotting tasks in the background and allows to abort on drop
257        let background_tasks = AsyncJoinOnDrop::new(
258            tokio::spawn(async move {
259                let background_tasks = FuturesUnordered::new();
260                let mut background_tasks = pin!(background_tasks);
261                // Just so that `FuturesUnordered` will never end
262                background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
263
264                loop {
265                    select! {
266                        maybe_background_task = tasks_receiver.next().fuse() => {
267                            let Some(background_task) = maybe_background_task else {
268                                break;
269                            };
270
271                            background_tasks.push(background_task);
272                        },
273                        _ = background_tasks.select_next_some() => {
274                            // Nothing to do
275                        }
276                    }
277                }
278            }),
279            true,
280        );
281
282        Self {
283            sector_encoding_semaphore,
284            retry_backoff_policy,
285            nats_client,
286            handlers: Arc::default(),
287            tasks_sender,
288            _background_tasks: background_tasks,
289        }
290    }
291
292    /// Subscribe to plotting progress notifications
293    pub fn on_plotting_progress(
294        &self,
295        callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
296    ) -> HandlerId {
297        self.handlers.plotting_progress.add(callback)
298    }
299
300    #[expect(clippy::too_many_arguments)]
301    async fn plot_sector_internal<PS>(
302        &self,
303        start: Instant,
304        sector_encoding_permit: OwnedSemaphorePermit,
305        public_key: Ed25519PublicKey,
306        shard_commitments_root: ShardCommitmentHash,
307        sector_index: SectorIndex,
308        farmer_protocol_info: FarmerProtocolInfo,
309        pieces_in_sector: u16,
310        mut progress_sender: PS,
311    ) where
312        PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
313        PS::Error: Error,
314    {
315        trace!("Starting plotting, getting plotting permit");
316
317        let progress_updater = ProgressUpdater {
318            public_key,
319            sector_index,
320            handlers: Arc::clone(&self.handlers),
321        };
322
323        let mut retry_backoff_policy = self.retry_backoff_policy.build();
324
325        // Try to get plotter instance here first as a backpressure measure
326        let free_plotter_instance_fut = get_free_plotter_instance(
327            &self.nats_client,
328            &progress_updater,
329            &mut progress_sender,
330            &mut retry_backoff_policy,
331        );
332        let mut maybe_free_instance = free_plotter_instance_fut.await;
333        if maybe_free_instance.is_none() {
334            return;
335        }
336
337        trace!("Got plotting permit #1");
338
339        let nats_client = self.nats_client.clone();
340
341        let plotting_fut = async move {
342            'outer: loop {
343                // Take free instance that was found earlier if available or try to find a new one
344                let free_instance = match maybe_free_instance.take() {
345                    Some(free_instance) => free_instance,
346                    None => {
347                        let free_plotter_instance_fut = get_free_plotter_instance(
348                            &nats_client,
349                            &progress_updater,
350                            &mut progress_sender,
351                            &mut retry_backoff_policy,
352                        );
353                        let Some(free_instance) = free_plotter_instance_fut.await else {
354                            break;
355                        };
356                        trace!("Got plotting permit #2");
357                        free_instance
358                    }
359                };
360
361                let response_stream_result = nats_client
362                    .stream_request(
363                        &ClusterPlotterPlotSectorRequest {
364                            public_key,
365                            shard_commitments_root,
366                            sector_index,
367                            farmer_protocol_info,
368                            pieces_in_sector,
369                        },
370                        Some(&free_instance),
371                    )
372                    .await;
373                trace!("Subscribed to plotting notifications");
374
375                let mut response_stream = match response_stream_result {
376                    Ok(response_stream) => response_stream,
377                    Err(error) => {
378                        progress_updater
379                            .update_progress_and_events(
380                                &mut progress_sender,
381                                SectorPlottingProgress::Error {
382                                    error: format!("Failed make stream request: {error}"),
383                                },
384                            )
385                            .await;
386
387                        break;
388                    }
389                };
390
391                // Allow to buffer up to the whole sector in memory to not block plotter on the
392                // other side
393                let (mut sector_sender, sector_receiver) = mpsc::channel(
394                    (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
395                        .max(1),
396                );
397                let mut maybe_sector_receiver = Some(sector_receiver);
398                loop {
399                    match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
400                        Ok(Some(response)) => {
401                            match process_response_notification(
402                                &start,
403                                &free_instance,
404                                &progress_updater,
405                                &mut progress_sender,
406                                &mut retry_backoff_policy,
407                                response,
408                                &mut sector_sender,
409                                &mut maybe_sector_receiver,
410                            )
411                            .await
412                            {
413                                ResponseProcessingResult::Retry => {
414                                    debug!("Retrying");
415                                    continue 'outer;
416                                }
417                                ResponseProcessingResult::Abort => {
418                                    debug!("Aborting");
419                                    break 'outer;
420                                }
421                                ResponseProcessingResult::Continue => {
422                                    trace!("Continue");
423                                    // Nothing to do
424                                }
425                            }
426                        }
427                        Ok(None) => {
428                            trace!("Plotting done");
429                            break;
430                        }
431                        Err(_error) => {
432                            progress_updater
433                                .update_progress_and_events(
434                                    &mut progress_sender,
435                                    SectorPlottingProgress::Error {
436                                        error: "Timed out without ping from plotter".to_string(),
437                                    },
438                                )
439                                .await;
440                            break;
441                        }
442                    }
443                }
444
445                break;
446            }
447
448            drop(sector_encoding_permit);
449        };
450
451        let plotting_task =
452            AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
453        if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
454            warn!(%error, "Failed to send plotting task");
455
456            let progress = SectorPlottingProgress::Error {
457                error: format!("Failed to send plotting task: {error}"),
458            };
459
460            self.handlers
461                .plotting_progress
462                .call_simple(&public_key, &sector_index, &progress);
463        }
464    }
465}
466
467// Try to get free plotter instance and return `None` if it is not possible
468async fn get_free_plotter_instance<PS>(
469    nats_client: &NatsClient,
470    progress_updater: &ProgressUpdater,
471    progress_sender: &mut PS,
472    retry_backoff_policy: &mut ExponentialBackoff,
473) -> Option<String>
474where
475    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
476    PS::Error: Error,
477{
478    loop {
479        match nats_client
480            .request(&ClusterPlotterFreeInstanceRequest, None)
481            .await
482        {
483            Ok(Some(free_instance)) => {
484                return Some(free_instance);
485            }
486            Ok(None) => {
487                if let Some(delay) = retry_backoff_policy.next() {
488                    debug!("Instance was occupied, retrying #1");
489
490                    tokio::time::sleep(delay).await;
491                    continue;
492                } else {
493                    progress_updater
494                        .update_progress_and_events(
495                            progress_sender,
496                            SectorPlottingProgress::Error {
497                                error: "Instance was occupied, exiting #1".to_string(),
498                            },
499                        )
500                        .await;
501                    return None;
502                }
503            }
504            Err(error) => match error.kind() {
505                RequestErrorKind::TimedOut => {
506                    if let Some(delay) = retry_backoff_policy.next() {
507                        debug!("Plotter request timed out, retrying");
508
509                        tokio::time::sleep(delay).await;
510                        continue;
511                    } else {
512                        progress_updater
513                            .update_progress_and_events(
514                                progress_sender,
515                                SectorPlottingProgress::Error {
516                                    error: "Plotter request timed out, exiting".to_string(),
517                                },
518                            )
519                            .await;
520                        return None;
521                    }
522                }
523                RequestErrorKind::NoResponders => {
524                    if let Some(delay) = retry_backoff_policy.next() {
525                        debug!("No plotters, retrying");
526
527                        tokio::time::sleep(delay).await;
528                        continue;
529                    } else {
530                        progress_updater
531                            .update_progress_and_events(
532                                progress_sender,
533                                SectorPlottingProgress::Error {
534                                    error: "No plotters, exiting".to_string(),
535                                },
536                            )
537                            .await;
538                        return None;
539                    }
540                }
541                RequestErrorKind::InvalidSubject | RequestErrorKind::Other => {
542                    progress_updater
543                        .update_progress_and_events(
544                            progress_sender,
545                            SectorPlottingProgress::Error {
546                                error: format!("Failed to get free plotter instance: {error}"),
547                            },
548                        )
549                        .await;
550                    return None;
551                }
552            },
553        };
554    }
555}
556
/// What the plotting loop should do after one response notification has been processed
enum ResponseProcessingResult {
    /// Keep reading notifications from the current response stream
    Continue,
    /// Start over, looking up a free plotter instance again
    Retry,
    /// Stop plotting this sector altogether
    Abort,
}
562
563#[expect(clippy::too_many_arguments)]
564async fn process_response_notification<PS>(
565    start: &Instant,
566    free_instance: &str,
567    progress_updater: &ProgressUpdater,
568    progress_sender: &mut PS,
569    retry_backoff_policy: &mut ExponentialBackoff,
570    response: ClusterSectorPlottingProgress,
571    sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
572    maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
573) -> ResponseProcessingResult
574where
575    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
576    PS::Error: Error,
577{
578    if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
579        trace!(?response, "Processing plotting response notification");
580    } else {
581        trace!("Processing plotting response notification (sector chunk)");
582    }
583
584    match response {
585        ClusterSectorPlottingProgress::Occupied => {
586            debug!(%free_instance, "Instance was occupied, retrying #2");
587
588            if let Some(delay) = retry_backoff_policy.next() {
589                debug!("Instance was occupied, retrying #2");
590
591                tokio::time::sleep(delay).await;
592                return ResponseProcessingResult::Retry;
593            } else {
594                debug!("Instance was occupied, exiting #2");
595                return ResponseProcessingResult::Abort;
596            }
597        }
598        ClusterSectorPlottingProgress::Ping => {
599            // Expected
600        }
601        ClusterSectorPlottingProgress::Downloading => {
602            if !progress_updater
603                .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
604                .await
605            {
606                return ResponseProcessingResult::Abort;
607            }
608        }
609        ClusterSectorPlottingProgress::Downloaded(time) => {
610            if !progress_updater
611                .update_progress_and_events(
612                    progress_sender,
613                    SectorPlottingProgress::Downloaded(time),
614                )
615                .await
616            {
617                return ResponseProcessingResult::Abort;
618            }
619        }
620        ClusterSectorPlottingProgress::Encoding => {
621            if !progress_updater
622                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
623                .await
624            {
625                return ResponseProcessingResult::Abort;
626            }
627        }
628        ClusterSectorPlottingProgress::Encoded(time) => {
629            if !progress_updater
630                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
631                .await
632            {
633                return ResponseProcessingResult::Abort;
634            }
635        }
636        ClusterSectorPlottingProgress::Finished {
637            plotted_sector,
638            time: _,
639        } => {
640            let Some(sector_receiver) = maybe_sector_receiver.take() else {
641                debug!("Unexpected duplicated sector plotting progress Finished");
642
643                progress_updater
644                    .update_progress_and_events(
645                        progress_sender,
646                        SectorPlottingProgress::Error {
647                            error: "Unexpected duplicated sector plotting progress Finished"
648                                .to_string(),
649                        },
650                    )
651                    .await;
652                return ResponseProcessingResult::Abort;
653            };
654
655            let progress = SectorPlottingProgress::Finished {
656                plotted_sector,
657                // Use local time instead of reported by remote plotter
658                time: start.elapsed(),
659                sector: Box::pin(sector_receiver),
660            };
661            if !progress_updater
662                .update_progress_and_events(progress_sender, progress)
663                .await
664            {
665                return ResponseProcessingResult::Abort;
666            }
667
668            return ResponseProcessingResult::Continue;
669        }
670        // This variant must be sent after Finished and it handled above
671        ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
672            if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
673                warn!(%error, "Failed to send sector chunk");
674                return ResponseProcessingResult::Abort;
675            }
676            return ResponseProcessingResult::Continue;
677        }
678        ClusterSectorPlottingProgress::Error { error } => {
679            if !progress_updater
680                .update_progress_and_events(
681                    progress_sender,
682                    SectorPlottingProgress::Error { error },
683                )
684                .await
685            {
686                return ResponseProcessingResult::Abort;
687            }
688        }
689    }
690
691    ResponseProcessingResult::Continue
692}
693
694struct ProgressUpdater {
695    public_key: Ed25519PublicKey,
696    sector_index: SectorIndex,
697    handlers: Arc<Handlers>,
698}
699
700impl ProgressUpdater {
701    /// Returns `true` on success and `false` if progress receiver channel is gone
702    async fn update_progress_and_events<PS>(
703        &self,
704        progress_sender: &mut PS,
705        progress: SectorPlottingProgress,
706    ) -> bool
707    where
708        PS: Sink<SectorPlottingProgress> + Unpin,
709        PS::Error: Error,
710    {
711        self.handlers.plotting_progress.call_simple(
712            &self.public_key,
713            &self.sector_index,
714            &progress,
715        );
716
717        if let Err(error) = progress_sender.send(progress).await {
718            warn!(%error, "Failed to send error progress update");
719
720            false
721        } else {
722            true
723        }
724    }
725}
726
727/// Create plotter service that will be processing incoming requests.
728///
729/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
730/// per controller instance in order to parallelize more work across threads if needed.
731pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
732where
733    P: Plotter + Sync,
734{
735    let plotter_id = ClusterPlotterId::new();
736
737    select! {
738        result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
739            result
740        }
741        result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
742            result
743        }
744    }
745}
746
747async fn free_instance_responder<P>(
748    plotter_id: &ClusterPlotterId,
749    nats_client: &NatsClient,
750    plotter: &P,
751) -> anyhow::Result<()>
752where
753    P: Plotter + Sync,
754{
755    loop {
756        while !plotter.has_free_capacity().await.unwrap_or_default() {
757            tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
758        }
759
760        let mut subscription = nats_client
761            .queue_subscribe(
762                ClusterPlotterFreeInstanceRequest::SUBJECT,
763                "ab.plotter".to_string(),
764            )
765            .await
766            .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
767        debug!(?subscription, "Free instance subscription");
768
769        while let Some(message) = subscription.next().await {
770            let Some(reply_subject) = message.reply else {
771                continue;
772            };
773
774            debug!(%reply_subject, "Free instance request");
775
776            let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
777            let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
778                has_free_capacity.then(|| plotter_id.to_string());
779
780            if let Err(error) = nats_client
781                .publish(reply_subject, response.encode().into())
782                .await
783            {
784                warn!(%error, "Failed to send free instance response");
785            }
786
787            if !has_free_capacity {
788                subscription.unsubscribe().await.map_err(|error| {
789                    anyhow!("Failed to unsubscribe from free instance requests: {error}")
790                })?;
791            }
792        }
793    }
794}
795
796async fn plot_sector_responder<P>(
797    plotter_id: &ClusterPlotterId,
798    nats_client: &NatsClient,
799    plotter: &P,
800) -> anyhow::Result<()>
801where
802    P: Plotter + Sync,
803{
804    let plotter_id_string = plotter_id.to_string();
805
806    nats_client
807        .stream_request_responder(
808            Some(&plotter_id_string),
809            Some(plotter_id_string.clone()),
810            |request| async move {
811                let (progress_sender, mut progress_receiver) = mpsc::channel(10);
812
813                let fut =
814                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
815                let mut fut = Box::pin(fut.fuse());
816
817                Some(
818                    // Drive above future and stream back any pieces that were downloaded so far
819                    stream::poll_fn(move |cx| {
820                        if !fut.is_terminated() {
821                            // Result doesn't matter, we'll need to poll stream below anyway
822                            let _ = fut.poll_unpin(cx);
823                        }
824
825                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
826                            return Poll::Ready(maybe_result);
827                        }
828
829                        // Exit will be done by the stream above
830                        Poll::Pending
831                    }),
832                )
833            },
834        )
835        .await
836}
837
838async fn process_plot_sector_request<P>(
839    nats_client: &NatsClient,
840    plotter: &P,
841    request: ClusterPlotterPlotSectorRequest,
842    mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
843) where
844    P: Plotter,
845{
846    let ClusterPlotterPlotSectorRequest {
847        public_key,
848        shard_commitments_root,
849        sector_index,
850        farmer_protocol_info,
851        pieces_in_sector,
852    } = request;
853
854    // Wrapper future just for instrumentation below
855    let inner_fut = async {
856        info!("Plot sector request");
857
858        let (progress_sender, mut progress_receiver) = mpsc::channel(1);
859
860        if !plotter
861            .try_plot_sector(
862                public_key,
863                shard_commitments_root,
864                sector_index,
865                farmer_protocol_info,
866                pieces_in_sector,
867                false,
868                progress_sender,
869            )
870            .await
871        {
872            debug!("Plotter is currently occupied and can't plot more sectors");
873
874            if let Err(error) = response_proxy_sender
875                .send(ClusterSectorPlottingProgress::Occupied)
876                .await
877            {
878                warn!(%error, "Failed to send plotting progress");
879                return;
880            }
881            return;
882        }
883
884        let progress_proxy_fut = {
885            let mut response_proxy_sender = response_proxy_sender.clone();
886            let approximate_max_message_size = nats_client.approximate_max_message_size();
887
888            async move {
889                while let Some(progress) = progress_receiver.next().await {
890                    send_publish_progress(
891                        &mut response_proxy_sender,
892                        progress,
893                        approximate_max_message_size,
894                    )
895                    .await;
896                }
897            }
898        };
899
900        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
901        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
902        let ping_fut = async {
903            loop {
904                ping_interval.tick().await;
905                if let Err(error) = response_proxy_sender
906                    .send(ClusterSectorPlottingProgress::Ping)
907                    .await
908                {
909                    warn!(%error, "Failed to send plotting ping");
910                    return;
911                }
912            }
913        };
914
915        select! {
916            _ = progress_proxy_fut.fuse() => {
917                // Done
918            }
919            _ = ping_fut.fuse() => {
920                unreachable!("Ping loop never ends");
921            }
922        }
923
924        info!("Finished plotting sector successfully");
925    };
926
927    inner_fut
928        .instrument(info_span!("", %public_key, %sector_index))
929        .await
930}
931
932async fn send_publish_progress(
933    response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
934    progress: SectorPlottingProgress,
935    approximate_max_message_size: usize,
936) {
937    // Finished response is large and needs special care
938    let cluster_progress = match progress {
939        SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
940        SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
941        SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
942        SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
943        SectorPlottingProgress::Finished {
944            plotted_sector,
945            time,
946            mut sector,
947        } => {
948            if let Err(error) = response_sender
949                .send(ClusterSectorPlottingProgress::Finished {
950                    plotted_sector,
951                    time,
952                })
953                .await
954            {
955                warn!(%error, "Failed to send plotting progress");
956                return;
957            }
958
959            while let Some(maybe_sector_chunk) = sector.next().await {
960                match maybe_sector_chunk {
961                    Ok(sector_chunk) => {
962                        // Slice large chunks into smaller ones before publishing
963                        for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
964                        {
965                            if let Err(error) = response_sender
966                                .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
967                                    sector_chunk.slice_ref(small_sector_chunk)
968                                )))
969                                .await
970                            {
971                                warn!(%error, "Failed to send plotting progress");
972                                return;
973                            }
974                        }
975                    }
976                    Err(error) => {
977                        if let Err(error) = response_sender
978                            .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
979                            .await
980                        {
981                            warn!(%error, "Failed to send plotting progress");
982                            return;
983                        }
984                    }
985                }
986            }
987
988            return;
989        }
990        SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
991    };
992
993    if let Err(error) = response_sender.send(cluster_progress).await {
994        warn!(%error, "Failed to send plotting progress");
995    }
996}