Skip to main content

ab_farmer/cluster/
farmer.rs

1//! Farming cluster farmer
2//!
3//! Farmer is responsible for maintaining farms, doing audits and generating proofs when solution is
4//! found in one of the plots.
5//!
6//! This module exposes some data structures for NATS communication, custom farm implementation
7//! designed to work with cluster farmer and a service function to drive the backend part
8//! of the farmer.
9
10use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15    Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16    PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use ab_core_primitives::pieces::{Piece, PieceOffset};
20use ab_core_primitives::sectors::SectorIndex;
21use ab_farmer_components::plotting::PlottedSector;
22use ab_farmer_rpc_primitives::SolutionResponse;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use derive_more::{Display, From};
26use event_listener_primitives::Bag;
27use futures::channel::mpsc;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Stream, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
31use std::future::Future;
32use std::pin::{Pin, pin};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::time::MissedTickBehavior;
36use tracing::{Instrument, debug, error, info_span, trace, warn};
37use ulid::Ulid;
38
/// Capacity of each in-memory channel that buffers farm notifications (sector updates, farming
/// notifications, solutions) before they are broadcast over NATS by `farmer_service`
const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
/// Identify requests from the controller that arrive sooner than this after the previous
/// identification are skipped by `identify_responder`
const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

/// Bag of handlers taking an argument of type `A`; callbacks are registered with `add` and
/// invoked with `call_simple`
type Handler<A> = Bag<HandlerFn<A>, A>;
43
/// An identifier for a cluster farmer, can be used in logs, thread names, etc.
///
/// Thin wrapper around a [`Ulid`]; it is SCALE-encoded as the underlying `u128` value.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49    #[inline]
50    fn size_hint(&self) -> usize {
51        Encode::size_hint(&self.0.0)
52    }
53
54    #[inline]
55    fn encode_to<O>(&self, dest: &mut O)
56    where
57        O: Output + ?Sized,
58    {
59        Encode::encode_to(&self.0.0, dest);
60    }
61}
62
63impl EncodeLike for ClusterFarmerId {}
64
65impl Decode for ClusterFarmerId {
66    #[inline]
67    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
68    where
69        I: Input,
70    {
71        u128::decode(input)
72            .map(|ulid| Self(Ulid(ulid)))
73            .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
74    }
75}
76
77#[expect(clippy::new_without_default)]
78impl ClusterFarmerId {
79    /// Create a new cluster farmer ID
80    pub fn new() -> Self {
81        Self(Ulid::new())
82    }
83}
84
/// Broadcast with cluster farmer id for identification
///
/// Sent by the farmer both in response to identify broadcasts from the controller and
/// periodically on its own.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerIdentifyBroadcast {
    /// Cluster farmer ID
    pub farmer_id: ClusterFarmerId,
}

impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
    /// `*` here stands for cluster farmer ID
    const SUBJECT: &'static str = "ab.farmer.*.farmer-identify";
}
96
/// Request farm details from farmer
///
/// The farmer answers with a stream containing one [`ClusterFarmerFarmDetails`] per farm it
/// serves.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetailsRequest;

impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
    /// `*` here stands for cluster farmer ID
    const SUBJECT: &'static str = "ab.farmer.*.farm.details";
    type Response = ClusterFarmerFarmDetails;
}
106
/// Farm details
///
/// Single item of the response stream for [`ClusterFarmerFarmDetailsRequest`].
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetails {
    /// Farm ID
    pub farm_id: FarmId,
    /// Total number of sectors in the farm
    pub total_sectors_count: u16,
}
115
/// Broadcast with sector updates by farmers
///
/// Produced by `farmer_service` from a farm's sector update callbacks and consumed by
/// [`ClusterFarm`], which forwards the update to its own sector update handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSectorUpdateBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Sector index
    sector_index: SectorIndex,
    /// Sector update
    sector_update: SectorUpdate,
}

impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "ab.farmer.*.sector-update";
}
131
/// Broadcast with farming notifications by farmers
///
/// Produced by `farmer_service` from a farm's farming notification callbacks and consumed by
/// [`ClusterFarm`], which forwards the notification to its own farming notification handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerFarmingNotificationBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Farming notification
    farming_notification: FarmingNotification,
}

impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "ab.farmer.*.farming-notification";
}
145
/// Broadcast with solutions by farmers
///
/// Produced by `farmer_service` from a farm's solution callbacks and consumed by
/// [`ClusterFarm`], which forwards the solution response to its own solution handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSolutionBroadcast {
    /// Farm ID
    farm_id: FarmId,
    /// Solution response
    solution_response: SolutionResponse,
}

impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "ab.farmer.*.solution-response";
}
159
/// Read piece from farm
///
/// Both fields are passed as is to [`PieceReader::read_piece`] of the target farm; a failure to
/// read is reported back as the string form of the error.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerReadPieceRequest {
    /// Sector index to read the piece from
    sector_index: SectorIndex,
    /// Offset of the piece within the sector
    piece_offset: PieceOffset,
}

impl GenericRequest for ClusterFarmerReadPieceRequest {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "ab.farmer.*.farm.read-piece";
    type Response = Result<Option<Piece>, String>;
}
172
/// Request plotted sectors from farmer
///
/// The response is a stream with one item per plotted sector; errors are carried as strings.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerPlottedSectorsRequest;

impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
    /// `*` here stands for single farm ID
    const SUBJECT: &'static str = "ab.farmer.*.farm.plotted-sectors";
    type Response = Result<PlottedSector, String>;
}
182
183#[derive(Debug)]
184struct ClusterPlottedSectors {
185    farm_id_string: String,
186    nats_client: NatsClient,
187}
188
189#[async_trait]
190impl PlottedSectors for ClusterPlottedSectors {
191    async fn get(
192        &self,
193    ) -> Result<
194        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
195        FarmError,
196    > {
197        Ok(Box::new(
198            self.nats_client
199                .stream_request(
200                    &ClusterFarmerPlottedSectorsRequest,
201                    Some(&self.farm_id_string),
202                )
203                .await?
204                .map(|response| response.map_err(FarmError::from)),
205        ))
206    }
207}
208
209#[derive(Debug)]
210struct ClusterPieceReader {
211    farm_id_string: String,
212    nats_client: NatsClient,
213}
214
215#[async_trait]
216impl PieceReader for ClusterPieceReader {
217    async fn read_piece(
218        &self,
219        sector_index: SectorIndex,
220        piece_offset: PieceOffset,
221    ) -> Result<Option<Piece>, FarmError> {
222        Ok(self
223            .nats_client
224            .request(
225                &ClusterFarmerReadPieceRequest {
226                    sector_index,
227                    piece_offset,
228                },
229                Some(&self.farm_id_string),
230            )
231            .await??)
232    }
233}
234
/// Callbacks registered on a [`ClusterFarm`], one bag per kind of notification
#[derive(Default, Debug)]
struct Handlers {
    /// Called for every received sector update broadcast
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Called for every received farming notification broadcast
    farming_notification: Handler<FarmingNotification>,
    /// Called for every received solution broadcast
    solution: Handler<SolutionResponse>,
}
241
/// Cluster farm implementation
///
/// [`Farm`] whose plotted sectors, piece reads and notifications are all served over NATS.
#[derive(Debug)]
pub struct ClusterFarm {
    /// Farm ID
    farm_id: FarmId,
    /// String form of `farm_id`, handed to the plotted sectors and piece reader instances
    farm_id_string: String,
    /// Total number of sectors in the farm
    total_sectors_count: u16,
    /// Client cloned into the plotted sectors and piece reader instances
    nats_client: NatsClient,
    /// Callbacks fired by the background task for received broadcasts
    handlers: Arc<Handlers>,
    /// Spawned task that forwards received broadcasts to `handlers`, awaited by `run`
    background_tasks: AsyncJoinOnDrop<()>,
}
252
253#[async_trait(?Send)]
254impl Farm for ClusterFarm {
255    fn id(&self) -> &FarmId {
256        &self.farm_id
257    }
258
259    fn total_sectors_count(&self) -> u16 {
260        self.total_sectors_count
261    }
262
263    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
264        Arc::new(ClusterPlottedSectors {
265            farm_id_string: self.farm_id_string.clone(),
266            nats_client: self.nats_client.clone(),
267        })
268    }
269
270    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
271        Arc::new(ClusterPieceReader {
272            farm_id_string: self.farm_id_string.clone(),
273            nats_client: self.nats_client.clone(),
274        })
275    }
276
277    fn on_sector_update(
278        &self,
279        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
280    ) -> Box<dyn HandlerId> {
281        Box::new(self.handlers.sector_update.add(callback))
282    }
283
284    fn on_farming_notification(
285        &self,
286        callback: HandlerFn<FarmingNotification>,
287    ) -> Box<dyn HandlerId> {
288        Box::new(self.handlers.farming_notification.add(callback))
289    }
290
291    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
292        Box::new(self.handlers.solution.add(callback))
293    }
294
295    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
296        Box::pin((*self).run())
297    }
298}
299
300impl ClusterFarm {
301    /// Create a new instance using information from previously received
302    /// [`ClusterFarmerIdentifyBroadcast`]
303    pub async fn new(
304        farm_id: FarmId,
305        total_sectors_count: u16,
306        nats_client: NatsClient,
307    ) -> anyhow::Result<Self> {
308        let farm_id_string = farm_id.to_string();
309
310        let sector_updates_subscription = nats_client
311            .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
312                Some(&farm_id_string),
313                None,
314            )
315            .await
316            .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
317        let farming_notifications_subscription = nats_client
318            .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
319                Some(&farm_id_string),
320                None,
321            )
322            .await
323            .map_err(|error| {
324                anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
325            })?;
326        let solution_subscription = nats_client
327            .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
328            .await
329            .map_err(|error| {
330                anyhow!("Failed to subscribe to solution responses broadcast: {error}")
331            })?;
332
333        let handlers = Arc::<Handlers>::default();
334        // Run background tasks and fire corresponding notifications
335        let background_tasks = {
336            let handlers = Arc::clone(&handlers);
337
338            async move {
339                let mut sector_updates_subscription = pin!(sector_updates_subscription);
340                let mut farming_notifications_subscription =
341                    pin!(farming_notifications_subscription);
342                let mut solution_subscription = pin!(solution_subscription);
343
344                let sector_updates_fut = async {
345                    while let Some(ClusterFarmerSectorUpdateBroadcast {
346                        sector_index,
347                        sector_update,
348                        ..
349                    }) = sector_updates_subscription.next().await
350                    {
351                        handlers
352                            .sector_update
353                            .call_simple(&(sector_index, sector_update));
354                    }
355                };
356                let farming_notifications_fut = async {
357                    while let Some(ClusterFarmerFarmingNotificationBroadcast {
358                        farming_notification,
359                        ..
360                    }) = farming_notifications_subscription.next().await
361                    {
362                        handlers
363                            .farming_notification
364                            .call_simple(&farming_notification);
365                    }
366                };
367                let solutions_fut = async {
368                    while let Some(ClusterFarmerSolutionBroadcast {
369                        solution_response, ..
370                    }) = solution_subscription.next().await
371                    {
372                        handlers.solution.call_simple(&solution_response);
373                    }
374                };
375
376                select! {
377                    _ = sector_updates_fut.fuse() => {}
378                    _ = farming_notifications_fut.fuse() => {}
379                    _ = solutions_fut.fuse() => {}
380                }
381            }
382        };
383
384        Ok(Self {
385            farm_id,
386            farm_id_string,
387            total_sectors_count,
388            nats_client,
389            handlers,
390            background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
391        })
392    }
393
394    /// Run and wait for background tasks to exit or return an error
395    pub async fn run(self) -> anyhow::Result<()> {
396        Ok(self.background_tasks.await?)
397    }
398}
399
/// Per-farm state kept by `farmer_service` to answer incoming requests
#[derive(Debug)]
struct FarmDetails {
    /// Farm ID
    farm_id: FarmId,
    /// String form of `farm_id`, passed to the per-farm responders
    farm_id_string: String,
    /// Total number of sectors in the farm
    total_sectors_count: u16,
    /// Used to answer read piece requests
    piece_reader: Arc<dyn PieceReader + 'static>,
    /// Used to answer plotted sectors requests
    plotted_sectors: Arc<dyn PlottedSectors + 'static>,
    /// Task forwarding farm notifications as broadcasts, only present on primary instance; never
    /// read, just held for as long as the farm details exist
    _background_tasks: Option<AsyncJoinOnDrop<()>>,
}
409
/// Create farmer service for specified farms that will be processing incoming requests and send
/// periodic identify notifications.
///
/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
/// per controller instance in order to parallelize more work across threads if needed.
///
/// A fresh [`ClusterFarmerId`] is generated on every call.
///
/// When `primary_instance` is `true`, callbacks are registered on every farm and one forwarding
/// task per farm is spawned right away (before the returned future is polled); the returned
/// future then additionally answers identify broadcasts and farm details requests. When it is
/// `false`, only plotted sectors and read piece requests are served and
/// `identification_broadcast_interval` is not used.
///
/// The returned future resolves with the result of whichever responder finishes first.
pub fn farmer_service<F>(
    nats_client: NatsClient,
    farms: &[F],
    identification_broadcast_interval: Duration,
    primary_instance: bool,
) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
where
    F: Farm,
{
    let farmer_id = ClusterFarmerId::new();
    let farmer_id_string = farmer_id.to_string();

    // For each farm start forwarding notifications as broadcast messages and create farm details
    // that can be used to respond to incoming requests
    let farms_details = farms
        .iter()
        .map(|farm| {
            let farm_id = *farm.id();
            let nats_client = nats_client.clone();

            let background_tasks = if primary_instance {
                // Farm callbacks are synchronous, so notifications are queued into bounded
                // channels and broadcast asynchronously by the task spawned below
                let (sector_updates_sender, mut sector_updates_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (farming_notifications_sender, mut farming_notifications_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (solutions_sender, mut solutions_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);

                // A notification that can't be queued is only logged and then dropped
                let sector_updates_handler_id =
                    farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
                        if let Err(error) = sector_updates_sender.clone().try_send(
                            ClusterFarmerSectorUpdateBroadcast {
                                farm_id,
                                sector_index: *sector_index,
                                sector_update: sector_update.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send sector update notification");
                        }
                    }));

                let farming_notifications_handler_id =
                    farm.on_farming_notification(Arc::new(move |farming_notification| {
                        if let Err(error) = farming_notifications_sender.clone().try_send(
                            ClusterFarmerFarmingNotificationBroadcast {
                                farm_id,
                                farming_notification: farming_notification.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send farming notification");
                        }
                    }));

                let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
                    if let Err(error) =
                        solutions_sender
                            .clone()
                            .try_send(ClusterFarmerSolutionBroadcast {
                                farm_id,
                                solution_response: solution_response.clone(),
                            })
                    {
                        warn!(%farm_id, %error, "Failed to send solution notification");
                    }
                }));

                Some(AsyncJoinOnDrop::new(
                    tokio::spawn(async move {
                        let farm_id_string = farm_id.to_string();

                        // Each loop drains one channel; broadcast failures are logged and the
                        // loop carries on with the next notification
                        let sector_updates_fut = async {
                            while let Some(broadcast) = sector_updates_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast sector update");
                                }
                            }
                        };
                        let farming_notifications_fut = async {
                            while let Some(broadcast) = farming_notifications_receiver.next().await
                            {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(
                                        %farm_id,
                                        %error,
                                        "Failed to broadcast farming notification"
                                    );
                                }
                            }
                        };
                        let solutions_fut = async {
                            while let Some(broadcast) = solutions_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast solution");
                                }
                            }
                        };

                        // The task is done as soon as any of the three loops finishes
                        select! {
                            _ = sector_updates_fut.fuse() => {}
                            _ = farming_notifications_fut.fuse() => {}
                            _ = solutions_fut.fuse() => {}
                        }

                        // Handler IDs are moved into the task and only dropped here, after the
                        // forwarding loops are over
                        drop(sector_updates_handler_id);
                        drop(farming_notifications_handler_id);
                        drop(solutions_handler_id);
                    }),
                    true,
                ))
            } else {
                None
            };

            FarmDetails {
                farm_id,
                farm_id_string: farm_id.to_string(),
                total_sectors_count: farm.total_sectors_count(),
                piece_reader: farm.piece_reader(),
                plotted_sectors: farm.plotted_sectors(),
                _background_tasks: background_tasks,
            }
        })
        .collect::<Vec<_>>();

    // The returned future owns farm details (and with them the forwarding tasks)
    async move {
        if primary_instance {
            // Primary instance runs all four responders
            select! {
                result = identify_responder(
                    &nats_client,
                    farmer_id,
                    &farmer_id_string,
                    identification_broadcast_interval
                ).fuse() => {
                    result
                },
                result = farms_details_responder(
                    &nats_client,
                    &farmer_id_string,
                    &farms_details
                ).fuse() => {
                    result
                },
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        } else {
            // Non-primary instance only serves plotted sectors and piece reads
            select! {
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        }
    }
}
582
583/// Listen for farmer identification broadcast from controller and publish identification
584/// broadcast in response, also send periodic notifications reminding that farm exists
585async fn identify_responder(
586    nats_client: &NatsClient,
587    farmer_id: ClusterFarmerId,
588    farmer_id_string: &str,
589    identification_broadcast_interval: Duration,
590) -> anyhow::Result<()> {
591    let mut subscription = nats_client
592        .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
593        .await
594        .map_err(|error| {
595            anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
596        })?
597        .fuse();
598
599    // Also send periodic updates in addition to the subscription response
600    let mut interval = tokio::time::interval(identification_broadcast_interval);
601    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
602
603    let mut last_identification = Instant::now();
604
605    loop {
606        select! {
607            maybe_message = subscription.next() => {
608                let Some(message) = maybe_message else {
609                    debug!("Identify broadcast stream ended");
610                    break;
611                };
612
613                trace!(?message, "Farmer received identify broadcast message");
614
615                if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
616                    // Skip too frequent identification requests
617                    continue;
618                }
619
620                last_identification = Instant::now();
621                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
622                interval.reset();
623            }
624            _ = interval.tick().fuse() => {
625                last_identification = Instant::now();
626                trace!("Farmer self-identification");
627
628                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
629            }
630        }
631    }
632
633    Ok(())
634}
635
636async fn send_identify_broadcast(
637    nats_client: &NatsClient,
638    farmer_id: ClusterFarmerId,
639    farmer_id_string: &str,
640) {
641    if let Err(error) = nats_client
642        .broadcast(&new_identify_message(farmer_id), farmer_id_string)
643        .await
644    {
645        warn!(%farmer_id, %error, "Failed to send farmer identify notification");
646    }
647}
648
/// Build the identify broadcast message carrying `farmer_id`
fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
    ClusterFarmerIdentifyBroadcast { farmer_id }
}
652
653async fn farms_details_responder(
654    nats_client: &NatsClient,
655    farmer_id_string: &str,
656    farms_details: &[FarmDetails],
657) -> anyhow::Result<()> {
658    nats_client
659        .stream_request_responder(
660            Some(farmer_id_string),
661            Some(farmer_id_string.to_string()),
662            |_request: ClusterFarmerFarmDetailsRequest| async {
663                Some(stream::iter(farms_details.iter().map(|farm_details| {
664                    ClusterFarmerFarmDetails {
665                        farm_id: farm_details.farm_id,
666                        total_sectors_count: farm_details.total_sectors_count,
667                    }
668                })))
669            },
670        )
671        .await
672}
673
674async fn plotted_sectors_responder(
675    nats_client: &NatsClient,
676    farms_details: &[FarmDetails],
677) -> anyhow::Result<()> {
678    farms_details
679        .iter()
680        .map(|farm_details| async move {
681            nats_client
682                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
683                    Some(&farm_details.farm_id_string),
684                    Some(farm_details.farm_id_string.clone()),
685                    |_request: ClusterFarmerPlottedSectorsRequest| async move {
686                        Some(match farm_details.plotted_sectors.get().await {
687                            Ok(plotted_sectors) => {
688                                Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
689                                    maybe_plotted_sector.map_err(|error| error.to_string())
690                                })) as _
691                            }
692                            Err(error) => {
693                                error!(
694                                    %error,
695                                    farm_id = %farm_details.farm_id,
696                                    "Failed to get plotted sectors"
697                                );
698
699                                Box::pin(stream::once(async move {
700                                    Err(format!("Failed to get plotted sectors: {error}"))
701                                })) as _
702                            }
703                        })
704                    },
705                )
706                .instrument(info_span!("", cache_id = %farm_details.farm_id))
707                .await
708        })
709        .collect::<FuturesUnordered<_>>()
710        .next()
711        .await
712        .ok_or_else(|| anyhow!("No farms"))?
713}
714
715async fn read_piece_responder(
716    nats_client: &NatsClient,
717    farms_details: &[FarmDetails],
718) -> anyhow::Result<()> {
719    farms_details
720        .iter()
721        .map(|farm_details| async move {
722            nats_client
723                .request_responder(
724                    Some(farm_details.farm_id_string.as_str()),
725                    Some(farm_details.farm_id_string.clone()),
726                    |request: ClusterFarmerReadPieceRequest| async move {
727                        Some(
728                            farm_details
729                                .piece_reader
730                                .read_piece(request.sector_index, request.piece_offset)
731                                .await
732                                .map_err(|error| error.to_string()),
733                        )
734                    },
735                )
736                .instrument(info_span!("", cache_id = %farm_details.farm_id))
737                .await
738        })
739        .collect::<FuturesUnordered<_>>()
740        .next()
741        .await
742        .ok_or_else(|| anyhow!("No farms"))?
743}