Skip to main content

ab_farmer/cluster/
controller.rs

1//! Farming cluster controller
2//!
3//! Controller is responsible for managing farming cluster.
4//!
5//! This module exposes some data structures for NATS communication, custom piece getter and node
6//! client implementations designed to work with cluster controller and a service function to drive
7//! the backend part of the controller.
8
9pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15    GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{
22    SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
23};
24use ab_data_retrieval::piece_getter::PieceGetter;
25use ab_farmer_rpc_primitives::{
26    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
27    SolutionResponse,
28};
29use anyhow::anyhow;
30use async_nats::HeaderValue;
31use async_trait::async_trait;
32use futures::channel::mpsc;
33use futures::future::FusedFuture;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, Stream, StreamExt, select, stream};
36use parity_scale_codec::{Decode, Encode};
37use parking_lot::Mutex;
38use rand::prelude::*;
39use std::collections::{HashMap, HashSet};
40use std::pin::Pin;
41use std::sync::Arc;
42use std::task::Poll;
43use tracing::{debug, error, trace, warn};
44
45/// Special "cache group" that all controllers subscribe to and that can be used to query any cache
46/// group. The cache group for each query is chosen at random.
47const GLOBAL_CACHE_GROUP: &str = "_";
48
49/// Broadcast sent by controllers requesting farmers to identify themselves
50#[derive(Debug, Copy, Clone, Encode, Decode)]
51pub struct ClusterControllerFarmerIdentifyBroadcast;
52
53impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
54    const SUBJECT: &'static str = "ab.controller.farmer-identify";
55}
56
57/// Broadcast sent by controllers requesting caches in cache group to identify themselves
58#[derive(Debug, Copy, Clone, Encode, Decode)]
59pub struct ClusterControllerCacheIdentifyBroadcast;
60
61impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
62    /// `*` here stands for cache group
63    const SUBJECT: &'static str = "ab.controller.*.cache-identify";
64}
65
66/// Broadcast with slot info sent by controllers
67#[derive(Debug, Clone, Encode, Decode)]
68struct ClusterControllerSlotInfoBroadcast {
69    slot_info: SlotInfo,
70    instance: String,
71}
72
73impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
74    const SUBJECT: &'static str = "ab.controller.slot-info";
75
76    fn deterministic_message_id(&self) -> Option<HeaderValue> {
77        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
78        //  be simplified to just a slot number
79        Some(HeaderValue::from(format!(
80            "slot-info-{}",
81            self.slot_info.slot
82        )))
83    }
84}
85
86/// Broadcast with block sealing info by controllers
87#[derive(Debug, Clone, Encode, Decode)]
88struct ClusterControllerBlockSealingBroadcast {
89    block_sealing_info: BlockSealInfo,
90}
91
92impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
93    const SUBJECT: &'static str = "ab.controller.block-sealing-info";
94}
95
96/// Broadcast with new super segment headers by controllers
97#[derive(Debug, Clone, Encode, Decode)]
98struct ClusterControllerNewSuperSegmentHeaderBroadcast {
99    new_super_segment_header: SuperSegmentHeader,
100}
101
102impl GenericBroadcast for ClusterControllerNewSuperSegmentHeaderBroadcast {
103    const SUBJECT: &'static str = "ab.controller.new-super-segment-header";
104
105    fn deterministic_message_id(&self) -> Option<HeaderValue> {
106        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
107        //  be simplified to just a super segment index
108        Some(HeaderValue::from(format!(
109            "new-super-segment-{}",
110            self.new_super_segment_header.index
111        )))
112    }
113}
114
115/// Notification messages with solution by farmers
116#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterControllerSolutionNotification {
118    solution_response: SolutionResponse,
119}
120
121impl GenericNotification for ClusterControllerSolutionNotification {
122    const SUBJECT: &'static str = "ab.controller.*.solution";
123}
124
125/// Notification messages with block seal by farmers
126#[derive(Debug, Clone, Encode, Decode)]
127struct ClusterControllerBlockSealNotification {
128    block_seal: BlockSealResponse,
129}
130
131impl GenericNotification for ClusterControllerBlockSealNotification {
132    const SUBJECT: &'static str = "ab.controller.block-seal";
133}
134
135/// Request farmer app info from controller
136#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterControllerFarmerAppInfoRequest;
138
139impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
140    const SUBJECT: &'static str = "ab.controller.farmer-app-info";
141    type Response = Result<FarmerAppInfo, String>;
142}
143
144/// Request super segment headers with specified super segment indices
145#[derive(Debug, Clone, Encode, Decode)]
146struct ClusterControllerSuperSegmentHeadersRequest {
147    super_segment_indices: Vec<SuperSegmentIndex>,
148}
149
150impl GenericRequest for ClusterControllerSuperSegmentHeadersRequest {
151    const SUBJECT: &'static str = "ab.controller.super-segment-headers";
152    type Response = Vec<Option<SuperSegmentHeader>>;
153}
154
155/// Request super segment root for a specified segment index
156#[derive(Debug, Clone, Encode, Decode)]
157struct ClusterControllerSuperSegmentRootForSegmentIndexRequest {
158    segment_index: SegmentIndex,
159}
160
161impl GenericRequest for ClusterControllerSuperSegmentRootForSegmentIndexRequest {
162    const SUBJECT: &'static str = "ab.controller.super-segment-root-for-segment-index";
163    type Response = Option<SuperSegmentRoot>;
164}
165
166/// Find piece with specified index in cache
167#[derive(Debug, Clone, Encode, Decode)]
168struct ClusterControllerFindPieceInCacheRequest {
169    piece_index: PieceIndex,
170}
171
172impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
173    const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
174    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
175}
176
177/// Find pieces with specified indices in cache
178#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerFindPiecesInCacheRequest {
180    piece_indices: Vec<PieceIndex>,
181}
182
183impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
184    const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
185    /// Only pieces that were found are returned
186    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
187}
188
189/// Request piece with specified index
190#[derive(Debug, Clone, Encode, Decode)]
191struct ClusterControllerPieceRequest {
192    piece_index: PieceIndex,
193}
194
195impl GenericRequest for ClusterControllerPieceRequest {
196    const SUBJECT: &'static str = "ab.controller.piece";
197    type Response = Option<Piece>;
198}
199
200/// Request pieces with specified index
201#[derive(Debug, Clone, Encode, Decode)]
202struct ClusterControllerPiecesRequest {
203    piece_indices: Vec<PieceIndex>,
204}
205
206impl GenericStreamRequest for ClusterControllerPiecesRequest {
207    const SUBJECT: &'static str = "ab.controller.pieces";
208    /// Only pieces that were found are returned
209    type Response = (PieceIndex, Piece);
210}
211
212/// Cluster piece getter
213#[derive(Debug, Clone)]
214pub struct ClusterPieceGetter {
215    nats_client: NatsClient,
216    cache_group: String,
217}
218
219#[async_trait]
220impl PieceGetter for ClusterPieceGetter {
221    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
222        if let Some((piece_cache_id, piece_cache_offset)) = self
223            .nats_client
224            .request(
225                &ClusterControllerFindPieceInCacheRequest { piece_index },
226                Some(&self.cache_group),
227            )
228            .await?
229        {
230            trace!(
231                %piece_index,
232                %piece_cache_id,
233                %piece_cache_offset,
234                "Found piece in cache, retrieving"
235            );
236
237            match self
238                .nats_client
239                .request(
240                    &ClusterCacheReadPieceRequest {
241                        offset: piece_cache_offset,
242                    },
243                    Some(&piece_cache_id.to_string()),
244                )
245                .await
246                .map_err(|error| error.to_string())
247                .flatten()
248            {
249                Ok(Some((retrieved_piece_index, piece))) => {
250                    if retrieved_piece_index == piece_index {
251                        trace!(
252                            %piece_index,
253                            %piece_cache_id,
254                            %piece_cache_offset,
255                            "Retrieved piece from cache successfully"
256                        );
257
258                        return Ok(Some(piece));
259                    } else {
260                        trace!(
261                            %piece_index,
262                            %piece_cache_id,
263                            %piece_cache_offset,
264                            "Retrieving piece was replaced in cache during retrieval"
265                        );
266                    }
267                }
268                Ok(None) => {
269                    trace!(
270                        %piece_index,
271                        %piece_cache_id,
272                        %piece_cache_offset,
273                        "Piece cache didn't have piece at offset"
274                    );
275                }
276                Err(error) => {
277                    debug!(
278                        %piece_index,
279                        %piece_cache_id,
280                        %piece_cache_offset,
281                        %error,
282                        "Retrieving piece from cache failed"
283                    );
284                }
285            }
286        } else {
287            trace!(%piece_index, "Piece not found in cache");
288        }
289
290        Ok(self
291            .nats_client
292            .request(&ClusterControllerPieceRequest { piece_index }, None)
293            .await?)
294    }
295
296    async fn get_pieces<'a>(
297        &'a self,
298        piece_indices: Vec<PieceIndex>,
299    ) -> anyhow::Result<
300        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
301    > {
302        let (tx, mut rx) = mpsc::unbounded();
303
304        let piece_indices_to_get =
305            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
306
307        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
308
309        {
310            let mut cached_pieces = self
311                .nats_client
312                .stream_request(
313                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
314                    Some(&self.cache_group),
315                )
316                .await?;
317
318            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
319                cached_pieces.next().await
320            {
321                cached_pieces_by_cache_id
322                    .entry(piece_cache_id)
323                    .or_default()
324                    .push(piece_cache_offset);
325            }
326        }
327
328        let fut = async move {
329            let tx = &tx;
330
331            cached_pieces_by_cache_id
332                .into_iter()
333                .map(|(piece_cache_id, offsets)| {
334                    let piece_indices_to_get = &piece_indices_to_get;
335
336                    async move {
337                        let mut pieces_stream = match self
338                            .nats_client
339                            .stream_request(
340                                &ClusterCacheReadPiecesRequest { offsets },
341                                Some(&piece_cache_id.to_string()),
342                            )
343                            .await
344                        {
345                            Ok(pieces) => pieces,
346                            Err(error) => {
347                                warn!(
348                                    %error,
349                                    %piece_cache_id,
350                                    "Failed to request pieces from cache"
351                                );
352
353                                return;
354                            }
355                        };
356
357                        while let Some(piece_result) = pieces_stream.next().await {
358                            let (piece_offset, maybe_piece) = match piece_result {
359                                Ok(result) => result,
360                                Err(error) => {
361                                    warn!(%error, "Failed to get piece from cache");
362                                    continue;
363                                }
364                            };
365
366                            if let Some((piece_index, piece)) = maybe_piece {
367                                piece_indices_to_get.lock().remove(&piece_index);
368
369                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
370                                    "This future isn't polled after receiver is dropped; qed",
371                                );
372                            } else {
373                                warn!(
374                                    %piece_cache_id,
375                                    %piece_offset,
376                                    "Failed to get piece from cache, it was missing or already gone"
377                                );
378                            }
379                        }
380                    }
381                })
382                .collect::<FuturesUnordered<_>>()
383                // Simply drain everything
384                .for_each(|()| async {})
385                .await;
386
387            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
388            if piece_indices_to_get.is_empty() {
389                return;
390            }
391
392            let mut pieces_from_controller = match self
393                .nats_client
394                .stream_request(
395                    &ClusterControllerPiecesRequest {
396                        piece_indices: piece_indices_to_get.iter().copied().collect(),
397                    },
398                    None,
399                )
400                .await
401            {
402                Ok(pieces_from_controller) => pieces_from_controller,
403                Err(error) => {
404                    error!(%error, "Failed to get pieces from controller");
405
406                    for piece_index in piece_indices_to_get {
407                        tx.unbounded_send((
408                            piece_index,
409                            Err(anyhow::anyhow!("Failed to get piece from controller")),
410                        ))
411                        .expect("This future isn't polled after receiver is dropped; qed");
412                    }
413                    return;
414                }
415            };
416
417            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
418                piece_indices_to_get.remove(&piece_index);
419                tx.unbounded_send((piece_index, Ok(Some(piece))))
420                    .expect("This future isn't polled after receiver is dropped; qed");
421            }
422
423            for piece_index in piece_indices_to_get {
424                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
425                    .expect("This future isn't polled after receiver is dropped; qed");
426            }
427        };
428        let mut fut = Box::pin(fut.fuse());
429
430        // Drive above future and stream back any pieces that were downloaded so far
431        Ok(Box::new(stream::poll_fn(move |cx| {
432            if !fut.is_terminated() {
433                // Result doesn't matter, we'll need to poll stream below anyway
434                let _ = fut.poll_unpin(cx);
435            }
436
437            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
438                return Poll::Ready(maybe_result);
439            }
440
441            // Exit will be done by the stream above
442            Poll::Pending
443        })))
444    }
445}
446
447impl ClusterPieceGetter {
448    /// Create a new instance
449    #[inline]
450    pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
451        Self {
452            nats_client,
453            cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
454        }
455    }
456}
457
458/// [`NodeClient`] used in cluster environment that connects to node through a controller instead
459/// of to the node directly
460#[derive(Debug, Clone)]
461pub struct ClusterNodeClient {
462    nats_client: NatsClient,
463    // Store last slot info instance that can be used to send solution response to (some instances
464    // may be not synced and not able to receive solution responses)
465    last_slot_info_instance: Arc<Mutex<String>>,
466}
467
468impl ClusterNodeClient {
469    /// Create a new instance
470    pub fn new(nats_client: NatsClient) -> Self {
471        Self {
472            nats_client,
473            last_slot_info_instance: Arc::default(),
474        }
475    }
476}
477
478#[async_trait]
479impl NodeClient for ClusterNodeClient {
480    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
481        Ok(self
482            .nats_client
483            .request(&ClusterControllerFarmerAppInfoRequest, None)
484            .await?
485            .map_err(anyhow::Error::msg)?)
486    }
487
488    async fn subscribe_slot_info(
489        &self,
490    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
491        let subscription = self
492            .nats_client
493            .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
494            .await?
495            .filter_map({
496                let mut last_slot_number = None;
497                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
498
499                move |broadcast| {
500                    let slot_info = broadcast.slot_info;
501
502                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
503                        && last_slot_number >= slot_info.slot
504                    {
505                        None
506                    } else {
507                        last_slot_number.replace(slot_info.slot);
508                        *last_slot_info_instance.lock() = broadcast.instance;
509
510                        Some(slot_info)
511                    };
512
513                    async move { maybe_slot_info }
514                }
515            });
516
517        Ok(Box::pin(subscription))
518    }
519
520    async fn submit_solution_response(
521        &self,
522        solution_response: SolutionResponse,
523    ) -> anyhow::Result<()> {
524        let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
525        Ok(self
526            .nats_client
527            .notification(
528                &ClusterControllerSolutionNotification { solution_response },
529                Some(&last_slot_info_instance),
530            )
531            .await?)
532    }
533
534    async fn subscribe_block_sealing(
535        &self,
536    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
537        let subscription = self
538            .nats_client
539            .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
540            .await?
541            .map(|broadcast| broadcast.block_sealing_info);
542
543        Ok(Box::pin(subscription))
544    }
545
546    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
547        Ok(self
548            .nats_client
549            .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
550            .await?)
551    }
552
553    async fn subscribe_new_super_segment_headers(
554        &self,
555    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
556        let subscription = self
557            .nats_client
558            .subscribe_to_broadcasts::<ClusterControllerNewSuperSegmentHeaderBroadcast>(None, None)
559            .await?
560            .filter_map({
561                let mut last_super_segment_index = None;
562
563                move |broadcast| {
564                    let new_super_segment_header = broadcast.new_super_segment_header;
565                    let super_segment_index = new_super_segment_header.index.as_inner();
566
567                    let maybe_super_segment_header = if let Some(last_super_segment_index) =
568                        last_super_segment_index
569                        && last_super_segment_index >= super_segment_index
570                    {
571                        None
572                    } else {
573                        last_super_segment_index.replace(super_segment_index);
574
575                        Some(new_super_segment_header)
576                    };
577
578                    async move { maybe_super_segment_header }
579                }
580            });
581
582        Ok(Box::pin(subscription))
583    }
584
585    async fn super_segment_headers(
586        &self,
587        super_segment_indices: Vec<SuperSegmentIndex>,
588    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
589        Ok(self
590            .nats_client
591            .request(
592                &ClusterControllerSuperSegmentHeadersRequest {
593                    super_segment_indices,
594                },
595                None,
596            )
597            .await?)
598    }
599
600    async fn super_segment_root_for_segment_index(
601        &self,
602        segment_index: SegmentIndex,
603    ) -> anyhow::Result<Option<SuperSegmentRoot>> {
604        Ok(self
605            .nats_client
606            .request(
607                &ClusterControllerSuperSegmentRootForSegmentIndexRequest { segment_index },
608                None,
609            )
610            .await?)
611    }
612
613    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
614        Ok(self
615            .nats_client
616            .request(&ClusterControllerPieceRequest { piece_index }, None)
617            .await?)
618    }
619
620    async fn update_shard_membership_info(
621        &self,
622        _info: FarmerShardMembershipInfo,
623    ) -> anyhow::Result<()> {
624        // Controller aggregates these on its own
625        Ok(())
626    }
627}
628
629/// Create controller service that handles things like broadcasting information (for example slot
630/// notifications) as well as responding to incoming requests (like piece requests).
631///
632/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
633/// per controller instance in order to parallelize more work across threads if needed.
634pub async fn controller_service<NC, PG>(
635    nats_client: &NatsClient,
636    node_client: &NC,
637    piece_getter: &PG,
638    farmer_caches: &[(&str, &FarmerCache)],
639    instance: &str,
640    primary_instance: bool,
641) -> anyhow::Result<()>
642where
643    NC: NodeClient,
644    PG: PieceGetter + Sync,
645{
646    if primary_instance {
647        select! {
648            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
649                result
650            },
651            result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
652                result
653            },
654            result = new_super_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
655                result
656            },
657            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
658                result
659            },
660            result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
661                result
662            },
663            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
664                result
665            },
666            result = super_segment_headers_responder(nats_client, node_client).fuse() => {
667                result
668            },
669            result = super_segment_root_for_segment_index_responder(nats_client, node_client).fuse() => {
670                result
671            },
672            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
673                result
674            },
675            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
676                result
677            },
678            result = piece_responder(nats_client, piece_getter).fuse() => {
679                result
680            },
681            result = pieces_responder(nats_client, piece_getter).fuse() => {
682                result
683            },
684        }
685    } else {
686        select! {
687            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
688                result
689            },
690            result = super_segment_headers_responder(nats_client, node_client).fuse() => {
691                result
692            },
693            result = super_segment_root_for_segment_index_responder(nats_client, node_client).fuse() => {
694                result
695            },
696            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
697                result
698            },
699            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
700                result
701            },
702            result = piece_responder(nats_client, piece_getter).fuse() => {
703                result
704            },
705            result = pieces_responder(nats_client, piece_getter).fuse() => {
706                result
707            },
708        }
709    }
710}
711
712async fn slot_info_broadcaster<NC>(
713    nats_client: &NatsClient,
714    node_client: &NC,
715    instance: &str,
716) -> anyhow::Result<()>
717where
718    NC: NodeClient,
719{
720    let mut slot_info_notifications = node_client
721        .subscribe_slot_info()
722        .await
723        .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
724
725    while let Some(slot_info) = slot_info_notifications.next().await {
726        debug!(?slot_info, "New slot");
727
728        let slot = slot_info.slot;
729
730        if let Err(error) = nats_client
731            .broadcast(
732                &ClusterControllerSlotInfoBroadcast {
733                    slot_info,
734                    instance: instance.to_string(),
735                },
736                instance,
737            )
738            .await
739        {
740            warn!(%slot, %error, "Failed to broadcast slot info");
741        }
742    }
743
744    Ok(())
745}
746
747async fn block_sealing_broadcaster<NC>(
748    nats_client: &NatsClient,
749    node_client: &NC,
750    instance: &str,
751) -> anyhow::Result<()>
752where
753    NC: NodeClient,
754{
755    let mut block_sealing_subscription = node_client
756        .subscribe_block_sealing()
757        .await
758        .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
759
760    while let Some(block_sealing_info) = block_sealing_subscription.next().await {
761        trace!(?block_sealing_info, "New block sealing notification");
762
763        if let Err(error) = nats_client
764            .broadcast(
765                &ClusterControllerBlockSealingBroadcast { block_sealing_info },
766                instance,
767            )
768            .await
769        {
770            warn!(%error, "Failed to broadcast block sealing info");
771        }
772    }
773
774    Ok(())
775}
776
777async fn new_super_segment_headers_broadcaster<NC>(
778    nats_client: &NatsClient,
779    node_client: &NC,
780    instance: &str,
781) -> anyhow::Result<()>
782where
783    NC: NodeClient,
784{
785    let mut new_super_segments_notifications = node_client
786        .subscribe_new_super_segment_headers()
787        .await
788        .map_err(|error| {
789            anyhow!("Failed to subscribe to new super segment header notifications: {error}")
790        })?;
791
792    while let Some(new_super_segment_header) = new_super_segments_notifications.next().await {
793        trace!(
794            ?new_super_segment_header,
795            "New archived new super segment header notification"
796        );
797
798        if let Err(error) = nats_client
799            .broadcast(
800                &ClusterControllerNewSuperSegmentHeaderBroadcast {
801                    new_super_segment_header,
802                },
803                instance,
804            )
805            .await
806        {
807            warn!(%error, "Failed to broadcast new super segment header info");
808        }
809    }
810
811    Ok(())
812}
813
814async fn solution_response_forwarder<NC>(
815    nats_client: &NatsClient,
816    node_client: &NC,
817    instance: &str,
818) -> anyhow::Result<()>
819where
820    NC: NodeClient,
821{
822    let mut subscription = nats_client
823        .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
824            Some(instance),
825            Some(instance.to_string()),
826        )
827        .await
828        .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
829
830    while let Some(notification) = subscription.next().await {
831        debug!(?notification, "Solution notification");
832
833        let slot = notification.solution_response.slot_number;
834        let public_key_hash = notification.solution_response.solution.public_key_hash;
835        let sector_index = notification.solution_response.solution.sector_index;
836
837        if let Err(error) = node_client
838            .submit_solution_response(notification.solution_response)
839            .await
840        {
841            warn!(
842                %error,
843                %slot,
844                %public_key_hash,
845                %sector_index,
846                "Failed to send solution response"
847            );
848        }
849    }
850
851    Ok(())
852}
853
854async fn block_seal_forwarder<NC>(
855    nats_client: &NatsClient,
856    node_client: &NC,
857    instance: &str,
858) -> anyhow::Result<()>
859where
860    NC: NodeClient,
861{
862    let mut subscription = nats_client
863        .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
864            None,
865            Some(instance.to_string()),
866        )
867        .await
868        .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
869
870    while let Some(notification) = subscription.next().await {
871        debug!(?notification, "Block seal notification");
872
873        if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
874            warn!(%error, "Failed to send block seal");
875        }
876    }
877
878    Ok(())
879}
880
881async fn farmer_app_info_responder<NC>(
882    nats_client: &NatsClient,
883    node_client: &NC,
884) -> anyhow::Result<()>
885where
886    NC: NodeClient,
887{
888    nats_client
889        .request_responder(
890            None,
891            Some("ab.controller".to_string()),
892            |_: ClusterControllerFarmerAppInfoRequest| async move {
893                Some(
894                    node_client
895                        .farmer_app_info()
896                        .await
897                        .map_err(|error| error.to_string()),
898                )
899            },
900        )
901        .await
902}
903
904async fn super_segment_headers_responder<NC>(
905    nats_client: &NatsClient,
906    node_client: &NC,
907) -> anyhow::Result<()>
908where
909    NC: NodeClient,
910{
911    nats_client
912        .request_responder(
913            None,
914            Some("ab.controller".to_string()),
915            |ClusterControllerSuperSegmentHeadersRequest { super_segment_indices }| async move {
916                node_client
917                    .super_segment_headers(super_segment_indices.clone())
918                    .await
919                    .inspect_err(|error| {
920                        warn!(%error, ?super_segment_indices, "Failed to get super segment headers");
921                    })
922                    .ok()
923            },
924        )
925        .await
926}
927
928async fn super_segment_root_for_segment_index_responder<NC>(
929    nats_client: &NatsClient,
930    node_client: &NC,
931) -> anyhow::Result<()>
932where
933    NC: NodeClient,
934{
935    nats_client
936        .request_responder(
937            None,
938            Some("ab.controller".to_string()),
939            |ClusterControllerSuperSegmentRootForSegmentIndexRequest { segment_index }| async move {
940                node_client
941                    .super_segment_root_for_segment_index(segment_index)
942                    .await
943                    .inspect_err(|error| {
944                        warn!(
945                            %error,
946                            %segment_index,
947                            "Failed to get super segment root for segment index"
948                        );
949                    })
950                    .ok()
951            },
952        )
953        .await
954}
955
956async fn find_piece_responder(
957    nats_client: &NatsClient,
958    farmer_caches: &[(&str, &FarmerCache)],
959) -> anyhow::Result<()> {
960    futures::future::try_join(
961        farmer_caches
962            .iter()
963            .map(|(cache_group, farmer_cache)| {
964                nats_client.request_responder(
965                    Some(cache_group),
966                    Some("ab.controller".to_string()),
967                    move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
968                        Some(farmer_cache.find_piece(piece_index).await)
969                    },
970                )
971            })
972            .collect::<FuturesUnordered<_>>()
973            .next()
974            .map(|result| result.unwrap_or(Ok(()))),
975        nats_client.request_responder(
976            Some(GLOBAL_CACHE_GROUP),
977            Some("ab.controller".to_string()),
978            |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
979                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
980                Some(farmer_cache.find_piece(piece_index).await)
981            },
982        ),
983    )
984    .await
985    .map(|((), ())| ())
986}
987
988async fn find_pieces_responder(
989    nats_client: &NatsClient,
990    farmer_caches: &[(&str, &FarmerCache)],
991) -> anyhow::Result<()> {
992    futures::future::try_join(
993        farmer_caches
994            .iter()
995            .map(|(cache_group, farmer_cache)| {
996                nats_client.stream_request_responder(
997                    Some(cache_group),
998                    Some("ab.controller".to_string()),
999                    move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1000                        Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1001                    },
1002                )
1003            })
1004            .collect::<FuturesUnordered<_>>()
1005            .next()
1006            .map(|result| result.unwrap_or(Ok(()))),
1007        nats_client.stream_request_responder(
1008            Some(GLOBAL_CACHE_GROUP),
1009            Some("ab.controller".to_string()),
1010            |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1011                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
1012                Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1013            },
1014        ),
1015    )
1016    .await
1017    .map(|((), ())| ())
1018}
1019
1020async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1021where
1022    PG: PieceGetter + Sync,
1023{
1024    nats_client
1025        .request_responder(
1026            None,
1027            Some("ab.controller".to_string()),
1028            |ClusterControllerPieceRequest { piece_index }| async move {
1029                piece_getter
1030                    .get_piece(piece_index)
1031                    .await
1032                    .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
1033                    .ok()
1034            },
1035        )
1036        .await
1037}
1038
1039async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1040where
1041    PG: PieceGetter + Sync,
1042{
1043    nats_client
1044        .stream_request_responder(
1045            None,
1046            Some("ab.controller".to_string()),
1047            |ClusterControllerPiecesRequest { piece_indices }| async move {
1048                piece_getter
1049                    .get_pieces(piece_indices)
1050                    .await
1051                    .map(|stream| {
1052                        Box::pin(stream.filter_map(
1053                            |(piece_index, maybe_piece_result)| async move {
1054                                match maybe_piece_result {
1055                                    Ok(Some(piece)) => Some((piece_index, piece)),
1056                                    Ok(None) => None,
1057                                    Err(error) => {
1058                                        warn!(%error, %piece_index, "Failed to get piece");
1059                                        None
1060                                    }
1061                                }
1062                            },
1063                        ))
1064                    })
1065                    .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1066                    .ok()
1067            },
1068        )
1069        .await
1070}