ab_farmer/cluster/
controller.rs

//! Farming cluster controller
//!
//! The controller is responsible for managing the farming cluster.
//!
//! This module exposes data structures for NATS communication, custom piece getter and node
//! client implementations designed to work with the cluster controller, and a service function
//! that drives the backend part of the controller.
8
9pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15    GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
22use ab_data_retrieval::piece_getter::PieceGetter;
23use ab_farmer_rpc_primitives::{
24    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
25    SolutionResponse,
26};
27use anyhow::anyhow;
28use async_nats::HeaderValue;
29use async_trait::async_trait;
30use futures::channel::mpsc;
31use futures::future::FusedFuture;
32use futures::stream::FuturesUnordered;
33use futures::{FutureExt, Stream, StreamExt, select, stream};
34use parity_scale_codec::{Decode, Encode};
35use parking_lot::Mutex;
36use rand::prelude::*;
37use std::collections::{HashMap, HashSet};
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::Poll;
41use tracing::{debug, error, trace, warn};
42
/// Name of the special "cache group" that all controllers subscribe to. A query sent to it may be
/// served by any cache group, with the cache group for each query chosen at random.
const GLOBAL_CACHE_GROUP: &str = "_";
46
47/// Broadcast sent by controllers requesting farmers to identify themselves
48#[derive(Debug, Copy, Clone, Encode, Decode)]
49pub struct ClusterControllerFarmerIdentifyBroadcast;
50
51impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
52    const SUBJECT: &'static str = "ab.controller.farmer-identify";
53}
54
55/// Broadcast sent by controllers requesting caches in cache group to identify themselves
56#[derive(Debug, Copy, Clone, Encode, Decode)]
57pub struct ClusterControllerCacheIdentifyBroadcast;
58
59impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
60    /// `*` here stands for cache group
61    const SUBJECT: &'static str = "ab.controller.*.cache-identify";
62}
63
64/// Broadcast with slot info sent by controllers
65#[derive(Debug, Clone, Encode, Decode)]
66struct ClusterControllerSlotInfoBroadcast {
67    slot_info: SlotInfo,
68    instance: String,
69}
70
71impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
72    const SUBJECT: &'static str = "ab.controller.slot-info";
73
74    fn deterministic_message_id(&self) -> Option<HeaderValue> {
75        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
76        //  be simplified to just a slot number
77        Some(HeaderValue::from(format!(
78            "slot-info-{}",
79            self.slot_info.slot
80        )))
81    }
82}
83
84/// Broadcast with block sealing info by controllers
85#[derive(Debug, Clone, Encode, Decode)]
86struct ClusterControllerBlockSealingBroadcast {
87    block_sealing_info: BlockSealInfo,
88}
89
90impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
91    const SUBJECT: &'static str = "ab.controller.block-sealing-info";
92}
93
94/// Broadcast with archived segment headers by controllers
95#[derive(Debug, Clone, Encode, Decode)]
96struct ClusterControllerArchivedSegmentHeaderBroadcast {
97    archived_segment_header: SegmentHeader,
98}
99
100impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
101    const SUBJECT: &'static str = "ab.controller.archived-segment-header";
102
103    fn deterministic_message_id(&self) -> Option<HeaderValue> {
104        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
105        //  be simplified to just a segment index
106        Some(HeaderValue::from(format!(
107            "archived-segment-{}",
108            self.archived_segment_header.segment_index
109        )))
110    }
111}
112
113/// Notification messages with solution by farmers
114#[derive(Debug, Clone, Encode, Decode)]
115struct ClusterControllerSolutionNotification {
116    solution_response: SolutionResponse,
117}
118
119impl GenericNotification for ClusterControllerSolutionNotification {
120    const SUBJECT: &'static str = "ab.controller.*.solution";
121}
122
123/// Notification messages with block seal by farmers
124#[derive(Debug, Clone, Encode, Decode)]
125struct ClusterControllerBlockSealNotification {
126    block_seal: BlockSealResponse,
127}
128
129impl GenericNotification for ClusterControllerBlockSealNotification {
130    const SUBJECT: &'static str = "ab.controller.block-seal";
131}
132
133/// Request farmer app info from controller
134#[derive(Debug, Clone, Encode, Decode)]
135struct ClusterControllerFarmerAppInfoRequest;
136
137impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
138    const SUBJECT: &'static str = "ab.controller.farmer-app-info";
139    type Response = Result<FarmerAppInfo, String>;
140}
141
142/// Request segment headers with specified segment indices
143#[derive(Debug, Clone, Encode, Decode)]
144struct ClusterControllerSegmentHeadersRequest {
145    segment_indices: Vec<SegmentIndex>,
146}
147
148impl GenericRequest for ClusterControllerSegmentHeadersRequest {
149    const SUBJECT: &'static str = "ab.controller.segment-headers";
150    type Response = Vec<Option<SegmentHeader>>;
151}
152
153/// Find piece with specified index in cache
154#[derive(Debug, Clone, Encode, Decode)]
155struct ClusterControllerFindPieceInCacheRequest {
156    piece_index: PieceIndex,
157}
158
159impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
160    const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
161    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
162}
163
164/// Find pieces with specified indices in cache
165#[derive(Debug, Clone, Encode, Decode)]
166struct ClusterControllerFindPiecesInCacheRequest {
167    piece_indices: Vec<PieceIndex>,
168}
169
170impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
171    const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
172    /// Only pieces that were found are returned
173    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
174}
175
176/// Request piece with specified index
177#[derive(Debug, Clone, Encode, Decode)]
178struct ClusterControllerPieceRequest {
179    piece_index: PieceIndex,
180}
181
182impl GenericRequest for ClusterControllerPieceRequest {
183    const SUBJECT: &'static str = "ab.controller.piece";
184    type Response = Option<Piece>;
185}
186
187/// Request pieces with specified index
188#[derive(Debug, Clone, Encode, Decode)]
189struct ClusterControllerPiecesRequest {
190    piece_indices: Vec<PieceIndex>,
191}
192
193impl GenericStreamRequest for ClusterControllerPiecesRequest {
194    const SUBJECT: &'static str = "ab.controller.pieces";
195    /// Only pieces that were found are returned
196    type Response = (PieceIndex, Piece);
197}
198
199/// Cluster piece getter
200#[derive(Debug, Clone)]
201pub struct ClusterPieceGetter {
202    nats_client: NatsClient,
203    cache_group: String,
204}
205
206#[async_trait]
207impl PieceGetter for ClusterPieceGetter {
208    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
209        if let Some((piece_cache_id, piece_cache_offset)) = self
210            .nats_client
211            .request(
212                &ClusterControllerFindPieceInCacheRequest { piece_index },
213                Some(&self.cache_group),
214            )
215            .await?
216        {
217            trace!(
218                %piece_index,
219                %piece_cache_id,
220                %piece_cache_offset,
221                "Found piece in cache, retrieving"
222            );
223
224            match self
225                .nats_client
226                .request(
227                    &ClusterCacheReadPieceRequest {
228                        offset: piece_cache_offset,
229                    },
230                    Some(&piece_cache_id.to_string()),
231                )
232                .await
233                .map_err(|error| error.to_string())
234                .flatten()
235            {
236                Ok(Some((retrieved_piece_index, piece))) => {
237                    if retrieved_piece_index == piece_index {
238                        trace!(
239                            %piece_index,
240                            %piece_cache_id,
241                            %piece_cache_offset,
242                            "Retrieved piece from cache successfully"
243                        );
244
245                        return Ok(Some(piece));
246                    } else {
247                        trace!(
248                            %piece_index,
249                            %piece_cache_id,
250                            %piece_cache_offset,
251                            "Retrieving piece was replaced in cache during retrieval"
252                        );
253                    }
254                }
255                Ok(None) => {
256                    trace!(
257                        %piece_index,
258                        %piece_cache_id,
259                        %piece_cache_offset,
260                        "Piece cache didn't have piece at offset"
261                    );
262                }
263                Err(error) => {
264                    debug!(
265                        %piece_index,
266                        %piece_cache_id,
267                        %piece_cache_offset,
268                        %error,
269                        "Retrieving piece from cache failed"
270                    );
271                }
272            }
273        } else {
274            trace!(%piece_index, "Piece not found in cache");
275        }
276
277        Ok(self
278            .nats_client
279            .request(&ClusterControllerPieceRequest { piece_index }, None)
280            .await?)
281    }
282
283    async fn get_pieces<'a>(
284        &'a self,
285        piece_indices: Vec<PieceIndex>,
286    ) -> anyhow::Result<
287        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
288    > {
289        let (tx, mut rx) = mpsc::unbounded();
290
291        let piece_indices_to_get =
292            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
293
294        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
295
296        {
297            let mut cached_pieces = self
298                .nats_client
299                .stream_request(
300                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
301                    Some(&self.cache_group),
302                )
303                .await?;
304
305            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
306                cached_pieces.next().await
307            {
308                cached_pieces_by_cache_id
309                    .entry(piece_cache_id)
310                    .or_default()
311                    .push(piece_cache_offset);
312            }
313        }
314
315        let fut = async move {
316            let tx = &tx;
317
318            cached_pieces_by_cache_id
319                .into_iter()
320                .map(|(piece_cache_id, offsets)| {
321                    let piece_indices_to_get = &piece_indices_to_get;
322
323                    async move {
324                        let mut pieces_stream = match self
325                            .nats_client
326                            .stream_request(
327                                &ClusterCacheReadPiecesRequest { offsets },
328                                Some(&piece_cache_id.to_string()),
329                            )
330                            .await
331                        {
332                            Ok(pieces) => pieces,
333                            Err(error) => {
334                                warn!(
335                                    %error,
336                                    %piece_cache_id,
337                                    "Failed to request pieces from cache"
338                                );
339
340                                return;
341                            }
342                        };
343
344                        while let Some(piece_result) = pieces_stream.next().await {
345                            let (piece_offset, maybe_piece) = match piece_result {
346                                Ok(result) => result,
347                                Err(error) => {
348                                    warn!(%error, "Failed to get piece from cache");
349                                    continue;
350                                }
351                            };
352
353                            if let Some((piece_index, piece)) = maybe_piece {
354                                piece_indices_to_get.lock().remove(&piece_index);
355
356                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
357                                    "This future isn't polled after receiver is dropped; qed",
358                                );
359                            } else {
360                                warn!(
361                                    %piece_cache_id,
362                                    %piece_offset,
363                                    "Failed to get piece from cache, it was missing or already gone"
364                                );
365                            }
366                        }
367                    }
368                })
369                .collect::<FuturesUnordered<_>>()
370                // Simply drain everything
371                .for_each(|()| async {})
372                .await;
373
374            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
375            if piece_indices_to_get.is_empty() {
376                return;
377            }
378
379            let mut pieces_from_controller = match self
380                .nats_client
381                .stream_request(
382                    &ClusterControllerPiecesRequest {
383                        piece_indices: piece_indices_to_get.iter().copied().collect(),
384                    },
385                    None,
386                )
387                .await
388            {
389                Ok(pieces_from_controller) => pieces_from_controller,
390                Err(error) => {
391                    error!(%error, "Failed to get pieces from controller");
392
393                    for piece_index in piece_indices_to_get {
394                        tx.unbounded_send((
395                            piece_index,
396                            Err(anyhow::anyhow!("Failed to get piece from controller")),
397                        ))
398                        .expect("This future isn't polled after receiver is dropped; qed");
399                    }
400                    return;
401                }
402            };
403
404            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
405                piece_indices_to_get.remove(&piece_index);
406                tx.unbounded_send((piece_index, Ok(Some(piece))))
407                    .expect("This future isn't polled after receiver is dropped; qed");
408            }
409
410            for piece_index in piece_indices_to_get {
411                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
412                    .expect("This future isn't polled after receiver is dropped; qed");
413            }
414        };
415        let mut fut = Box::pin(fut.fuse());
416
417        // Drive above future and stream back any pieces that were downloaded so far
418        Ok(Box::new(stream::poll_fn(move |cx| {
419            if !fut.is_terminated() {
420                // Result doesn't matter, we'll need to poll stream below anyway
421                let _ = fut.poll_unpin(cx);
422            }
423
424            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
425                return Poll::Ready(maybe_result);
426            }
427
428            // Exit will be done by the stream above
429            Poll::Pending
430        })))
431    }
432}
433
434impl ClusterPieceGetter {
435    /// Create a new instance
436    #[inline]
437    pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
438        Self {
439            nats_client,
440            cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
441        }
442    }
443}
444
445/// [`NodeClient`] used in cluster environment that connects to node through a controller instead
446/// of to the node directly
447#[derive(Debug, Clone)]
448pub struct ClusterNodeClient {
449    nats_client: NatsClient,
450    // Store last slot info instance that can be used to send solution response to (some instances
451    // may be not synced and not able to receive solution responses)
452    last_slot_info_instance: Arc<Mutex<String>>,
453}
454
455impl ClusterNodeClient {
456    /// Create a new instance
457    pub fn new(nats_client: NatsClient) -> Self {
458        Self {
459            nats_client,
460            last_slot_info_instance: Arc::default(),
461        }
462    }
463}
464
465#[async_trait]
466impl NodeClient for ClusterNodeClient {
467    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
468        Ok(self
469            .nats_client
470            .request(&ClusterControllerFarmerAppInfoRequest, None)
471            .await?
472            .map_err(anyhow::Error::msg)?)
473    }
474
475    async fn subscribe_slot_info(
476        &self,
477    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
478        let subscription = self
479            .nats_client
480            .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
481            .await?
482            .filter_map({
483                let mut last_slot_number = None;
484                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
485
486                move |broadcast| {
487                    let slot_info = broadcast.slot_info;
488
489                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
490                        && last_slot_number >= slot_info.slot
491                    {
492                        None
493                    } else {
494                        last_slot_number.replace(slot_info.slot);
495                        *last_slot_info_instance.lock() = broadcast.instance;
496
497                        Some(slot_info)
498                    };
499
500                    async move { maybe_slot_info }
501                }
502            });
503
504        Ok(Box::pin(subscription))
505    }
506
507    async fn submit_solution_response(
508        &self,
509        solution_response: SolutionResponse,
510    ) -> anyhow::Result<()> {
511        let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
512        Ok(self
513            .nats_client
514            .notification(
515                &ClusterControllerSolutionNotification { solution_response },
516                Some(&last_slot_info_instance),
517            )
518            .await?)
519    }
520
521    async fn subscribe_block_sealing(
522        &self,
523    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
524        let subscription = self
525            .nats_client
526            .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
527            .await?
528            .map(|broadcast| broadcast.block_sealing_info);
529
530        Ok(Box::pin(subscription))
531    }
532
533    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
534        Ok(self
535            .nats_client
536            .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
537            .await?)
538    }
539
540    async fn subscribe_archived_segment_headers(
541        &self,
542    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
543        let subscription = self
544            .nats_client
545            .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
546            .await?
547            .filter_map({
548                let mut last_archived_segment_index = None;
549
550                move |broadcast| {
551                    let archived_segment_header = broadcast.archived_segment_header;
552                    let segment_index = archived_segment_header.segment_index();
553
554                    let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
555                        last_archived_segment_index
556                        && last_archived_segment_index >= segment_index
557                    {
558                        None
559                    } else {
560                        last_archived_segment_index.replace(segment_index);
561
562                        Some(archived_segment_header)
563                    };
564
565                    async move { maybe_archived_segment_header }
566                }
567            });
568
569        Ok(Box::pin(subscription))
570    }
571
572    async fn segment_headers(
573        &self,
574        segment_indices: Vec<SegmentIndex>,
575    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
576        Ok(self
577            .nats_client
578            .request(
579                &ClusterControllerSegmentHeadersRequest { segment_indices },
580                None,
581            )
582            .await?)
583    }
584
585    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
586        Ok(self
587            .nats_client
588            .request(&ClusterControllerPieceRequest { piece_index }, None)
589            .await?)
590    }
591
592    async fn acknowledge_archived_segment_header(
593        &self,
594        _segment_index: SegmentIndex,
595    ) -> anyhow::Result<()> {
596        // Acknowledgement is unnecessary/unsupported
597        Ok(())
598    }
599
600    async fn update_shard_membership_info(
601        &self,
602        _info: FarmerShardMembershipInfo,
603    ) -> anyhow::Result<()> {
604        // Controller aggregates these on its own
605        Ok(())
606    }
607}
608
609/// Create controller service that handles things like broadcasting information (for example slot
610/// notifications) as well as responding to incoming requests (like piece requests).
611///
612/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
613/// per controller instance in order to parallelize more work across threads if needed.
614pub async fn controller_service<NC, PG>(
615    nats_client: &NatsClient,
616    node_client: &NC,
617    piece_getter: &PG,
618    farmer_caches: &[(&str, &FarmerCache)],
619    instance: &str,
620    primary_instance: bool,
621) -> anyhow::Result<()>
622where
623    NC: NodeClient,
624    PG: PieceGetter + Sync,
625{
626    if primary_instance {
627        select! {
628            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
629                result
630            },
631            result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
632                result
633            },
634            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
635                result
636            },
637            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
638                result
639            },
640            result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
641                result
642            },
643            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
644                result
645            },
646            result = segment_headers_responder(nats_client, node_client).fuse() => {
647                result
648            },
649            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
650                result
651            },
652            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
653                result
654            },
655            result = piece_responder(nats_client, piece_getter).fuse() => {
656                result
657            },
658            result = pieces_responder(nats_client, piece_getter).fuse() => {
659                result
660            },
661        }
662    } else {
663        select! {
664            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
665                result
666            },
667            result = segment_headers_responder(nats_client, node_client).fuse() => {
668                result
669            },
670            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
671                result
672            },
673            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
674                result
675            },
676            result = piece_responder(nats_client, piece_getter).fuse() => {
677                result
678            },
679            result = pieces_responder(nats_client, piece_getter).fuse() => {
680                result
681            },
682        }
683    }
684}
685
686async fn slot_info_broadcaster<NC>(
687    nats_client: &NatsClient,
688    node_client: &NC,
689    instance: &str,
690) -> anyhow::Result<()>
691where
692    NC: NodeClient,
693{
694    let mut slot_info_notifications = node_client
695        .subscribe_slot_info()
696        .await
697        .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
698
699    while let Some(slot_info) = slot_info_notifications.next().await {
700        debug!(?slot_info, "New slot");
701
702        let slot = slot_info.slot;
703
704        if let Err(error) = nats_client
705            .broadcast(
706                &ClusterControllerSlotInfoBroadcast {
707                    slot_info,
708                    instance: instance.to_string(),
709                },
710                instance,
711            )
712            .await
713        {
714            warn!(%slot, %error, "Failed to broadcast slot info");
715        }
716    }
717
718    Ok(())
719}
720
721async fn block_sealing_broadcaster<NC>(
722    nats_client: &NatsClient,
723    node_client: &NC,
724    instance: &str,
725) -> anyhow::Result<()>
726where
727    NC: NodeClient,
728{
729    let mut block_sealing_subscription = node_client
730        .subscribe_block_sealing()
731        .await
732        .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
733
734    while let Some(block_sealing_info) = block_sealing_subscription.next().await {
735        trace!(?block_sealing_info, "New block sealing notification");
736
737        if let Err(error) = nats_client
738            .broadcast(
739                &ClusterControllerBlockSealingBroadcast { block_sealing_info },
740                instance,
741            )
742            .await
743        {
744            warn!(%error, "Failed to broadcast block sealing info");
745        }
746    }
747
748    Ok(())
749}
750
751async fn archived_segment_headers_broadcaster<NC>(
752    nats_client: &NatsClient,
753    node_client: &NC,
754    instance: &str,
755) -> anyhow::Result<()>
756where
757    NC: NodeClient,
758{
759    let mut archived_segments_notifications = node_client
760        .subscribe_archived_segment_headers()
761        .await
762        .map_err(|error| {
763            anyhow!("Failed to subscribe to archived segment header notifications: {error}")
764        })?;
765
766    while let Some(archived_segment_header) = archived_segments_notifications.next().await {
767        trace!(
768            ?archived_segment_header,
769            "New archived archived segment header notification"
770        );
771
772        node_client
773            .acknowledge_archived_segment_header(archived_segment_header.segment_index())
774            .await
775            .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
776
777        if let Err(error) = nats_client
778            .broadcast(
779                &ClusterControllerArchivedSegmentHeaderBroadcast {
780                    archived_segment_header,
781                },
782                instance,
783            )
784            .await
785        {
786            warn!(%error, "Failed to broadcast archived segment header info");
787        }
788    }
789
790    Ok(())
791}
792
793async fn solution_response_forwarder<NC>(
794    nats_client: &NatsClient,
795    node_client: &NC,
796    instance: &str,
797) -> anyhow::Result<()>
798where
799    NC: NodeClient,
800{
801    let mut subscription = nats_client
802        .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
803            Some(instance),
804            Some(instance.to_string()),
805        )
806        .await
807        .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
808
809    while let Some(notification) = subscription.next().await {
810        debug!(?notification, "Solution notification");
811
812        let slot = notification.solution_response.slot_number;
813        let public_key_hash = notification.solution_response.solution.public_key_hash;
814        let sector_index = notification.solution_response.solution.sector_index;
815
816        if let Err(error) = node_client
817            .submit_solution_response(notification.solution_response)
818            .await
819        {
820            warn!(
821                %error,
822                %slot,
823                %public_key_hash,
824                %sector_index,
825                "Failed to send solution response"
826            );
827        }
828    }
829
830    Ok(())
831}
832
833async fn block_seal_forwarder<NC>(
834    nats_client: &NatsClient,
835    node_client: &NC,
836    instance: &str,
837) -> anyhow::Result<()>
838where
839    NC: NodeClient,
840{
841    let mut subscription = nats_client
842        .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
843            None,
844            Some(instance.to_string()),
845        )
846        .await
847        .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
848
849    while let Some(notification) = subscription.next().await {
850        debug!(?notification, "Block seal notification");
851
852        if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
853            warn!(%error, "Failed to send block seal");
854        }
855    }
856
857    Ok(())
858}
859
860async fn farmer_app_info_responder<NC>(
861    nats_client: &NatsClient,
862    node_client: &NC,
863) -> anyhow::Result<()>
864where
865    NC: NodeClient,
866{
867    nats_client
868        .request_responder(
869            None,
870            Some("ab.controller".to_string()),
871            |_: ClusterControllerFarmerAppInfoRequest| async move {
872                Some(
873                    node_client
874                        .farmer_app_info()
875                        .await
876                        .map_err(|error| error.to_string()),
877                )
878            },
879        )
880        .await
881}
882
883async fn segment_headers_responder<NC>(
884    nats_client: &NatsClient,
885    node_client: &NC,
886) -> anyhow::Result<()>
887where
888    NC: NodeClient,
889{
890    nats_client
891        .request_responder(
892            None,
893            Some("ab.controller".to_string()),
894            |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
895                node_client
896                    .segment_headers(segment_indices.clone())
897                    .await
898                    .inspect_err(|error| {
899                        warn!(%error, ?segment_indices, "Failed to get segment headers");
900                    })
901                    .ok()
902            },
903        )
904        .await
905}
906
907async fn find_piece_responder(
908    nats_client: &NatsClient,
909    farmer_caches: &[(&str, &FarmerCache)],
910) -> anyhow::Result<()> {
911    futures::future::try_join(
912        farmer_caches
913            .iter()
914            .map(|(cache_group, farmer_cache)| {
915                nats_client.request_responder(
916                    Some(cache_group),
917                    Some("ab.controller".to_string()),
918                    move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
919                        Some(farmer_cache.find_piece(piece_index).await)
920                    },
921                )
922            })
923            .collect::<FuturesUnordered<_>>()
924            .next()
925            .map(|result| result.unwrap_or(Ok(()))),
926        nats_client.request_responder(
927            Some(GLOBAL_CACHE_GROUP),
928            Some("ab.controller".to_string()),
929            |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
930                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
931                Some(farmer_cache.find_piece(piece_index).await)
932            },
933        ),
934    )
935    .await
936    .map(|((), ())| ())
937}
938
939async fn find_pieces_responder(
940    nats_client: &NatsClient,
941    farmer_caches: &[(&str, &FarmerCache)],
942) -> anyhow::Result<()> {
943    futures::future::try_join(
944        farmer_caches
945            .iter()
946            .map(|(cache_group, farmer_cache)| {
947                nats_client.stream_request_responder(
948                    Some(cache_group),
949                    Some("ab.controller".to_string()),
950                    move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
951                        Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
952                    },
953                )
954            })
955            .collect::<FuturesUnordered<_>>()
956            .next()
957            .map(|result| result.unwrap_or(Ok(()))),
958        nats_client.stream_request_responder(
959            Some(GLOBAL_CACHE_GROUP),
960            Some("ab.controller".to_string()),
961            |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
962                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
963                Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
964            },
965        ),
966    )
967    .await
968    .map(|((), ())| ())
969}
970
971async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
972where
973    PG: PieceGetter + Sync,
974{
975    nats_client
976        .request_responder(
977            None,
978            Some("ab.controller".to_string()),
979            |ClusterControllerPieceRequest { piece_index }| async move {
980                piece_getter
981                    .get_piece(piece_index)
982                    .await
983                    .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
984                    .ok()
985            },
986        )
987        .await
988}
989
990async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
991where
992    PG: PieceGetter + Sync,
993{
994    nats_client
995        .stream_request_responder(
996            None,
997            Some("ab.controller".to_string()),
998            |ClusterControllerPiecesRequest { piece_indices }| async move {
999                piece_getter
1000                    .get_pieces(piece_indices)
1001                    .await
1002                    .map(|stream| {
1003                        Box::pin(stream.filter_map(
1004                            |(piece_index, maybe_piece_result)| async move {
1005                                match maybe_piece_result {
1006                                    Ok(Some(piece)) => Some((piece_index, piece)),
1007                                    Ok(None) => None,
1008                                    Err(error) => {
1009                                        warn!(%error, %piece_index, "Failed to get piece");
1010                                        None
1011                                    }
1012                                }
1013                            },
1014                        ))
1015                    })
1016                    .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1017                    .ok()
1018            },
1019        )
1020        .await
1021}