ab_farmer/cluster/
plotter.rs

1//! Farming cluster plotter
2//!
3//! Plotter is responsible for plotting sectors in response to farmer requests.
4//!
5//! This module exposes some data structures for NATS communication, custom plotter
6//! implementation designed to work with cluster plotter and a service function to drive the backend
7//! part of the plotter.
8
9use crate::cluster::nats_client::{GenericRequest, GenericStreamRequest, NatsClient};
10use crate::plotter::{Plotter, SectorPlottingProgress};
11use crate::utils::AsyncJoinOnDrop;
12use ab_core_primitives::ed25519::Ed25519PublicKey;
13use ab_core_primitives::sectors::SectorIndex;
14use ab_core_primitives::solutions::ShardCommitmentHash;
15use ab_farmer_components::FarmerProtocolInfo;
16use ab_farmer_components::plotting::PlottedSector;
17use ab_farmer_components::sector::sector_size;
18use anyhow::anyhow;
19use async_nats::RequestErrorKind;
20use async_trait::async_trait;
21use backoff::ExponentialBackoff;
22use backoff::backoff::Backoff;
23use bytes::Bytes;
24use derive_more::Display;
25use event_listener_primitives::{Bag, HandlerId};
26use futures::channel::mpsc;
27use futures::future::FusedFuture;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Sink, SinkExt, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode};
31use std::error::Error;
32use std::future::pending;
33use std::num::NonZeroUsize;
34use std::pin::pin;
35use std::sync::Arc;
36use std::task::Poll;
37use std::time::{Duration, Instant};
38use tokio::sync::{OwnedSemaphorePermit, Semaphore};
39use tokio::time::MissedTickBehavior;
40use tracing::{Instrument, debug, info, info_span, trace, warn};
41use ulid::Ulid;
42
/// Delay between consecutive capacity checks in `free_instance_responder` while the local plotter
/// reports that it has no free capacity
const FREE_CAPACITY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Intervals between pings from plotter server to client
const PING_INTERVAL: Duration = Duration::from_secs(10);
/// Timeout after which plotter that doesn't send pings is assumed to be down
///
/// Used by `ClusterPlotter` as the maximum time to wait for the next message of a plotting
/// response stream before reporting an error.
const PING_TIMEOUT: Duration = Duration::from_mins(1);
48
/// Type alias used for event handlers
///
/// A shared, thread-safe callback that receives three arguments by reference.
pub type HandlerFn3<A, B, C> = Arc<dyn Fn(&A, &B, &C) + Send + Sync + 'static>;
/// Bag of [`HandlerFn3`] callbacks; callbacks are registered with `add()` and invoked with
/// `call_simple()` elsewhere in this module
type Handler3<A, B, C> = Bag<HandlerFn3<A, B, C>, A, B, C>;
52
/// An ephemeral identifier for a plotter
///
/// A new identifier is generated every time `plotter_service` is started and its string form
/// (via the derived `Display`) is what gets sent in response to free instance requests.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display)]
pub enum ClusterPlotterId {
    /// Plotter ID
    Ulid(Ulid),
}

// `new()` generates a fresh `Ulid` on every call, so no `Default` implementation is provided
// (presumably on purpose, hence the lint expectation)
#[expect(clippy::new_without_default)]
impl ClusterPlotterId {
    /// Creates new ID backed by a newly generated [`Ulid`]
    pub fn new() -> Self {
        Self::Ulid(Ulid::new())
    }
}
67
/// Request for free plotter instance
///
/// Carries no payload; it is answered by `free_instance_responder` on the plotter side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterFreeInstanceRequest;

impl GenericRequest for ClusterPlotterFreeInstanceRequest {
    /// Subject that `free_instance_responder` queue-subscribes to
    const SUBJECT: &'static str = "ab.plotter.free-instance";
    /// String form of the responding plotter's [`ClusterPlotterId`].
    ///
    /// Might be `None` if instance had to respond, but turned out it was fully occupied already
    type Response = Option<String>;
}
77
/// Progress notification streamed in response to [`ClusterPlotterPlotSectorRequest`].
///
/// Most variants are translated into the corresponding [`SectorPlottingProgress`] by
/// `process_response_notification`; `Occupied`, `Ping` and `SectorChunk` exist only at this
/// level.
///
/// NOTE: the type derives `Encode`/`Decode`, so the order of variants is presumably part of the
/// encoded representation — avoid reordering.
#[derive(Debug, Encode, Decode)]
enum ClusterSectorPlottingProgress {
    /// Plotter is already fully occupied with other work
    Occupied,
    /// Periodic ping indicating plotter is still busy
    Ping,
    /// Downloading sector pieces
    Downloading,
    /// Downloaded sector pieces
    Downloaded(Duration),
    /// Encoding sector pieces
    Encoding,
    /// Encoded sector pieces
    Encoded(Duration),
    /// Finished plotting, followed by a series of sector chunks
    Finished {
        /// Information about plotted sector
        plotted_sector: PlottedSector,
        /// How much time it took to plot a sector
        time: Duration,
    },
    /// Sector chunk after finished plotting
    SectorChunk(Result<Bytes, String>),
    /// Plotting failed
    Error {
        /// Error message
        error: String,
    },
}
107
/// Request to plot sector from plotter
///
/// All fields are forwarded as-is to `Plotter::try_plot_sector()` on the plotter side.
///
/// NOTE: the type derives `Encode`/`Decode`, so the order of fields is presumably part of the
/// encoded representation — avoid reordering.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterPlotterPlotSectorRequest {
    /// Public key the sector is plotted for
    public_key: Ed25519PublicKey,
    /// Shard commitments root passed to the plotter together with the public key
    shard_commitments_root: ShardCommitmentHash,
    /// Index of the sector to plot
    sector_index: SectorIndex,
    /// Protocol information to use for plotting
    farmer_protocol_info: FarmerProtocolInfo,
    /// Number of pieces in the sector; also used by the client to size the sector chunk buffer
    pieces_in_sector: u16,
}

impl GenericStreamRequest for ClusterPlotterPlotSectorRequest {
    // NOTE(review): `*` is presumably substituted with the plotter instance ID that is passed
    // alongside the request — confirm against `NatsClient::stream_request`
    const SUBJECT: &'static str = "ab.plotter.*.plot-sector";
    type Response = ClusterSectorPlottingProgress;
}
122
/// Event handlers registered on [`ClusterPlotter`]
#[derive(Default, Debug)]
struct Handlers {
    /// Called with public key, sector index and progress for every progress update that goes
    /// through `ProgressUpdater`; subscribed to via `ClusterPlotter::on_plotting_progress()`
    plotting_progress: Handler3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
}
127
/// Cluster plotter
///
/// [`Plotter`] implementation that doesn't plot anything locally, but instead sends plotting
/// requests over NATS and streams the results back.
#[derive(Debug)]
pub struct ClusterPlotter {
    /// Limits how many sectors are being plotted through this instance at the same time; a permit
    /// is acquired before plotting starts and dropped at the end of the spawned plotting task
    sector_encoding_semaphore: Arc<Semaphore>,
    /// Backoff policy template; it is cloned and reset for every sector being plotted
    retry_backoff_policy: ExponentialBackoff,
    /// Client used for all requests to remote plotters
    nats_client: NatsClient,
    /// Plotting progress event handlers
    handlers: Arc<Handlers>,
    /// Sender used to hand spawned plotting tasks over to the background task
    tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
    /// Background task that holds on to plotting tasks received through `tasks_sender`'s channel
    _background_tasks: AsyncJoinOnDrop<()>,
}

impl Drop for ClusterPlotter {
    /// Closes the tasks channel so that the receiving loop of the background task created in
    /// `ClusterPlotter::new()` sees the end of the channel and exits
    #[inline]
    fn drop(&mut self) {
        self.tasks_sender.close_channel();
    }
}
145
146#[async_trait]
147impl Plotter for ClusterPlotter {
148    async fn has_free_capacity(&self) -> Result<bool, String> {
149        Ok(self.sector_encoding_semaphore.available_permits() > 0
150            && self
151                .nats_client
152                .request(&ClusterPlotterFreeInstanceRequest, None)
153                .await
154                .map_err(|error| error.to_string())?
155                .is_some())
156    }
157
158    async fn plot_sector(
159        &self,
160        public_key: Ed25519PublicKey,
161        shard_commitments_root: ShardCommitmentHash,
162        sector_index: SectorIndex,
163        farmer_protocol_info: FarmerProtocolInfo,
164        pieces_in_sector: u16,
165        _replotting: bool,
166        mut progress_sender: mpsc::Sender<SectorPlottingProgress>,
167    ) {
168        let start = Instant::now();
169
170        // Done outside the future below as a backpressure, ensuring that it is not possible to
171        // schedule unbounded number of plotting tasks
172        let sector_encoding_permit = match Arc::clone(&self.sector_encoding_semaphore)
173            .acquire_owned()
174            .await
175        {
176            Ok(sector_encoding_permit) => sector_encoding_permit,
177            Err(error) => {
178                warn!(%error, "Failed to acquire sector encoding permit");
179
180                let progress_updater = ProgressUpdater {
181                    public_key,
182                    sector_index,
183                    handlers: Arc::clone(&self.handlers),
184                };
185
186                progress_updater
187                    .update_progress_and_events(
188                        &mut progress_sender,
189                        SectorPlottingProgress::Error {
190                            error: format!("Failed to acquire sector encoding permit: {error}"),
191                        },
192                    )
193                    .await;
194
195                return;
196            }
197        };
198
199        self.plot_sector_internal(
200            start,
201            sector_encoding_permit,
202            public_key,
203            shard_commitments_root,
204            sector_index,
205            farmer_protocol_info,
206            pieces_in_sector,
207            progress_sender,
208        )
209        .await
210    }
211
212    async fn try_plot_sector(
213        &self,
214        public_key: Ed25519PublicKey,
215        shard_commitments_root: ShardCommitmentHash,
216        sector_index: SectorIndex,
217        farmer_protocol_info: FarmerProtocolInfo,
218        pieces_in_sector: u16,
219        _replotting: bool,
220        progress_sender: mpsc::Sender<SectorPlottingProgress>,
221    ) -> bool {
222        let start = Instant::now();
223
224        let Ok(sector_encoding_permit) =
225            Arc::clone(&self.sector_encoding_semaphore).try_acquire_owned()
226        else {
227            return false;
228        };
229
230        self.plot_sector_internal(
231            start,
232            sector_encoding_permit,
233            public_key,
234            shard_commitments_root,
235            sector_index,
236            farmer_protocol_info,
237            pieces_in_sector,
238            progress_sender,
239        )
240        .await;
241
242        true
243    }
244}
245
246impl ClusterPlotter {
247    /// Create a new instance
248    pub fn new(
249        nats_client: NatsClient,
250        sector_encoding_concurrency: NonZeroUsize,
251        retry_backoff_policy: ExponentialBackoff,
252    ) -> Self {
253        let sector_encoding_semaphore = Arc::new(Semaphore::new(sector_encoding_concurrency.get()));
254
255        let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);
256
257        // Basically runs plotting tasks in the background and allows to abort on drop
258        let background_tasks = AsyncJoinOnDrop::new(
259            tokio::spawn(async move {
260                let background_tasks = FuturesUnordered::new();
261                let mut background_tasks = pin!(background_tasks);
262                // Just so that `FuturesUnordered` will never end
263                background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true));
264
265                loop {
266                    select! {
267                        maybe_background_task = tasks_receiver.next().fuse() => {
268                            let Some(background_task) = maybe_background_task else {
269                                break;
270                            };
271
272                            background_tasks.push(background_task);
273                        },
274                        _ = background_tasks.select_next_some() => {
275                            // Nothing to do
276                        }
277                    }
278                }
279            }),
280            true,
281        );
282
283        Self {
284            sector_encoding_semaphore,
285            retry_backoff_policy,
286            nats_client,
287            handlers: Arc::default(),
288            tasks_sender,
289            _background_tasks: background_tasks,
290        }
291    }
292
293    /// Subscribe to plotting progress notifications
294    pub fn on_plotting_progress(
295        &self,
296        callback: HandlerFn3<Ed25519PublicKey, SectorIndex, SectorPlottingProgress>,
297    ) -> HandlerId {
298        self.handlers.plotting_progress.add(callback)
299    }
300
301    #[expect(clippy::too_many_arguments)]
302    async fn plot_sector_internal<PS>(
303        &self,
304        start: Instant,
305        sector_encoding_permit: OwnedSemaphorePermit,
306        public_key: Ed25519PublicKey,
307        shard_commitments_root: ShardCommitmentHash,
308        sector_index: SectorIndex,
309        farmer_protocol_info: FarmerProtocolInfo,
310        pieces_in_sector: u16,
311        mut progress_sender: PS,
312    ) where
313        PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
314        PS::Error: Error,
315    {
316        trace!("Starting plotting, getting plotting permit");
317
318        let progress_updater = ProgressUpdater {
319            public_key,
320            sector_index,
321            handlers: Arc::clone(&self.handlers),
322        };
323
324        let mut retry_backoff_policy = self.retry_backoff_policy.clone();
325        retry_backoff_policy.reset();
326
327        // Try to get plotter instance here first as a backpressure measure
328        let free_plotter_instance_fut = get_free_plotter_instance(
329            &self.nats_client,
330            &progress_updater,
331            &mut progress_sender,
332            &mut retry_backoff_policy,
333        );
334        let mut maybe_free_instance = free_plotter_instance_fut.await;
335        if maybe_free_instance.is_none() {
336            return;
337        }
338
339        trace!("Got plotting permit #1");
340
341        let nats_client = self.nats_client.clone();
342
343        let plotting_fut = async move {
344            'outer: loop {
345                // Take free instance that was found earlier if available or try to find a new one
346                let free_instance = match maybe_free_instance.take() {
347                    Some(free_instance) => free_instance,
348                    None => {
349                        let free_plotter_instance_fut = get_free_plotter_instance(
350                            &nats_client,
351                            &progress_updater,
352                            &mut progress_sender,
353                            &mut retry_backoff_policy,
354                        );
355                        let Some(free_instance) = free_plotter_instance_fut.await else {
356                            break;
357                        };
358                        trace!("Got plotting permit #2");
359                        free_instance
360                    }
361                };
362
363                let response_stream_result = nats_client
364                    .stream_request(
365                        &ClusterPlotterPlotSectorRequest {
366                            public_key,
367                            shard_commitments_root,
368                            sector_index,
369                            farmer_protocol_info,
370                            pieces_in_sector,
371                        },
372                        Some(&free_instance),
373                    )
374                    .await;
375                trace!("Subscribed to plotting notifications");
376
377                let mut response_stream = match response_stream_result {
378                    Ok(response_stream) => response_stream,
379                    Err(error) => {
380                        progress_updater
381                            .update_progress_and_events(
382                                &mut progress_sender,
383                                SectorPlottingProgress::Error {
384                                    error: format!("Failed make stream request: {error}"),
385                                },
386                            )
387                            .await;
388
389                        break;
390                    }
391                };
392
393                // Allow to buffer up to the whole sector in memory to not block plotter on the
394                // other side
395                let (mut sector_sender, sector_receiver) = mpsc::channel(
396                    (sector_size(pieces_in_sector) / nats_client.approximate_max_message_size())
397                        .max(1),
398                );
399                let mut maybe_sector_receiver = Some(sector_receiver);
400                loop {
401                    match tokio::time::timeout(PING_TIMEOUT, response_stream.next()).await {
402                        Ok(Some(response)) => {
403                            match process_response_notification(
404                                &start,
405                                &free_instance,
406                                &progress_updater,
407                                &mut progress_sender,
408                                &mut retry_backoff_policy,
409                                response,
410                                &mut sector_sender,
411                                &mut maybe_sector_receiver,
412                            )
413                            .await
414                            {
415                                ResponseProcessingResult::Retry => {
416                                    debug!("Retrying");
417                                    continue 'outer;
418                                }
419                                ResponseProcessingResult::Abort => {
420                                    debug!("Aborting");
421                                    break 'outer;
422                                }
423                                ResponseProcessingResult::Continue => {
424                                    trace!("Continue");
425                                    // Nothing to do
426                                }
427                            }
428                        }
429                        Ok(None) => {
430                            trace!("Plotting done");
431                            break;
432                        }
433                        Err(_error) => {
434                            progress_updater
435                                .update_progress_and_events(
436                                    &mut progress_sender,
437                                    SectorPlottingProgress::Error {
438                                        error: "Timed out without ping from plotter".to_string(),
439                                    },
440                                )
441                                .await;
442                            break;
443                        }
444                    }
445                }
446
447                break;
448            }
449
450            drop(sector_encoding_permit);
451        };
452
453        let plotting_task =
454            AsyncJoinOnDrop::new(tokio::spawn(plotting_fut.in_current_span()), true);
455        if let Err(error) = self.tasks_sender.clone().send(plotting_task).await {
456            warn!(%error, "Failed to send plotting task");
457
458            let progress = SectorPlottingProgress::Error {
459                error: format!("Failed to send plotting task: {error}"),
460            };
461
462            self.handlers
463                .plotting_progress
464                .call_simple(&public_key, &sector_index, &progress);
465        }
466    }
467}
468
469// Try to get free plotter instance and return `None` if it is not possible
470async fn get_free_plotter_instance<PS>(
471    nats_client: &NatsClient,
472    progress_updater: &ProgressUpdater,
473    progress_sender: &mut PS,
474    retry_backoff_policy: &mut ExponentialBackoff,
475) -> Option<String>
476where
477    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
478    PS::Error: Error,
479{
480    loop {
481        match nats_client
482            .request(&ClusterPlotterFreeInstanceRequest, None)
483            .await
484        {
485            Ok(Some(free_instance)) => {
486                return Some(free_instance);
487            }
488            Ok(None) => {
489                if let Some(delay) = retry_backoff_policy.next_backoff() {
490                    debug!("Instance was occupied, retrying #1");
491
492                    tokio::time::sleep(delay).await;
493                    continue;
494                } else {
495                    progress_updater
496                        .update_progress_and_events(
497                            progress_sender,
498                            SectorPlottingProgress::Error {
499                                error: "Instance was occupied, exiting #1".to_string(),
500                            },
501                        )
502                        .await;
503                    return None;
504                }
505            }
506            Err(error) => match error.kind() {
507                RequestErrorKind::TimedOut => {
508                    if let Some(delay) = retry_backoff_policy.next_backoff() {
509                        debug!("Plotter request timed out, retrying");
510
511                        tokio::time::sleep(delay).await;
512                        continue;
513                    } else {
514                        progress_updater
515                            .update_progress_and_events(
516                                progress_sender,
517                                SectorPlottingProgress::Error {
518                                    error: "Plotter request timed out, exiting".to_string(),
519                                },
520                            )
521                            .await;
522                        return None;
523                    }
524                }
525                RequestErrorKind::NoResponders => {
526                    if let Some(delay) = retry_backoff_policy.next_backoff() {
527                        debug!("No plotters, retrying");
528
529                        tokio::time::sleep(delay).await;
530                        continue;
531                    } else {
532                        progress_updater
533                            .update_progress_and_events(
534                                progress_sender,
535                                SectorPlottingProgress::Error {
536                                    error: "No plotters, exiting".to_string(),
537                                },
538                            )
539                            .await;
540                        return None;
541                    }
542                }
543                RequestErrorKind::Other => {
544                    progress_updater
545                        .update_progress_and_events(
546                            progress_sender,
547                            SectorPlottingProgress::Error {
548                                error: format!("Failed to get free plotter instance: {error}"),
549                            },
550                        )
551                        .await;
552                    return None;
553                }
554            },
555        };
556    }
557}
558
/// Outcome of `process_response_notification()`, tells the plotting task what to do next
enum ResponseProcessingResult {
    /// Start over with a (possibly different) free plotter instance
    Retry,
    /// Stop the plotting task
    Abort,
    /// Keep reading notifications from the current response stream
    Continue,
}
564
565#[expect(clippy::too_many_arguments)]
566async fn process_response_notification<PS>(
567    start: &Instant,
568    free_instance: &str,
569    progress_updater: &ProgressUpdater,
570    progress_sender: &mut PS,
571    retry_backoff_policy: &mut ExponentialBackoff,
572    response: ClusterSectorPlottingProgress,
573    sector_sender: &mut mpsc::Sender<Result<Bytes, String>>,
574    maybe_sector_receiver: &mut Option<mpsc::Receiver<Result<Bytes, String>>>,
575) -> ResponseProcessingResult
576where
577    PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
578    PS::Error: Error,
579{
580    if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
581        trace!(?response, "Processing plotting response notification");
582    } else {
583        trace!("Processing plotting response notification (sector chunk)");
584    }
585
586    match response {
587        ClusterSectorPlottingProgress::Occupied => {
588            debug!(%free_instance, "Instance was occupied, retrying #2");
589
590            if let Some(delay) = retry_backoff_policy.next_backoff() {
591                debug!("Instance was occupied, retrying #2");
592
593                tokio::time::sleep(delay).await;
594                return ResponseProcessingResult::Retry;
595            } else {
596                debug!("Instance was occupied, exiting #2");
597                return ResponseProcessingResult::Abort;
598            }
599        }
600        ClusterSectorPlottingProgress::Ping => {
601            // Expected
602        }
603        ClusterSectorPlottingProgress::Downloading => {
604            if !progress_updater
605                .update_progress_and_events(progress_sender, SectorPlottingProgress::Downloading)
606                .await
607            {
608                return ResponseProcessingResult::Abort;
609            }
610        }
611        ClusterSectorPlottingProgress::Downloaded(time) => {
612            if !progress_updater
613                .update_progress_and_events(
614                    progress_sender,
615                    SectorPlottingProgress::Downloaded(time),
616                )
617                .await
618            {
619                return ResponseProcessingResult::Abort;
620            }
621        }
622        ClusterSectorPlottingProgress::Encoding => {
623            if !progress_updater
624                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoding)
625                .await
626            {
627                return ResponseProcessingResult::Abort;
628            }
629        }
630        ClusterSectorPlottingProgress::Encoded(time) => {
631            if !progress_updater
632                .update_progress_and_events(progress_sender, SectorPlottingProgress::Encoded(time))
633                .await
634            {
635                return ResponseProcessingResult::Abort;
636            }
637        }
638        ClusterSectorPlottingProgress::Finished {
639            plotted_sector,
640            time: _,
641        } => {
642            let Some(sector_receiver) = maybe_sector_receiver.take() else {
643                debug!("Unexpected duplicated sector plotting progress Finished");
644
645                progress_updater
646                    .update_progress_and_events(
647                        progress_sender,
648                        SectorPlottingProgress::Error {
649                            error: "Unexpected duplicated sector plotting progress Finished"
650                                .to_string(),
651                        },
652                    )
653                    .await;
654                return ResponseProcessingResult::Abort;
655            };
656
657            let progress = SectorPlottingProgress::Finished {
658                plotted_sector,
659                // Use local time instead of reported by remote plotter
660                time: start.elapsed(),
661                sector: Box::pin(sector_receiver),
662            };
663            if !progress_updater
664                .update_progress_and_events(progress_sender, progress)
665                .await
666            {
667                return ResponseProcessingResult::Abort;
668            }
669
670            return ResponseProcessingResult::Continue;
671        }
672        // This variant must be sent after Finished and it handled above
673        ClusterSectorPlottingProgress::SectorChunk(maybe_sector_chunk) => {
674            if let Err(error) = sector_sender.send(maybe_sector_chunk).await {
675                warn!(%error, "Failed to send sector chunk");
676                return ResponseProcessingResult::Abort;
677            }
678            return ResponseProcessingResult::Continue;
679        }
680        ClusterSectorPlottingProgress::Error { error } => {
681            if !progress_updater
682                .update_progress_and_events(
683                    progress_sender,
684                    SectorPlottingProgress::Error { error },
685                )
686                .await
687            {
688                return ResponseProcessingResult::Abort;
689            }
690        }
691    }
692
693    ResponseProcessingResult::Continue
694}
695
696struct ProgressUpdater {
697    public_key: Ed25519PublicKey,
698    sector_index: SectorIndex,
699    handlers: Arc<Handlers>,
700}
701
702impl ProgressUpdater {
703    /// Returns `true` on success and `false` if progress receiver channel is gone
704    async fn update_progress_and_events<PS>(
705        &self,
706        progress_sender: &mut PS,
707        progress: SectorPlottingProgress,
708    ) -> bool
709    where
710        PS: Sink<SectorPlottingProgress> + Unpin,
711        PS::Error: Error,
712    {
713        self.handlers.plotting_progress.call_simple(
714            &self.public_key,
715            &self.sector_index,
716            &progress,
717        );
718
719        if let Err(error) = progress_sender.send(progress).await {
720            warn!(%error, "Failed to send error progress update");
721
722            false
723        } else {
724            true
725        }
726    }
727}
728
729/// Create plotter service that will be processing incoming requests.
730///
731/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
732/// per controller instance in order to parallelize more work across threads if needed.
733pub async fn plotter_service<P>(nats_client: &NatsClient, plotter: &P) -> anyhow::Result<()>
734where
735    P: Plotter + Sync,
736{
737    let plotter_id = ClusterPlotterId::new();
738
739    select! {
740        result = free_instance_responder(&plotter_id, nats_client, plotter).fuse() => {
741            result
742        }
743        result = plot_sector_responder(&plotter_id, nats_client, plotter).fuse() => {
744            result
745        }
746    }
747}
748
749async fn free_instance_responder<P>(
750    plotter_id: &ClusterPlotterId,
751    nats_client: &NatsClient,
752    plotter: &P,
753) -> anyhow::Result<()>
754where
755    P: Plotter + Sync,
756{
757    loop {
758        while !plotter.has_free_capacity().await.unwrap_or_default() {
759            tokio::time::sleep(FREE_CAPACITY_CHECK_INTERVAL).await;
760        }
761
762        let mut subscription = nats_client
763            .queue_subscribe(
764                ClusterPlotterFreeInstanceRequest::SUBJECT,
765                "ab.plotter".to_string(),
766            )
767            .await
768            .map_err(|error| anyhow!("Failed to subscribe to free instance requests: {error}"))?;
769        debug!(?subscription, "Free instance subscription");
770
771        while let Some(message) = subscription.next().await {
772            let Some(reply_subject) = message.reply else {
773                continue;
774            };
775
776            debug!(%reply_subject, "Free instance request");
777
778            let has_free_capacity = plotter.has_free_capacity().await.unwrap_or_default();
779            let response: <ClusterPlotterFreeInstanceRequest as GenericRequest>::Response =
780                has_free_capacity.then(|| plotter_id.to_string());
781
782            if let Err(error) = nats_client
783                .publish(reply_subject, response.encode().into())
784                .await
785            {
786                warn!(%error, "Failed to send free instance response");
787            }
788
789            if !has_free_capacity {
790                subscription.unsubscribe().await.map_err(|error| {
791                    anyhow!("Failed to unsubscribe from free instance requests: {error}")
792                })?;
793            }
794        }
795    }
796}
797
798async fn plot_sector_responder<P>(
799    plotter_id: &ClusterPlotterId,
800    nats_client: &NatsClient,
801    plotter: &P,
802) -> anyhow::Result<()>
803where
804    P: Plotter + Sync,
805{
806    let plotter_id_string = plotter_id.to_string();
807
808    nats_client
809        .stream_request_responder(
810            Some(&plotter_id_string),
811            Some(plotter_id_string.clone()),
812            |request| async move {
813                let (progress_sender, mut progress_receiver) = mpsc::channel(10);
814
815                let fut =
816                    process_plot_sector_request(nats_client, plotter, request, progress_sender);
817                let mut fut = Box::pin(fut.fuse());
818
819                Some(
820                    // Drive above future and stream back any pieces that were downloaded so far
821                    stream::poll_fn(move |cx| {
822                        if !fut.is_terminated() {
823                            // Result doesn't matter, we'll need to poll stream below anyway
824                            let _ = fut.poll_unpin(cx);
825                        }
826
827                        if let Poll::Ready(maybe_result) = progress_receiver.poll_next_unpin(cx) {
828                            return Poll::Ready(maybe_result);
829                        }
830
831                        // Exit will be done by the stream above
832                        Poll::Pending
833                    }),
834                )
835            },
836        )
837        .await
838}
839
840async fn process_plot_sector_request<P>(
841    nats_client: &NatsClient,
842    plotter: &P,
843    request: ClusterPlotterPlotSectorRequest,
844    mut response_proxy_sender: mpsc::Sender<ClusterSectorPlottingProgress>,
845) where
846    P: Plotter,
847{
848    let ClusterPlotterPlotSectorRequest {
849        public_key,
850        shard_commitments_root,
851        sector_index,
852        farmer_protocol_info,
853        pieces_in_sector,
854    } = request;
855
856    // Wrapper future just for instrumentation below
857    let inner_fut = async {
858        info!("Plot sector request");
859
860        let (progress_sender, mut progress_receiver) = mpsc::channel(1);
861
862        if !plotter
863            .try_plot_sector(
864                public_key,
865                shard_commitments_root,
866                sector_index,
867                farmer_protocol_info,
868                pieces_in_sector,
869                false,
870                progress_sender,
871            )
872            .await
873        {
874            debug!("Plotter is currently occupied and can't plot more sectors");
875
876            if let Err(error) = response_proxy_sender
877                .send(ClusterSectorPlottingProgress::Occupied)
878                .await
879            {
880                warn!(%error, "Failed to send plotting progress");
881                return;
882            }
883            return;
884        }
885
886        let progress_proxy_fut = {
887            let mut response_proxy_sender = response_proxy_sender.clone();
888            let approximate_max_message_size = nats_client.approximate_max_message_size();
889
890            async move {
891                while let Some(progress) = progress_receiver.next().await {
892                    send_publish_progress(
893                        &mut response_proxy_sender,
894                        progress,
895                        approximate_max_message_size,
896                    )
897                    .await;
898                }
899            }
900        };
901
902        let mut ping_interval = tokio::time::interval(PING_INTERVAL);
903        ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
904        let ping_fut = async {
905            loop {
906                ping_interval.tick().await;
907                if let Err(error) = response_proxy_sender
908                    .send(ClusterSectorPlottingProgress::Ping)
909                    .await
910                {
911                    warn!(%error, "Failed to send plotting ping");
912                    return;
913                }
914            }
915        };
916
917        select! {
918            _ = progress_proxy_fut.fuse() => {
919                // Done
920            }
921            _ = ping_fut.fuse() => {
922                unreachable!("Ping loop never ends");
923            }
924        }
925
926        info!("Finished plotting sector successfully");
927    };
928
929    inner_fut
930        .instrument(info_span!("", %public_key, %sector_index))
931        .await
932}
933
934async fn send_publish_progress(
935    response_sender: &mut mpsc::Sender<ClusterSectorPlottingProgress>,
936    progress: SectorPlottingProgress,
937    approximate_max_message_size: usize,
938) {
939    // Finished response is large and needs special care
940    let cluster_progress = match progress {
941        SectorPlottingProgress::Downloading => ClusterSectorPlottingProgress::Downloading,
942        SectorPlottingProgress::Downloaded(time) => ClusterSectorPlottingProgress::Downloaded(time),
943        SectorPlottingProgress::Encoding => ClusterSectorPlottingProgress::Encoding,
944        SectorPlottingProgress::Encoded(time) => ClusterSectorPlottingProgress::Encoded(time),
945        SectorPlottingProgress::Finished {
946            plotted_sector,
947            time,
948            mut sector,
949        } => {
950            if let Err(error) = response_sender
951                .send(ClusterSectorPlottingProgress::Finished {
952                    plotted_sector,
953                    time,
954                })
955                .await
956            {
957                warn!(%error, "Failed to send plotting progress");
958                return;
959            }
960
961            while let Some(maybe_sector_chunk) = sector.next().await {
962                match maybe_sector_chunk {
963                    Ok(sector_chunk) => {
964                        // Slice large chunks into smaller ones before publishing
965                        for small_sector_chunk in sector_chunk.chunks(approximate_max_message_size)
966                        {
967                            if let Err(error) = response_sender
968                                .send(ClusterSectorPlottingProgress::SectorChunk(Ok(
969                                    sector_chunk.slice_ref(small_sector_chunk)
970                                )))
971                                .await
972                            {
973                                warn!(%error, "Failed to send plotting progress");
974                                return;
975                            }
976                        }
977                    }
978                    Err(error) => {
979                        if let Err(error) = response_sender
980                            .send(ClusterSectorPlottingProgress::SectorChunk(Err(error)))
981                            .await
982                        {
983                            warn!(%error, "Failed to send plotting progress");
984                            return;
985                        }
986                    }
987                }
988            }
989
990            return;
991        }
992        SectorPlottingProgress::Error { error } => ClusterSectorPlottingProgress::Error { error },
993    };
994
995    if let Err(error) = response_sender.send(cluster_progress).await {
996        warn!(%error, "Failed to send plotting progress");
997    }
998}