1use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{
15 Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
16 PlottedSectors, SectorUpdate,
17};
18use crate::utils::AsyncJoinOnDrop;
19use ab_core_primitives::pieces::{Piece, PieceOffset};
20use ab_core_primitives::sectors::SectorIndex;
21use ab_farmer_components::plotting::PlottedSector;
22use ab_farmer_rpc_primitives::SolutionResponse;
23use anyhow::anyhow;
24use async_trait::async_trait;
25use derive_more::{Display, From};
26use event_listener_primitives::Bag;
27use futures::channel::mpsc;
28use futures::stream::FuturesUnordered;
29use futures::{FutureExt, Stream, StreamExt, select, stream};
30use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
31use std::future::Future;
32use std::pin::{Pin, pin};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::time::MissedTickBehavior;
36use tracing::{Instrument, debug, error, info_span, trace, warn};
37use ulid::Ulid;
38
39const BROADCAST_NOTIFICATIONS_BUFFER: usize = 1000;
40const MIN_FARMER_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
41
42type Handler<A> = Bag<HandlerFn<A>, A>;
43
44#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
46pub struct ClusterFarmerId(Ulid);
47
48impl Encode for ClusterFarmerId {
49 #[inline]
50 fn size_hint(&self) -> usize {
51 Encode::size_hint(&self.0.0)
52 }
53
54 #[inline]
55 fn encode_to<O>(&self, dest: &mut O)
56 where
57 O: Output + ?Sized,
58 {
59 Encode::encode_to(&self.0.0, dest);
60 }
61}
62
63impl EncodeLike for ClusterFarmerId {}
64
65impl Decode for ClusterFarmerId {
66 #[inline]
67 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
68 where
69 I: Input,
70 {
71 u128::decode(input)
72 .map(|ulid| Self(Ulid(ulid)))
73 .map_err(|e| e.chain("Could not decode `ClusterFarmerId.0.0`"))
74 }
75}
76
77#[expect(
78 clippy::new_without_default,
79 reason = "Default has different semantics"
80)]
81impl ClusterFarmerId {
82 pub fn new() -> Self {
84 Self(Ulid::new())
85 }
86}
87
88#[derive(Debug, Clone, Encode, Decode)]
90pub struct ClusterFarmerIdentifyBroadcast {
91 pub farmer_id: ClusterFarmerId,
93}
94
95impl GenericBroadcast for ClusterFarmerIdentifyBroadcast {
96 const SUBJECT: &'static str = "ab.farmer.*.farmer-identify";
98}
99
100#[derive(Debug, Clone, Encode, Decode)]
102pub struct ClusterFarmerFarmDetailsRequest;
103
104impl GenericStreamRequest for ClusterFarmerFarmDetailsRequest {
105 const SUBJECT: &'static str = "ab.farmer.*.farm.details";
107 type Response = ClusterFarmerFarmDetails;
108}
109
110#[derive(Debug, Clone, Encode, Decode)]
112pub struct ClusterFarmerFarmDetails {
113 pub farm_id: FarmId,
115 pub total_sectors_count: u16,
117}
118
119#[derive(Debug, Clone, Encode, Decode)]
121struct ClusterFarmerSectorUpdateBroadcast {
122 farm_id: FarmId,
124 sector_index: SectorIndex,
126 sector_update: SectorUpdate,
128}
129
130impl GenericBroadcast for ClusterFarmerSectorUpdateBroadcast {
131 const SUBJECT: &'static str = "ab.farmer.*.sector-update";
133}
134
135#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterFarmerFarmingNotificationBroadcast {
138 farm_id: FarmId,
140 farming_notification: FarmingNotification,
142}
143
144impl GenericBroadcast for ClusterFarmerFarmingNotificationBroadcast {
145 const SUBJECT: &'static str = "ab.farmer.*.farming-notification";
147}
148
149#[derive(Debug, Clone, Encode, Decode)]
151struct ClusterFarmerSolutionBroadcast {
152 farm_id: FarmId,
154 solution_response: SolutionResponse,
156}
157
158impl GenericBroadcast for ClusterFarmerSolutionBroadcast {
159 const SUBJECT: &'static str = "ab.farmer.*.solution-response";
161}
162
163#[derive(Debug, Clone, Encode, Decode)]
165struct ClusterFarmerReadPieceRequest {
166 sector_index: SectorIndex,
167 piece_offset: PieceOffset,
168}
169
170impl GenericRequest for ClusterFarmerReadPieceRequest {
171 const SUBJECT: &'static str = "ab.farmer.*.farm.read-piece";
173 type Response = Result<Option<Piece>, String>;
174}
175
176#[derive(Debug, Clone, Encode, Decode)]
178struct ClusterFarmerPlottedSectorsRequest;
179
180impl GenericStreamRequest for ClusterFarmerPlottedSectorsRequest {
181 const SUBJECT: &'static str = "ab.farmer.*.farm.plotted-sectors";
183 type Response = Result<PlottedSector, String>;
184}
185
186#[derive(Debug)]
187struct ClusterPlottedSectors {
188 farm_id_string: String,
189 nats_client: NatsClient,
190}
191
192#[async_trait]
193impl PlottedSectors for ClusterPlottedSectors {
194 async fn get(
195 &self,
196 ) -> Result<
197 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
198 FarmError,
199 > {
200 Ok(Box::new(
201 self.nats_client
202 .stream_request(
203 &ClusterFarmerPlottedSectorsRequest,
204 Some(&self.farm_id_string),
205 )
206 .await?
207 .map(|response| response.map_err(FarmError::from)),
208 ))
209 }
210}
211
212#[derive(Debug)]
213struct ClusterPieceReader {
214 farm_id_string: String,
215 nats_client: NatsClient,
216}
217
218#[async_trait]
219impl PieceReader for ClusterPieceReader {
220 async fn read_piece(
221 &self,
222 sector_index: SectorIndex,
223 piece_offset: PieceOffset,
224 ) -> Result<Option<Piece>, FarmError> {
225 Ok(self
226 .nats_client
227 .request(
228 &ClusterFarmerReadPieceRequest {
229 sector_index,
230 piece_offset,
231 },
232 Some(&self.farm_id_string),
233 )
234 .await??)
235 }
236}
237
238#[derive(Default, Debug)]
239struct Handlers {
240 sector_update: Handler<(SectorIndex, SectorUpdate)>,
241 farming_notification: Handler<FarmingNotification>,
242 solution: Handler<SolutionResponse>,
243}
244
245#[derive(Debug)]
247pub struct ClusterFarm {
248 farm_id: FarmId,
249 farm_id_string: String,
250 total_sectors_count: u16,
251 nats_client: NatsClient,
252 handlers: Arc<Handlers>,
253 background_tasks: AsyncJoinOnDrop<()>,
254}
255
256#[async_trait(?Send)]
257impl Farm for ClusterFarm {
258 fn id(&self) -> &FarmId {
259 &self.farm_id
260 }
261
262 fn total_sectors_count(&self) -> u16 {
263 self.total_sectors_count
264 }
265
266 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
267 Arc::new(ClusterPlottedSectors {
268 farm_id_string: self.farm_id_string.clone(),
269 nats_client: self.nats_client.clone(),
270 })
271 }
272
273 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
274 Arc::new(ClusterPieceReader {
275 farm_id_string: self.farm_id_string.clone(),
276 nats_client: self.nats_client.clone(),
277 })
278 }
279
280 fn on_sector_update(
281 &self,
282 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
283 ) -> Box<dyn HandlerId> {
284 Box::new(self.handlers.sector_update.add(callback))
285 }
286
287 fn on_farming_notification(
288 &self,
289 callback: HandlerFn<FarmingNotification>,
290 ) -> Box<dyn HandlerId> {
291 Box::new(self.handlers.farming_notification.add(callback))
292 }
293
294 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
295 Box::new(self.handlers.solution.add(callback))
296 }
297
298 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
299 Box::pin((*self).run())
300 }
301}
302
303impl ClusterFarm {
304 pub async fn new(
307 farm_id: FarmId,
308 total_sectors_count: u16,
309 nats_client: NatsClient,
310 ) -> anyhow::Result<Self> {
311 let farm_id_string = farm_id.to_string();
312
313 let sector_updates_subscription = nats_client
314 .subscribe_to_broadcasts::<ClusterFarmerSectorUpdateBroadcast>(
315 Some(&farm_id_string),
316 None,
317 )
318 .await
319 .map_err(|error| anyhow!("Failed to subscribe to sector updates broadcast: {error}"))?;
320 let farming_notifications_subscription = nats_client
321 .subscribe_to_broadcasts::<ClusterFarmerFarmingNotificationBroadcast>(
322 Some(&farm_id_string),
323 None,
324 )
325 .await
326 .map_err(|error| {
327 anyhow!("Failed to subscribe to farming notifications broadcast: {error}")
328 })?;
329 let solution_subscription = nats_client
330 .subscribe_to_broadcasts::<ClusterFarmerSolutionBroadcast>(Some(&farm_id_string), None)
331 .await
332 .map_err(|error| {
333 anyhow!("Failed to subscribe to solution responses broadcast: {error}")
334 })?;
335
336 let handlers = Arc::<Handlers>::default();
337 let background_tasks = {
339 let handlers = Arc::clone(&handlers);
340
341 async move {
342 let mut sector_updates_subscription = pin!(sector_updates_subscription);
343 let mut farming_notifications_subscription =
344 pin!(farming_notifications_subscription);
345 let mut solution_subscription = pin!(solution_subscription);
346
347 let sector_updates_fut = async {
348 while let Some(ClusterFarmerSectorUpdateBroadcast {
349 sector_index,
350 sector_update,
351 ..
352 }) = sector_updates_subscription.next().await
353 {
354 handlers
355 .sector_update
356 .call_simple(&(sector_index, sector_update));
357 }
358 };
359 let farming_notifications_fut = async {
360 while let Some(ClusterFarmerFarmingNotificationBroadcast {
361 farming_notification,
362 ..
363 }) = farming_notifications_subscription.next().await
364 {
365 handlers
366 .farming_notification
367 .call_simple(&farming_notification);
368 }
369 };
370 let solutions_fut = async {
371 while let Some(ClusterFarmerSolutionBroadcast {
372 solution_response, ..
373 }) = solution_subscription.next().await
374 {
375 handlers.solution.call_simple(&solution_response);
376 }
377 };
378
379 select! {
380 () = sector_updates_fut.fuse() => {}
381 () = farming_notifications_fut.fuse() => {}
382 () = solutions_fut.fuse() => {}
383 }
384 }
385 };
386
387 Ok(Self {
388 farm_id,
389 farm_id_string,
390 total_sectors_count,
391 nats_client,
392 handlers,
393 background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
394 })
395 }
396
397 pub async fn run(self) -> anyhow::Result<()> {
399 Ok(self.background_tasks.await?)
400 }
401}
402
403#[derive(Debug)]
404struct FarmDetails {
405 farm_id: FarmId,
406 farm_id_string: String,
407 total_sectors_count: u16,
408 piece_reader: Arc<dyn PieceReader + 'static>,
409 plotted_sectors: Arc<dyn PlottedSectors + 'static>,
410 _background_tasks: Option<AsyncJoinOnDrop<()>>,
411}
412
413pub fn farmer_service<F>(
419 nats_client: NatsClient,
420 farms: &[F],
421 identification_broadcast_interval: Duration,
422 primary_instance: bool,
423) -> impl Future<Output = anyhow::Result<()>> + Send + 'static
424where
425 F: Farm,
426{
427 let farmer_id = ClusterFarmerId::new();
428 let farmer_id_string = farmer_id.to_string();
429
430 let farms_details = farms
433 .iter()
434 .map(|farm| {
435 let farm_id = *farm.id();
436 let nats_client = nats_client.clone();
437
438 let background_tasks = if primary_instance {
439 let (sector_updates_sender, mut sector_updates_receiver) =
440 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
441 let (farming_notifications_sender, mut farming_notifications_receiver) =
442 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
443 let (solutions_sender, mut solutions_receiver) =
444 mpsc::channel(BROADCAST_NOTIFICATIONS_BUFFER);
445
446 let sector_updates_handler_id =
447 farm.on_sector_update(Arc::new(move |(sector_index, sector_update)| {
448 if let Err(error) = sector_updates_sender.clone().try_send(
449 ClusterFarmerSectorUpdateBroadcast {
450 farm_id,
451 sector_index: *sector_index,
452 sector_update: sector_update.clone(),
453 },
454 ) {
455 warn!(%farm_id, %error, "Failed to send sector update notification");
456 }
457 }));
458
459 let farming_notifications_handler_id =
460 farm.on_farming_notification(Arc::new(move |farming_notification| {
461 if let Err(error) = farming_notifications_sender.clone().try_send(
462 ClusterFarmerFarmingNotificationBroadcast {
463 farm_id,
464 farming_notification: farming_notification.clone(),
465 },
466 ) {
467 warn!(%farm_id, %error, "Failed to send farming notification");
468 }
469 }));
470
471 let solutions_handler_id = farm.on_solution(Arc::new(move |solution_response| {
472 if let Err(error) =
473 solutions_sender
474 .clone()
475 .try_send(ClusterFarmerSolutionBroadcast {
476 farm_id,
477 solution_response: solution_response.clone(),
478 })
479 {
480 warn!(%farm_id, %error, "Failed to send solution notification");
481 }
482 }));
483
484 Some(AsyncJoinOnDrop::new(
485 tokio::spawn(async move {
486 let farm_id_string = farm_id.to_string();
487
488 let sector_updates_fut = async {
489 while let Some(broadcast) = sector_updates_receiver.next().await {
490 if let Err(error) =
491 nats_client.broadcast(&broadcast, &farm_id_string).await
492 {
493 warn!(%farm_id, %error, "Failed to broadcast sector update");
494 }
495 }
496 };
497 let farming_notifications_fut = async {
498 while let Some(broadcast) = farming_notifications_receiver.next().await
499 {
500 if let Err(error) =
501 nats_client.broadcast(&broadcast, &farm_id_string).await
502 {
503 warn!(
504 %farm_id,
505 %error,
506 "Failed to broadcast farming notification"
507 );
508 }
509 }
510 };
511 let solutions_fut = async {
512 while let Some(broadcast) = solutions_receiver.next().await {
513 if let Err(error) =
514 nats_client.broadcast(&broadcast, &farm_id_string).await
515 {
516 warn!(%farm_id, %error, "Failed to broadcast solution");
517 }
518 }
519 };
520
521 select! {
522 () = sector_updates_fut.fuse() => {}
523 () = farming_notifications_fut.fuse() => {}
524 () = solutions_fut.fuse() => {}
525 }
526
527 drop(sector_updates_handler_id);
528 drop(farming_notifications_handler_id);
529 drop(solutions_handler_id);
530 }),
531 true,
532 ))
533 } else {
534 None
535 };
536
537 FarmDetails {
538 farm_id,
539 farm_id_string: farm_id.to_string(),
540 total_sectors_count: farm.total_sectors_count(),
541 piece_reader: farm.piece_reader(),
542 plotted_sectors: farm.plotted_sectors(),
543 _background_tasks: background_tasks,
544 }
545 })
546 .collect::<Vec<_>>();
547
548 async move {
549 if primary_instance {
550 select! {
551 result = identify_responder(
552 &nats_client,
553 farmer_id,
554 &farmer_id_string,
555 identification_broadcast_interval
556 ).fuse() => {
557 result
558 },
559 result = farms_details_responder(
560 &nats_client,
561 &farmer_id_string,
562 &farms_details
563 ).fuse() => {
564 result
565 },
566 result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
567 result
568 },
569 result = read_piece_responder(&nats_client, &farms_details).fuse() => {
570 result
571 },
572 }
573 } else {
574 select! {
575 result = plotted_sectors_responder(&nats_client, &farms_details).fuse() => {
576 result
577 },
578 result = read_piece_responder(&nats_client, &farms_details).fuse() => {
579 result
580 },
581 }
582 }
583 }
584}
585
586async fn identify_responder(
589 nats_client: &NatsClient,
590 farmer_id: ClusterFarmerId,
591 farmer_id_string: &str,
592 identification_broadcast_interval: Duration,
593) -> anyhow::Result<()> {
594 let mut subscription = nats_client
595 .subscribe_to_broadcasts::<ClusterControllerFarmerIdentifyBroadcast>(None, None)
596 .await
597 .map_err(|error| {
598 anyhow!("Failed to subscribe to farmer identify broadcast requests: {error}")
599 })?
600 .fuse();
601
602 let mut interval = tokio::time::interval(identification_broadcast_interval);
604 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
605
606 let mut last_identification = Instant::now();
607
608 loop {
609 select! {
610 maybe_message = subscription.next() => {
611 let Some(message) = maybe_message else {
612 debug!("Identify broadcast stream ended");
613 break;
614 };
615
616 trace!(?message, "Farmer received identify broadcast message");
617
618 if last_identification.elapsed() < MIN_FARMER_IDENTIFICATION_INTERVAL {
619 continue;
621 }
622
623 last_identification = Instant::now();
624 send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
625 interval.reset();
626 }
627 _ = interval.tick().fuse() => {
628 last_identification = Instant::now();
629 trace!("Farmer self-identification");
630
631 send_identify_broadcast(nats_client, farmer_id, farmer_id_string).await;
632 }
633 }
634 }
635
636 Ok(())
637}
638
639async fn send_identify_broadcast(
640 nats_client: &NatsClient,
641 farmer_id: ClusterFarmerId,
642 farmer_id_string: &str,
643) {
644 if let Err(error) = nats_client
645 .broadcast(&new_identify_message(farmer_id), farmer_id_string)
646 .await
647 {
648 warn!(%farmer_id, %error, "Failed to send farmer identify notification");
649 }
650}
651
652fn new_identify_message(farmer_id: ClusterFarmerId) -> ClusterFarmerIdentifyBroadcast {
653 ClusterFarmerIdentifyBroadcast { farmer_id }
654}
655
656async fn farms_details_responder(
657 nats_client: &NatsClient,
658 farmer_id_string: &str,
659 farms_details: &[FarmDetails],
660) -> anyhow::Result<()> {
661 nats_client
662 .stream_request_responder(
663 Some(farmer_id_string),
664 Some(farmer_id_string.to_string()),
665 |_request: ClusterFarmerFarmDetailsRequest| async {
666 Some(stream::iter(farms_details.iter().map(|farm_details| {
667 ClusterFarmerFarmDetails {
668 farm_id: farm_details.farm_id,
669 total_sectors_count: farm_details.total_sectors_count,
670 }
671 })))
672 },
673 )
674 .await
675}
676
677async fn plotted_sectors_responder(
678 nats_client: &NatsClient,
679 farms_details: &[FarmDetails],
680) -> anyhow::Result<()> {
681 farms_details
682 .iter()
683 .map(|farm_details| async move {
684 nats_client
685 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
686 Some(&farm_details.farm_id_string),
687 Some(farm_details.farm_id_string.clone()),
688 |_request: ClusterFarmerPlottedSectorsRequest| async move {
689 Some(match farm_details.plotted_sectors.get().await {
690 Ok(plotted_sectors) => {
691 Box::pin(plotted_sectors.map(|maybe_plotted_sector| {
692 maybe_plotted_sector.map_err(|error| error.to_string())
693 })) as _
694 }
695 Err(error) => {
696 error!(
697 %error,
698 farm_id = %farm_details.farm_id,
699 "Failed to get plotted sectors"
700 );
701
702 Box::pin(stream::once(async move {
703 Err(format!("Failed to get plotted sectors: {error}"))
704 })) as _
705 }
706 })
707 },
708 )
709 .instrument(info_span!("", cache_id = %farm_details.farm_id))
710 .await
711 })
712 .collect::<FuturesUnordered<_>>()
713 .next()
714 .await
715 .ok_or_else(|| anyhow!("No farms"))?
716}
717
718async fn read_piece_responder(
719 nats_client: &NatsClient,
720 farms_details: &[FarmDetails],
721) -> anyhow::Result<()> {
722 farms_details
723 .iter()
724 .map(|farm_details| async move {
725 nats_client
726 .request_responder(
727 Some(farm_details.farm_id_string.as_str()),
728 Some(farm_details.farm_id_string.clone()),
729 |request: ClusterFarmerReadPieceRequest| async move {
730 Some(
731 farm_details
732 .piece_reader
733 .read_piece(request.sector_index, request.piece_offset)
734 .await
735 .map_err(|error| error.to_string()),
736 )
737 },
738 )
739 .instrument(info_span!("", cache_id = %farm_details.farm_id))
740 .await
741 })
742 .collect::<FuturesUnordered<_>>()
743 .next()
744 .await
745 .ok_or_else(|| anyhow!("No farms"))?
746}