Skip to main content

ab_farmer/cluster/
farmer.rs

1//! Farming cluster farmer
2//!
3//! Farmer is responsible for maintaining farms, doing audits and generating proofs when solution is
4//! found in one of the plots.
5//!
6//! This module exposes some data structures for NATS communication, custom farm implementation
7//! designed to work with cluster farmer and a service function to drive the backend part
8//! of the farmer.
9
10use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15    Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16    PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use ab_core_primitives::pieces::{Piece, PieceOffset};
20use ab_core_primitives::sectors::SectorIndex;
21use ab_farmer_components::plotting::PlottedSector;
22use ab_farmer_rpc_primitives::SolutionResponse;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use derive_more::{Display, From};
26use event_listener_primitives::Bag;
27use futures::channel::mpsc;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Stream, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
31use std::future::Future;
32use std::pin::{Pin, pin};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::time::MissedTickBehavior;
36use tracing::{Instrument, debug, error, info_span, trace, warn};
37use ulid::Ulid;
38
/// Capacity of each in-memory channel that buffers farm notifications before they are broadcast
/// over NATS by the farmer service
const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1_000;
/// Minimum time that must pass since the last identification before an identification request is
/// answered again, more frequent requests are skipped
const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
41
42type Handler<A> = Bag<HandlerFn<A>, A>;
43
44/// An identifier for a cluster farmer, can be used for in logs, thread names, etc.
45#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
46pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49    #[inline]
50    fn size_hint(&self) -> usize {
51        Encode::size_hint(&self.0.0)
52    }
53
54    #[inline]
55    fn encode_to<O>(&self, dest: &mut O)
56    where
57        O: Output + ?Sized,
58    {
59        Encode::encode_to(&self.0.0, dest);
60    }
61}
62
63impl EncodeLike for ClusterFarmerId {}
64
65impl Decode for ClusterFarmerId {
66    #[inline]
67    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
68    where
69        I: Input,
70    {
71        u128::decode(input)
72            .map(|ulid| Self(Ulid(ulid)))
73            .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
74    }
75}
76
77#[expect(
78    clippy::new_without_default,
79    reason = "Default has different semantics"
80)]
81impl ClusterFarmerId {
82    /// Create a new cluster farmer ID
83    pub fn new() -> Self {
84        Self(Ulid::new())
85    }
86}
87
88/// Broadcast with cluster farmer id for identification
89#[derive(Debug, Clone, Encode, Decode)]
90pub struct ClusterFarmerIdentifyBroadcast {
91    /// Cluster farmer ID
92    pub farmer_id: ClusterFarmerId,
93}
94
95impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
96    /// `*` here stands for cluster farmer ID
97    const SUBJECT: &'static str = "ab.farmer.*.farmer-identify";
98}
99
100/// Request farm details from farmer
101#[derive(Debug, Clone, Encode, Decode)]
102pub struct ClusterFarmerFarmDetailsRequest;
103
104impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
105    /// `*` here stands for cluster farmer ID
106    const SUBJECT: &'static str = "ab.farmer.*.farm.details";
107    type Response = ClusterFarmerFarmDetails;
108}
109
110/// Farm details
111#[derive(Debug, Clone, Encode, Decode)]
112pub struct ClusterFarmerFarmDetails {
113    /// Farm ID
114    pub farm_id: FarmId,
115    /// Total number of sectors in the farm
116    pub total_sectors_count: u16,
117}
118
119/// Broadcast with sector updates by farmers
120#[derive(Debug, Clone, Encode, Decode)]
121struct ClusterFarmerSectorUpdateBroadcast {
122    /// Farm ID
123    farm_id: FarmId,
124    /// Sector index
125    sector_index: SectorIndex,
126    /// Sector update
127    sector_update: SectorUpdate,
128}
129
130impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
131    /// `*` here stands for single farm ID
132    const SUBJECT: &'static str = "ab.farmer.*.sector-update";
133}
134
135/// Broadcast with farming notifications by farmers
136#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterFarmerFarmingNotificationBroadcast {
138    /// Farm ID
139    farm_id: FarmId,
140    /// Farming notification
141    farming_notification: FarmingNotification,
142}
143
144impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
145    /// `*` here stands for single farm ID
146    const SUBJECT: &'static str = "ab.farmer.*.farming-notification";
147}
148
149/// Broadcast with solutions by farmers
150#[derive(Debug, Clone, Encode, Decode)]
151struct ClusterFarmerSolutionBroadcast {
152    /// Farm ID
153    farm_id: FarmId,
154    /// Solution response
155    solution_response: SolutionResponse,
156}
157
158impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
159    /// `*` here stands for single farm ID
160    const SUBJECT: &'static str = "ab.farmer.*.solution-response";
161}
162
163/// Read piece from farm
164#[derive(Debug, Clone, Encode, Decode)]
165struct ClusterFarmerReadPieceRequest {
166    sector_index: SectorIndex,
167    piece_offset: PieceOffset,
168}
169
170impl GenericRequest for ClusterFarmerReadPieceRequest {
171    /// `*` here stands for single farm ID
172    const SUBJECT: &'static str = "ab.farmer.*.farm.read-piece";
173    type Response = Result<Option<Piece>, String>;
174}
175
176/// Request plotted sectors from farmer
177#[derive(Debug, Clone, Encode, Decode)]
178struct ClusterFarmerPlottedSectorsRequest;
179
180impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
181    /// `*` here stands for single farm ID
182    const SUBJECT: &'static str = "ab.farmer.*.farm.plotted-sectors";
183    type Response = Result<PlottedSector, String>;
184}
185
186#[derive(Debug)]
187struct ClusterPlottedSectors {
188    farm_id_string: String,
189    nats_client: NatsClient,
190}
191
192#[async_trait]
193impl PlottedSectors for ClusterPlottedSectors {
194    async fn get(
195        &self,
196    ) -> Result<
197        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
198        FarmError,
199    > {
200        Ok(Box::new(
201            self.nats_client
202                .stream_request(
203                    &ClusterFarmerPlottedSectorsRequest,
204                    Some(&self.farm_id_string),
205                )
206                .await?
207                .map(|response| response.map_err(FarmError::from)),
208        ))
209    }
210}
211
212#[derive(Debug)]
213struct ClusterPieceReader {
214    farm_id_string: String,
215    nats_client: NatsClient,
216}
217
218#[async_trait]
219impl PieceReader for ClusterPieceReader {
220    async fn read_piece(
221        &self,
222        sector_index: SectorIndex,
223        piece_offset: PieceOffset,
224    ) -> Result<Option<Piece>, FarmError> {
225        Ok(self
226            .nats_client
227            .request(
228                &ClusterFarmerReadPieceRequest {
229                    sector_index,
230                    piece_offset,
231                },
232                Some(&self.farm_id_string),
233            )
234            .await??)
235    }
236}
237
238#[derive(Default, Debug)]
239struct Handlers {
240    sector_update: Handler<(SectorIndex, SectorUpdate)>,
241    farming_notification: Handler<FarmingNotification>,
242    solution: Handler<SolutionResponse>,
243}
244
245/// Cluster farm implementation
246#[derive(Debug)]
247pub struct ClusterFarm {
248    farm_id: FarmId,
249    farm_id_string: String,
250    total_sectors_count: u16,
251    nats_client: NatsClient,
252    handlers: Arc<Handlers>,
253    background_tasks: AsyncJoinOnDrop<()>,
254}
255
256#[async_trait(?Send)]
257impl Farm for ClusterFarm {
258    fn id(&self) -> &FarmId {
259        &self.farm_id
260    }
261
262    fn total_sectors_count(&self) -> u16 {
263        self.total_sectors_count
264    }
265
266    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
267        Arc::new(ClusterPlottedSectors {
268            farm_id_string: self.farm_id_string.clone(),
269            nats_client: self.nats_client.clone(),
270        })
271    }
272
273    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
274        Arc::new(ClusterPieceReader {
275            farm_id_string: self.farm_id_string.clone(),
276            nats_client: self.nats_client.clone(),
277        })
278    }
279
280    fn on_sector_update(
281        &self,
282        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
283    ) -> Box<dyn HandlerId> {
284        Box::new(self.handlers.sector_update.add(callback))
285    }
286
287    fn on_farming_notification(
288        &self,
289        callback: HandlerFn<FarmingNotification>,
290    ) -> Box<dyn HandlerId> {
291        Box::new(self.handlers.farming_notification.add(callback))
292    }
293
294    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
295        Box::new(self.handlers.solution.add(callback))
296    }
297
298    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
299        Box::pin((*self).run())
300    }
301}
302
303impl ClusterFarm {
304    /// Create a new instance using information from previously received
305    /// [`ClusterFarmerIdentifyBroadcast`]
306    pub async fn new(
307        farm_id: FarmId,
308        total_sectors_count: u16,
309        nats_client: NatsClient,
310    ) -> anyhow::Result<Self> {
311        let farm_id_string = farm_id.to_string();
312
313        let sector_updates_subscription = nats_client
314            .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
315                Some(&farm_id_string),
316                None,
317            )
318            .await
319            .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
320        let farming_notifications_subscription = nats_client
321            .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
322                Some(&farm_id_string),
323                None,
324            )
325            .await
326            .map_err(|error| {
327                anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
328            })?;
329        let solution_subscription = nats_client
330            .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
331            .await
332            .map_err(|error| {
333                anyhow!("Failed to subscribe to solution responses broadcast: {error}")
334            })?;
335
336        let handlers = Arc::<Handlers>::default();
337        // Run background tasks and fire corresponding notifications
338        let background_tasks = {
339            let handlers = Arc::clone(&handlers);
340
341            async move {
342                let mut sector_updates_subscription = pin!(sector_updates_subscription);
343                let mut farming_notifications_subscription =
344                    pin!(farming_notifications_subscription);
345                let mut solution_subscription = pin!(solution_subscription);
346
347                let sector_updates_fut = async {
348                    while let Some(ClusterFarmerSectorUpdateBroadcast {
349                        sector_index,
350                        sector_update,
351                        ..
352                    }) = sector_updates_subscription.next().await
353                    {
354                        handlers
355                            .sector_update
356                            .call_simple(&(sector_index, sector_update));
357                    }
358                };
359                let farming_notifications_fut = async {
360                    while let Some(ClusterFarmerFarmingNotificationBroadcast {
361                        farming_notification,
362                        ..
363                    }) = farming_notifications_subscription.next().await
364                    {
365                        handlers
366                            .farming_notification
367                            .call_simple(&farming_notification);
368                    }
369                };
370                let solutions_fut = async {
371                    while let Some(ClusterFarmerSolutionBroadcast {
372                        solution_response, ..
373                    }) = solution_subscription.next().await
374                    {
375                        handlers.solution.call_simple(&solution_response);
376                    }
377                };
378
379                select! {
380                    () = sector_updates_fut.fuse() => {}
381                    () = farming_notifications_fut.fuse() => {}
382                    () = solutions_fut.fuse() => {}
383                }
384            }
385        };
386
387        Ok(Self {
388            farm_id,
389            farm_id_string,
390            total_sectors_count,
391            nats_client,
392            handlers,
393            background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
394        })
395    }
396
397    /// Run and wait for background tasks to exit or return an error
398    pub async fn run(self) -> anyhow::Result<()> {
399        Ok(self.background_tasks.await?)
400    }
401}
402
403#[derive(Debug)]
404struct FarmDetails {
405    farm_id: FarmId,
406    farm_id_string: String,
407    total_sectors_count: u16,
408    piece_reader: Arc<dyn PieceReader + 'static>,
409    plotted_sectors: Arc<dyn PlottedSectors + 'static>,
410    _background_tasks: Option<AsyncJoinOnDrop<()>>,
411}
412
413/// Create farmer service for specified farms that will be processing incoming requests and send
414/// periodic identify notifications.
415///
416/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
417/// per controller instance in order to parallelize more work across threads if needed.
418pub fn farmer_service<F>(
419    nats_client: NatsClient,
420    farms: &[F],
421    identification_broadcast_interval: Duration,
422    primary_instance: bool,
423) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
424where
425    F: Farm,
426{
427    let farmer_id = ClusterFarmerId::new();
428    let farmer_id_string = farmer_id.to_string();
429
430    // For each farm start forwarding notifications as broadcast messages and create farm details
431    // that can be used to respond to incoming requests
432    let farms_details = farms
433        .iter()
434        .map(|farm| {
435            let farm_id = *farm.id();
436            let nats_client = nats_client.clone();
437
438            let background_tasks = if primary_instance {
439                let (sector_updates_sender, mut sector_updates_receiver) =
440                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
441                let (farming_notifications_sender, mut farming_notifications_receiver) =
442                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
443                let (solutions_sender, mut solutions_receiver) =
444                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
445
446                let sector_updates_handler_id =
447                    farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
448                        if let Err(error) = sector_updates_sender.clone().try_send(
449                            ClusterFarmerSectorUpdateBroadcast {
450                                farm_id,
451                                sector_index: *sector_index,
452                                sector_update: sector_update.clone(),
453                            },
454                        ) {
455                            warn!(%farm_id, %error, "Failed to send sector update notification");
456                        }
457                    }));
458
459                let farming_notifications_handler_id =
460                    farm.on_farming_notification(Arc::new(move |farming_notification| {
461                        if let Err(error) = farming_notifications_sender.clone().try_send(
462                            ClusterFarmerFarmingNotificationBroadcast {
463                                farm_id,
464                                farming_notification: farming_notification.clone(),
465                            },
466                        ) {
467                            warn!(%farm_id, %error, "Failed to send farming notification");
468                        }
469                    }));
470
471                let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
472                    if let Err(error) =
473                        solutions_sender
474                            .clone()
475                            .try_send(ClusterFarmerSolutionBroadcast {
476                                farm_id,
477                                solution_response: solution_response.clone(),
478                            })
479                    {
480                        warn!(%farm_id, %error, "Failed to send solution notification");
481                    }
482                }));
483
484                Some(AsyncJoinOnDrop::new(
485                    tokio::spawn(async move {
486                        let farm_id_string = farm_id.to_string();
487
488                        let sector_updates_fut = async {
489                            while let Some(broadcast) = sector_updates_receiver.next().await {
490                                if let Err(error) =
491                                    nats_client.broadcast(&broadcast, &farm_id_string).await
492                                {
493                                    warn!(%farm_id, %error, "Failed to broadcast sector update");
494                                }
495                            }
496                        };
497                        let farming_notifications_fut = async {
498                            while let Some(broadcast) = farming_notifications_receiver.next().await
499                            {
500                                if let Err(error) =
501                                    nats_client.broadcast(&broadcast, &farm_id_string).await
502                                {
503                                    warn!(
504                                        %farm_id,
505                                        %error,
506                                        "Failed to broadcast farming notification"
507                                    );
508                                }
509                            }
510                        };
511                        let solutions_fut = async {
512                            while let Some(broadcast) = solutions_receiver.next().await {
513                                if let Err(error) =
514                                    nats_client.broadcast(&broadcast, &farm_id_string).await
515                                {
516                                    warn!(%farm_id, %error, "Failed to broadcast solution");
517                                }
518                            }
519                        };
520
521                        select! {
522                            () = sector_updates_fut.fuse() => {}
523                            () = farming_notifications_fut.fuse() => {}
524                            () = solutions_fut.fuse() => {}
525                        }
526
527                        drop(sector_updates_handler_id);
528                        drop(farming_notifications_handler_id);
529                        drop(solutions_handler_id);
530                    }),
531                    true,
532                ))
533            } else {
534                None
535            };
536
537            FarmDetails {
538                farm_id,
539                farm_id_string: farm_id.to_string(),
540                total_sectors_count: farm.total_sectors_count(),
541                piece_reader: farm.piece_reader(),
542                plotted_sectors: farm.plotted_sectors(),
543                _background_tasks: background_tasks,
544            }
545        })
546        .collect::<Vec<_>>();
547
548    async move {
549        if primary_instance {
550            select! {
551                result = identify_responder(
552                    &nats_client,
553                    farmer_id,
554                    &farmer_id_string,
555                    identification_broadcast_interval
556                ).fuse() => {
557                    result
558                },
559                result = farms_details_responder(
560                    &nats_client,
561                    &farmer_id_string,
562                    &farms_details
563                ).fuse() => {
564                    result
565                },
566                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
567                    result
568                },
569                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
570                    result
571                },
572            }
573        } else {
574            select! {
575                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
576                    result
577                },
578                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
579                    result
580                },
581            }
582        }
583    }
584}
585
586/// Listen for farmer identification broadcast from controller and publish identification
587/// broadcast in response, also send periodic notifications reminding that farm exists
588async fn identify_responder(
589    nats_client: &NatsClient,
590    farmer_id: ClusterFarmerId,
591    farmer_id_string: &str,
592    identification_broadcast_interval: Duration,
593) -> anyhow::Result<()> {
594    let mut subscription = nats_client
595        .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
596        .await
597        .map_err(|error| {
598            anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
599        })?
600        .fuse();
601
602    // Also send periodic updates in addition to the subscription response
603    let mut interval = tokio::time::interval(identification_broadcast_interval);
604    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
605
606    let mut last_identification = Instant::now();
607
608    loop {
609        select! {
610            maybe_message = subscription.next() => {
611                let Some(message) = maybe_message else {
612                    debug!("Identify broadcast stream ended");
613                    break;
614                };
615
616                trace!(?message, "Farmer received identify broadcast message");
617
618                if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
619                    // Skip too frequent identification requests
620                    continue;
621                }
622
623                last_identification = Instant::now();
624                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
625                interval.reset();
626            }
627            _ = interval.tick().fuse() => {
628                last_identification = Instant::now();
629                trace!("Farmer self-identification");
630
631                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
632            }
633        }
634    }
635
636    Ok(())
637}
638
639async fn send_identify_broadcast(
640    nats_client: &NatsClient,
641    farmer_id: ClusterFarmerId,
642    farmer_id_string: &str,
643) {
644    if let Err(error) = nats_client
645        .broadcast(&new_identify_message(farmer_id), farmer_id_string)
646        .await
647    {
648        warn!(%farmer_id, %error, "Failed to send farmer identify notification");
649    }
650}
651
652fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
653    ClusterFarmerIdentifyBroadcast { farmer_id }
654}
655
656async fn farms_details_responder(
657    nats_client: &NatsClient,
658    farmer_id_string: &str,
659    farms_details: &[FarmDetails],
660) -> anyhow::Result<()> {
661    nats_client
662        .stream_request_responder(
663            Some(farmer_id_string),
664            Some(farmer_id_string.to_string()),
665            |_request: ClusterFarmerFarmDetailsRequest| async {
666                Some(stream::iter(farms_details.iter().map(|farm_details| {
667                    ClusterFarmerFarmDetails {
668                        farm_id: farm_details.farm_id,
669                        total_sectors_count: farm_details.total_sectors_count,
670                    }
671                })))
672            },
673        )
674        .await
675}
676
677async fn plotted_sectors_responder(
678    nats_client: &NatsClient,
679    farms_details: &[FarmDetails],
680) -> anyhow::Result<()> {
681    farms_details
682        .iter()
683        .map(|farm_details| async move {
684            nats_client
685                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
686                    Some(&farm_details.farm_id_string),
687                    Some(farm_details.farm_id_string.clone()),
688                    |_request: ClusterFarmerPlottedSectorsRequest| async move {
689                        Some(match farm_details.plotted_sectors.get().await {
690                            Ok(plotted_sectors) => {
691                                Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
692                                    maybe_plotted_sector.map_err(|error| error.to_string())
693                                })) as _
694                            }
695                            Err(error) => {
696                                error!(
697                                    %error,
698                                    farm_id = %farm_details.farm_id,
699                                    "Failed to get plotted sectors"
700                                );
701
702                                Box::pin(stream::once(async move {
703                                    Err(format!("Failed to get plotted sectors: {error}"))
704                                })) as _
705                            }
706                        })
707                    },
708                )
709                .instrument(info_span!("", cache_id = %farm_details.farm_id))
710                .await
711        })
712        .collect::<FuturesUnordered<_>>()
713        .next()
714        .await
715        .ok_or_else(|| anyhow!("No farms"))?
716}
717
718async fn read_piece_responder(
719    nats_client: &NatsClient,
720    farms_details: &[FarmDetails],
721) -> anyhow::Result<()> {
722    farms_details
723        .iter()
724        .map(|farm_details| async move {
725            nats_client
726                .request_responder(
727                    Some(farm_details.farm_id_string.as_str()),
728                    Some(farm_details.farm_id_string.clone()),
729                    |request: ClusterFarmerReadPieceRequest| async move {
730                        Some(
731                            farm_details
732                                .piece_reader
733                                .read_piece(request.sector_index, request.piece_offset)
734                                .await
735                                .map_err(|error| error.to_string()),
736                        )
737                    },
738                )
739                .instrument(info_span!("", cache_id = %farm_details.farm_id))
740                .await
741        })
742        .collect::<FuturesUnordered<_>>()
743        .next()
744        .await
745        .ok_or_else(|| anyhow!("No farms"))?
746}