1use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15 Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16 PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use ab_core_primitives::pieces::{Piece, PieceOffset};
20use ab_core_primitives::sectors::SectorIndex;
21use ab_farmer_components::plotting::PlottedSector;
22use ab_farmer_rpc_primitives::SolutionResponse;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use derive_more::{Display, From};
26use event_listener_primitives::Bag;
27use futures::channel::mpsc;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Stream, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
31use std::future::Future;
32use std::pin::{Pin, pin};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::time::MissedTickBehavior;
36use tracing::{Instrument, debug, error, info_span, trace, warn};
37use ulid::Ulid;
38
/// Buffer size of each per-farm channel that queues notifications (sector updates, farming
/// notifications, solutions) between the farm's handlers and the task that broadcasts them.
const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
/// Identify requests received sooner than this after the previous identification are skipped.
const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);

/// Bag of callbacks that are invoked with a reference to `A`.
type Handler<A> = Bag<HandlerFn<A>, A>;
43
/// Identifier of a cluster farmer, a newtype around [`Ulid`].
///
/// A fresh identifier is generated by every call to `farmer_service`.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49 #[inline]
50 fn size_hint(&self) -> usize {
51 Encode::size_hint(&self.0.0)
52 }
53
54 #[inline]
55 fn encode_to<O>(&self, dest: &mut O)
56 where
57 O: Output + ?Sized,
58 {
59 Encode::encode_to(&self.0.0, dest);
60 }
61}
62
63impl EncodeLike for ClusterFarmerId {}
64
65impl Decode for ClusterFarmerId {
66 #[inline]
67 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
68 where
69 I: Input,
70 {
71 u128::decode(input)
72 .map(|ulid| Self(Ulid(ulid)))
73 .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
74 }
75}
76
77#[expect(clippy::new_without_default)]
78impl ClusterFarmerId {
79 pub fn new() -> Self {
81 Self(Ulid::new())
82 }
83}
84
/// Broadcast sent by a farmer to identify itself.
///
/// It is sent periodically and in response to controller identify requests, see
/// `identify_responder`.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerIdentifyBroadcast {
    /// Identifier of the farmer sending the broadcast
    pub farmer_id: ClusterFarmerId,
}

impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
    /// NOTE(review): `*` is presumably replaced with the farmer ID string that is passed to
    /// `NatsClient::broadcast` — confirm against `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.farmer-identify";
}
96
/// Stream request for the details of all farms served by a farmer.
///
/// Answered by `farms_details_responder` with one [`ClusterFarmerFarmDetails`] per farm.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetailsRequest;

impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
    /// NOTE(review): `*` is presumably replaced with the farmer ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.farm.details";
    type Response = ClusterFarmerFarmDetails;
}
106
/// Details of a single farm, sent in response to [`ClusterFarmerFarmDetailsRequest`].
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterFarmerFarmDetails {
    /// Identifier of the farm
    pub farm_id: FarmId,
    /// Value reported by `Farm::total_sectors_count()` for this farm
    pub total_sectors_count: u16,
}
115
/// Broadcast carrying a sector update emitted by a farm.
///
/// Produced by `farmer_service` from the farm's sector update handler and consumed by
/// `ClusterFarm`, which forwards it to its own sector update handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSectorUpdateBroadcast {
    /// Farm the update belongs to
    farm_id: FarmId,
    /// Index of the updated sector
    sector_index: SectorIndex,
    /// The update itself
    sector_update: SectorUpdate,
}

impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
    /// NOTE(review): `*` is presumably replaced with the farm ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.sector-update";
}
131
/// Broadcast carrying a farming notification emitted by a farm.
///
/// Produced by `farmer_service` from the farm's farming notification handler and consumed by
/// `ClusterFarm`, which forwards it to its own farming notification handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerFarmingNotificationBroadcast {
    /// Farm the notification belongs to
    farm_id: FarmId,
    /// The notification itself
    farming_notification: FarmingNotification,
}

impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
    /// NOTE(review): `*` is presumably replaced with the farm ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.farming-notification";
}
145
/// Broadcast carrying a solution response emitted by a farm.
///
/// Produced by `farmer_service` from the farm's solution handler and consumed by `ClusterFarm`,
/// which forwards it to its own solution handlers.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerSolutionBroadcast {
    /// Farm the solution belongs to
    farm_id: FarmId,
    /// The solution response itself
    solution_response: SolutionResponse,
}

impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
    /// NOTE(review): `*` is presumably replaced with the farm ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.solution-response";
}
159
/// Request to read a piece from a farm, answered by `read_piece_responder`.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerReadPieceRequest {
    /// Sector to read the piece from
    sector_index: SectorIndex,
    /// Offset of the piece within the sector
    piece_offset: PieceOffset,
}

impl GenericRequest for ClusterFarmerReadPieceRequest {
    /// NOTE(review): `*` is presumably replaced with the farm ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.farm.read-piece";
    /// Result of the piece reader; the error is the reader's error converted to a string
    type Response = Result<Option<Piece>, String>;
}
172
/// Stream request for the plotted sectors of a farm, answered by `plotted_sectors_responder`.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterFarmerPlottedSectorsRequest;

impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
    /// NOTE(review): `*` is presumably replaced with the farm ID string — confirm against
    /// `NatsClient`.
    const SUBJECT: &'static str = "ab.farmer.*.farm.plotted-sectors";
    /// One item per plotted sector; errors are sent as strings
    type Response = Result<PlottedSector, String>;
}
182
183#[derive(Debug)]
184struct ClusterPlottedSectors {
185 farm_id_string: String,
186 nats_client: NatsClient,
187}
188
189#[async_trait]
190impl PlottedSectors for ClusterPlottedSectors {
191 async fn get(
192 &self,
193 ) -> Result<
194 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
195 FarmError,
196 > {
197 Ok(Box::new(
198 self.nats_client
199 .stream_request(
200 &ClusterFarmerPlottedSectorsRequest,
201 Some(&self.farm_id_string),
202 )
203 .await?
204 .map(|response| response.map_err(FarmError::from)),
205 ))
206 }
207}
208
209#[derive(Debug)]
210struct ClusterPieceReader {
211 farm_id_string: String,
212 nats_client: NatsClient,
213}
214
215#[async_trait]
216impl PieceReader for ClusterPieceReader {
217 async fn read_piece(
218 &self,
219 sector_index: SectorIndex,
220 piece_offset: PieceOffset,
221 ) -> Result<Option<Piece>, FarmError> {
222 Ok(self
223 .nats_client
224 .request(
225 &ClusterFarmerReadPieceRequest {
226 sector_index,
227 piece_offset,
228 },
229 Some(&self.farm_id_string),
230 )
231 .await??)
232 }
233}
234
/// Callbacks registered on a `ClusterFarm` through the `Farm::on_*` methods.
#[derive(Default, Debug)]
struct Handlers {
    /// Called with `(sector_index, sector_update)` for every received sector update broadcast
    sector_update: Handler<(SectorIndex, SectorUpdate)>,
    /// Called for every received farming notification broadcast
    farming_notification: Handler<FarmingNotification>,
    /// Called for every received solution broadcast
    solution: Handler<SolutionResponse>,
}
241
/// [`Farm`] implementation whose operations are carried out through [`NatsClient`] requests and
/// broadcast subscriptions rather than local storage.
#[derive(Debug)]
pub struct ClusterFarm {
    farm_id: FarmId,
    /// String form of `farm_id`, used as the instance for requests and subscriptions
    farm_id_string: String,
    total_sectors_count: u16,
    nats_client: NatsClient,
    /// Callbacks notified by the background task about received broadcasts
    handlers: Arc<Handlers>,
    /// Task forwarding received broadcasts to `handlers`; awaited by `run`
    background_tasks: AsyncJoinOnDrop<()>,
}
252
253#[async_trait(?Send)]
254impl Farm for ClusterFarm {
255 fn id(&self) -> &FarmId {
256 &self.farm_id
257 }
258
259 fn total_sectors_count(&self) -> u16 {
260 self.total_sectors_count
261 }
262
263 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
264 Arc::new(ClusterPlottedSectors {
265 farm_id_string: self.farm_id_string.clone(),
266 nats_client: self.nats_client.clone(),
267 })
268 }
269
270 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
271 Arc::new(ClusterPieceReader {
272 farm_id_string: self.farm_id_string.clone(),
273 nats_client: self.nats_client.clone(),
274 })
275 }
276
277 fn on_sector_update(
278 &self,
279 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
280 ) -> Box<dyn HandlerId> {
281 Box::new(self.handlers.sector_update.add(callback))
282 }
283
284 fn on_farming_notification(
285 &self,
286 callback: HandlerFn<FarmingNotification>,
287 ) -> Box<dyn HandlerId> {
288 Box::new(self.handlers.farming_notification.add(callback))
289 }
290
291 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
292 Box::new(self.handlers.solution.add(callback))
293 }
294
295 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
296 Box::pin((*self).run())
297 }
298}
299
impl ClusterFarm {
    /// Creates a new instance for the farm identified by `farm_id`.
    ///
    /// Subscribes to the sector update, farming notification and solution broadcasts of that
    /// farm and spawns a background task that forwards every received broadcast to the handlers
    /// registered through the [`Farm`] trait methods.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the three broadcast subscriptions fails.
    pub async fn new(
        farm_id: FarmId,
        total_sectors_count: u16,
        nats_client: NatsClient,
    ) -> anyhow::Result<Self> {
        let farm_id_string = farm_id.to_string();

        let sector_updates_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
                Some(&farm_id_string),
                None,
            )
            .await
            .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
        let farming_notifications_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
                Some(&farm_id_string),
                None,
            )
            .await
            .map_err(|error| {
                anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
            })?;
        let solution_subscription = nats_client
            .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
            .await
            .map_err(|error| {
                anyhow!("Failed to subscribe to solution responses broadcast: {error}")
            })?;

        let handlers = Arc::<Handlers>::default();
        let background_tasks = {
            // The task gets its own reference to the handlers, the other one is stored in `Self`
            let handlers = Arc::clone(&handlers);

            async move {
                let mut sector_updates_subscription = pin!(sector_updates_subscription);
                let mut farming_notifications_subscription =
                    pin!(farming_notifications_subscription);
                let mut solution_subscription = pin!(solution_subscription);

                // Each future below runs until its subscription stream ends
                let sector_updates_fut = async {
                    while let Some(ClusterFarmerSectorUpdateBroadcast {
                        sector_index,
                        sector_update,
                        ..
                    }) = sector_updates_subscription.next().await
                    {
                        handlers
                            .sector_update
                            .call_simple(&(sector_index, sector_update));
                    }
                };
                let farming_notifications_fut = async {
                    while let Some(ClusterFarmerFarmingNotificationBroadcast {
                        farming_notification,
                        ..
                    }) = farming_notifications_subscription.next().await
                    {
                        handlers
                            .farming_notification
                            .call_simple(&farming_notification);
                    }
                };
                let solutions_fut = async {
                    while let Some(ClusterFarmerSolutionBroadcast {
                        solution_response, ..
                    }) = solution_subscription.next().await
                    {
                        handlers.solution.call_simple(&solution_response);
                    }
                };

                // The whole task finishes as soon as any one of the subscriptions ends
                select! {
                    _ = sector_updates_fut.fuse() => {}
                    _ = farming_notifications_fut.fuse() => {}
                    _ = solutions_fut.fuse() => {}
                }
            }
        };

        Ok(Self {
            farm_id,
            farm_id_string,
            total_sectors_count,
            nats_client,
            handlers,
            // NOTE(review): the `true` flag presumably requests aborting the task on drop —
            // confirm against `AsyncJoinOnDrop::new`
            background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
        })
    }

    /// Waits for the background task spawned in [`Self::new`] to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if awaiting the background task fails.
    pub async fn run(self) -> anyhow::Result<()> {
        Ok(self.background_tasks.await?)
    }
}
399
/// Per-farm state captured by `farmer_service` and shared by its responders.
#[derive(Debug)]
struct FarmDetails {
    farm_id: FarmId,
    /// String form of `farm_id`
    farm_id_string: String,
    total_sectors_count: u16,
    /// Piece reader obtained from the farm, used to answer read piece requests
    piece_reader: Arc<dyn PieceReader + 'static>,
    /// Plotted sectors accessor obtained from the farm, used to answer plotted sectors requests
    plotted_sectors: Arc<dyn PlottedSectors + 'static>,
    /// Task that broadcasts the farm's notifications; `None` for non-primary instances. Never
    /// read, only kept so it lives as long as this struct.
    _background_tasks: Option<AsyncJoinOnDrop<()>>,
}
409
/// Creates a farmer service for the given farms.
///
/// The returned future answers plotted sectors and read piece requests for every farm. When
/// `primary_instance` is `true`, it additionally:
/// * broadcasts farmer identification, periodically (every `identification_broadcast_interval`)
///   and in response to controller identify requests
/// * answers farm details requests
/// * broadcasts sector updates, farming notifications and solutions of every farm (done by
///   tasks that are spawned right away, before the returned future is polled)
///
/// The future resolves with the result of whichever responder finishes first.
pub fn farmer_service<F>(
    nats_client: NatsClient,
    farms: &[F],
    identification_broadcast_interval: Duration,
    primary_instance: bool,
) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
where
    F: Farm,
{
    let farmer_id = ClusterFarmerId::new();
    let farmer_id_string = farmer_id.to_string();

    // Everything needed from `farms` is extracted eagerly here, so the returned future owns its
    // data and does not borrow `farms` (it is `'static`)
    let farms_details = farms
        .iter()
        .map(|farm| {
            let farm_id = *farm.id();
            let nats_client = nats_client.clone();

            let background_tasks = if primary_instance {
                let (sector_updates_sender, mut sector_updates_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (farming_notifications_sender, mut farming_notifications_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
                let (solutions_sender, mut solutions_receiver) =
                    mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);

                // Handlers only queue the notification without blocking; sending over NATS is
                // done by the task spawned below.
                // NOTE(review): the sender is cloned for every notification, presumably because
                // `try_send` needs `&mut` access while the callback is not `FnMut`. With
                // `futures::channel::mpsc` every sender clone may get its own guaranteed slot,
                // which could weaken the `BROADCAST_NOTIFICATIONS_BUFFER` bound — confirm.
                let sector_updates_handler_id =
                    farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
                        if let Err(error) = sector_updates_sender.clone().try_send(
                            ClusterFarmerSectorUpdateBroadcast {
                                farm_id,
                                sector_index: *sector_index,
                                sector_update: sector_update.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send sector update notification");
                        }
                    }));

                let farming_notifications_handler_id =
                    farm.on_farming_notification(Arc::new(move |farming_notification| {
                        if let Err(error) = farming_notifications_sender.clone().try_send(
                            ClusterFarmerFarmingNotificationBroadcast {
                                farm_id,
                                farming_notification: farming_notification.clone(),
                            },
                        ) {
                            warn!(%farm_id, %error, "Failed to send farming notification");
                        }
                    }));

                let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
                    if let Err(error) =
                        solutions_sender
                            .clone()
                            .try_send(ClusterFarmerSolutionBroadcast {
                                farm_id,
                                solution_response: solution_response.clone(),
                            })
                    {
                        warn!(%farm_id, %error, "Failed to send solution notification");
                    }
                }));

                Some(AsyncJoinOnDrop::new(
                    tokio::spawn(async move {
                        let farm_id_string = farm_id.to_string();

                        // Each future drains one channel; broadcast failures are only logged
                        let sector_updates_fut = async {
                            while let Some(broadcast) = sector_updates_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast sector update");
                                }
                            }
                        };
                        let farming_notifications_fut = async {
                            while let Some(broadcast) = farming_notifications_receiver.next().await
                            {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(
                                        %farm_id,
                                        %error,
                                        "Failed to broadcast farming notification"
                                    );
                                }
                            }
                        };
                        let solutions_fut = async {
                            while let Some(broadcast) = solutions_receiver.next().await {
                                if let Err(error) =
                                    nats_client.broadcast(&broadcast, &farm_id_string).await
                                {
                                    warn!(%farm_id, %error, "Failed to broadcast solution");
                                }
                            }
                        };

                        // Finishes as soon as any one of the channels is closed
                        select! {
                            _ = sector_updates_fut.fuse() => {}
                            _ = farming_notifications_fut.fuse() => {}
                            _ = solutions_fut.fuse() => {}
                        }

                        // The handler IDs are moved into the task and only dropped here, so
                        // they stay alive for as long as the task is running
                        drop(sector_updates_handler_id);
                        drop(farming_notifications_handler_id);
                        drop(solutions_handler_id);
                    }),
                    true,
                ))
            } else {
                None
            };

            FarmDetails {
                farm_id,
                farm_id_string: farm_id.to_string(),
                total_sectors_count: farm.total_sectors_count(),
                piece_reader: farm.piece_reader(),
                plotted_sectors: farm.plotted_sectors(),
                _background_tasks: background_tasks,
            }
        })
        .collect::<Vec<_>>();

    async move {
        if primary_instance {
            // All four responders run concurrently; the first one to finish ends the service
            select! {
                result = identify_responder(
                    &nats_client,
                    farmer_id,
                    &farmer_id_string,
                    identification_broadcast_interval
                ).fuse() => {
                    result
                },
                result = farms_details_responder(
                    &nats_client,
                    &farmer_id_string,
                    &farms_details
                ).fuse() => {
                    result
                },
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        } else {
            // Non-primary instances only answer per-farm requests
            select! {
                result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
                result = read_piece_responder(&nats_client, &farms_details).fuse() => {
                    result
                },
            }
        }
    }
}
582
/// Sends farmer identification broadcasts, both periodically and in response to controller
/// identify broadcasts.
///
/// Returns `Ok(())` once the controller identify broadcast stream ends.
///
/// # Errors
///
/// Returns an error if subscribing to controller identify broadcasts fails.
async fn identify_responder(
    nats_client: &NatsClient,
    farmer_id: ClusterFarmerId,
    farmer_id_string: &str,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut subscription = nats_client
        .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
        .await
        .map_err(|error| {
            anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
        })?
        .fuse();

    // Periodic identification in addition to responses to controller requests
    let mut interval = tokio::time::interval(identification_broadcast_interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let mut last_identification = Instant::now();

    loop {
        select! {
            maybe_message = subscription.next() => {
                let Some(message) = maybe_message else {
                    debug!("Identify broadcast stream ended");
                    break;
                };

                trace!(?message, "Farmer received identify broadcast message");

                if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
                    // Skip requests that arrive too soon after the previous identification
                    continue;
                }

                last_identification = Instant::now();
                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
                // Identification was just sent, restart the periodic timer
                interval.reset();
            }
            _ = interval.tick().fuse() => {
                last_identification = Instant::now();
                trace!("Farmer self-identification");

                send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
            }
        }
    }

    Ok(())
}
635
636async fn send_identify_broadcast(
637 nats_client: &NatsClient,
638 farmer_id: ClusterFarmerId,
639 farmer_id_string: &str,
640) {
641 if let Err(error) = nats_client
642 .broadcast(&new_identify_message(farmer_id), farmer_id_string)
643 .await
644 {
645 warn!(%farmer_id, %error, "Failed to send farmer identify notification");
646 }
647}
648
/// Builds the identification broadcast message for the farmer with the given ID.
fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
    ClusterFarmerIdentifyBroadcast { farmer_id }
}
652
653async fn farms_details_responder(
654 nats_client: &NatsClient,
655 farmer_id_string: &str,
656 farms_details: &[FarmDetails],
657) -> anyhow::Result<()> {
658 nats_client
659 .stream_request_responder(
660 Some(farmer_id_string),
661 Some(farmer_id_string.to_string()),
662 |_request: ClusterFarmerFarmDetailsRequest| async {
663 Some(stream::iter(farms_details.iter().map(|farm_details| {
664 ClusterFarmerFarmDetails {
665 farm_id: farm_details.farm_id,
666 total_sectors_count: farm_details.total_sectors_count,
667 }
668 })))
669 },
670 )
671 .await
672}
673
674async fn plotted_sectors_responder(
675 nats_client: &NatsClient,
676 farms_details: &[FarmDetails],
677) -> anyhow::Result<()> {
678 farms_details
679 .iter()
680 .map(|farm_details| async move {
681 nats_client
682 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
683 Some(&farm_details.farm_id_string),
684 Some(farm_details.farm_id_string.clone()),
685 |_request: ClusterFarmerPlottedSectorsRequest| async move {
686 Some(match farm_details.plotted_sectors.get().await {
687 Ok(plotted_sectors) => {
688 Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
689 maybe_plotted_sector.map_err(|error| error.to_string())
690 })) as _
691 }
692 Err(error) => {
693 error!(
694 %error,
695 farm_id = %farm_details.farm_id,
696 "Failed to get plotted sectors"
697 );
698
699 Box::pin(stream::once(async move {
700 Err(format!("Failed to get plotted sectors: {error}"))
701 })) as _
702 }
703 })
704 },
705 )
706 .instrument(info_span!("", cache_id = %farm_details.farm_id))
707 .await
708 })
709 .collect::<FuturesUnordered<_>>()
710 .next()
711 .await
712 .ok_or_else(|| anyhow!("No farms"))?
713}
714
715async fn read_piece_responder(
716 nats_client: &NatsClient,
717 farms_details: &[FarmDetails],
718) -> anyhow::Result<()> {
719 farms_details
720 .iter()
721 .map(|farm_details| async move {
722 nats_client
723 .request_responder(
724 Some(farm_details.farm_id_string.as_str()),
725 Some(farm_details.farm_id_string.clone()),
726 |request: ClusterFarmerReadPieceRequest| async move {
727 Some(
728 farm_details
729 .piece_reader
730 .read_piece(request.sector_index, request.piece_offset)
731 .await
732 .map_err(|error| error.to_string()),
733 )
734 },
735 )
736 .instrument(info_span!("", cache_id = %farm_details.farm_id))
737 .await
738 })
739 .collect::<FuturesUnordered<_>>()
740 .next()
741 .await
742 .ok_or_else(|| anyhow!("No farms"))?
743}