ab_farmer/cluster/
controller.rs

1//! Farming cluster controller
2//!
3//! Controller is responsible for managing farming cluster.
4//!
5//! This module exposes some data structures for NATS communication, custom piece getter and node
6//! client implementations designed to work with cluster controller and a service function to drive
7//! the backend part of the controller.
8
9pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15    GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
22use ab_data_retrieval::piece_getter::PieceGetter;
23use ab_farmer_rpc_primitives::{
24    BlockSealInfo, BlockSealResponse, FarmerAppInfo, SlotInfo, SolutionResponse,
25};
26use anyhow::anyhow;
27use async_nats::HeaderValue;
28use async_trait::async_trait;
29use futures::channel::mpsc;
30use futures::future::FusedFuture;
31use futures::stream::FuturesUnordered;
32use futures::{FutureExt, Stream, StreamExt, select, stream};
33use parity_scale_codec::{Decode, Encode};
34use parking_lot::Mutex;
35use rand::prelude::*;
36use std::collections::{HashMap, HashSet};
37use std::pin::Pin;
38use std::sync::Arc;
39use std::task::Poll;
40use tracing::{debug, error, trace, warn};
41
/// Special "cache group" that all controllers subscribe to and that can be used to query any cache
/// group. The cache group for each query is chosen at random.
///
/// This is also the cache group [`ClusterPieceGetter::new`] falls back to when it is given `None`.
const GLOBAL_CACHE_GROUP: &str = "_";
45
/// Broadcast sent by controllers requesting farmers to identify themselves.
///
/// The message is a unit struct and carries no payload.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ClusterControllerFarmerIdentifyBroadcast;

impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
    /// Subject used for this broadcast
    const SUBJECT: &'static str = "ab.controller.farmer-identify";
}
53
/// Broadcast sent by controllers requesting caches in cache group to identify themselves.
///
/// The message is a unit struct and carries no payload.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ClusterControllerCacheIdentifyBroadcast;

impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
    /// Subject used for this broadcast.
    ///
    /// `*` here stands for cache group
    const SUBJECT: &'static str = "ab.controller.*.cache-identify";
}
62
63/// Broadcast with slot info sent by controllers
64#[derive(Debug, Clone, Encode, Decode)]
65struct ClusterControllerSlotInfoBroadcast {
66    slot_info: SlotInfo,
67    instance: String,
68}
69
70impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
71    const SUBJECT: &'static str = "ab.controller.slot-info";
72
73    fn deterministic_message_id(&self) -> Option<HeaderValue> {
74        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
75        //  be simplified to just a slot number
76        Some(HeaderValue::from(format!(
77            "slot-info-{}",
78            self.slot_info.slot_number
79        )))
80    }
81}
82
/// Broadcast with block sealing info by controllers
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerBlockSealingBroadcast {
    /// Block sealing info being forwarded to the cluster
    block_sealing_info: BlockSealInfo,
}

impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
    /// Subject used for this broadcast
    const SUBJECT: &'static str = "ab.controller.block-sealing-info";
}
92
93/// Broadcast with archived segment headers by controllers
94#[derive(Debug, Clone, Encode, Decode)]
95struct ClusterControllerArchivedSegmentHeaderBroadcast {
96    archived_segment_header: SegmentHeader,
97}
98
99impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
100    const SUBJECT: &'static str = "ab.controller.archived-segment-header";
101
102    fn deterministic_message_id(&self) -> Option<HeaderValue> {
103        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
104        //  be simplified to just a segment index
105        Some(HeaderValue::from(format!(
106            "archived-segment-{}",
107            self.archived_segment_header.segment_index
108        )))
109    }
110}
111
/// Notification messages with solution by farmers
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerSolutionNotification {
    /// Solution response to be submitted to the node
    solution_response: SolutionResponse,
}

impl GenericNotification for ClusterControllerSolutionNotification {
    /// Subject used for this notification.
    ///
    /// `*` here stands for the controller instance the notification is addressed to (both the
    /// sender and the subscriber in this file pass an instance name for it)
    const SUBJECT: &'static str = "ab.controller.*.solution";
}
121
/// Notification messages with block seal by farmers
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerBlockSealNotification {
    /// Block seal to be submitted to the node
    block_seal: BlockSealResponse,
}

impl GenericNotification for ClusterControllerBlockSealNotification {
    /// Subject used for this notification
    const SUBJECT: &'static str = "ab.controller.block-seal";
}
131
/// Request farmer app info from controller.
///
/// The request is a unit struct and carries no payload.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFarmerAppInfoRequest;

impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
    /// Subject used for this request
    const SUBJECT: &'static str = "ab.controller.farmer-app-info";
    /// Farmer app info on success, or an error rendered as a string
    type Response = Result<FarmerAppInfo, String>;
}
140
/// Request segment headers with specified segment indices
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerSegmentHeadersRequest {
    /// Indices of the segments whose headers are requested
    segment_indices: Vec<SegmentIndex>,
}

impl GenericRequest for ClusterControllerSegmentHeadersRequest {
    /// Subject used for this request
    const SUBJECT: &'static str = "ab.controller.segment-headers";
    /// Optional segment headers
    // NOTE(review): presumably one entry per requested index, in request order, with `None` for
    // unknown segments - confirm against `NodeClient::segment_headers()`
    type Response = Vec<Option<SegmentHeader>>;
}
151
/// Find piece with specified index in cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFindPieceInCacheRequest {
    /// Index of the piece to look for
    piece_index: PieceIndex,
}

impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
    /// Subject used for this request.
    ///
    /// `*` here stands for cache group
    const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
    /// Piece cache ID and offset within that cache if the piece was found, `None` otherwise
    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
}
162
/// Find pieces with specified indices in cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFindPiecesInCacheRequest {
    /// Indices of the pieces to look for
    piece_indices: Vec<PieceIndex>,
}

impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
    /// Subject used for this request.
    ///
    /// `*` here stands for cache group
    const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
    /// Piece index together with piece cache ID and offset within that cache.
    ///
    /// Only pieces that were found are returned
    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
}
174
/// Request piece with specified index
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerPieceRequest {
    /// Index of the requested piece
    piece_index: PieceIndex,
}

impl GenericRequest for ClusterControllerPieceRequest {
    /// Subject used for this request
    const SUBJECT: &'static str = "ab.controller.piece";
    /// The piece, or `None` if the controller could not provide it
    type Response = Option<Piece>;
}
185
/// Request pieces with specified indices
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerPiecesRequest {
    /// Indices of the requested pieces
    piece_indices: Vec<PieceIndex>,
}

impl GenericStreamRequest for ClusterControllerPiecesRequest {
    /// Subject used for this request
    const SUBJECT: &'static str = "ab.controller.pieces";
    /// Piece index together with the piece itself.
    ///
    /// Only pieces that were found are returned
    type Response = (PieceIndex, Piece);
}
197
/// Cluster piece getter.
///
/// Retrieves pieces over NATS: cluster caches are tried first, with a fallback to requesting the
/// piece from a controller.
#[derive(Debug, Clone)]
pub struct ClusterPieceGetter {
    /// Client used for all requests
    nats_client: NatsClient,
    /// Cache group that "find piece in cache" requests are addressed to
    cache_group: String,
}
204
205#[async_trait]
206impl PieceGetter for ClusterPieceGetter {
207    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
208        if let Some((piece_cache_id, piece_cache_offset)) = self
209            .nats_client
210            .request(
211                &ClusterControllerFindPieceInCacheRequest { piece_index },
212                Some(&self.cache_group),
213            )
214            .await?
215        {
216            trace!(
217                %piece_index,
218                %piece_cache_id,
219                %piece_cache_offset,
220                "Found piece in cache, retrieving"
221            );
222
223            match self
224                .nats_client
225                .request(
226                    &ClusterCacheReadPieceRequest {
227                        offset: piece_cache_offset,
228                    },
229                    Some(&piece_cache_id.to_string()),
230                )
231                .await
232                .map_err(|error| error.to_string())
233                .flatten()
234            {
235                Ok(Some((retrieved_piece_index, piece))) => {
236                    if retrieved_piece_index == piece_index {
237                        trace!(
238                            %piece_index,
239                            %piece_cache_id,
240                            %piece_cache_offset,
241                            "Retrieved piece from cache successfully"
242                        );
243
244                        return Ok(Some(piece));
245                    } else {
246                        trace!(
247                            %piece_index,
248                            %piece_cache_id,
249                            %piece_cache_offset,
250                            "Retrieving piece was replaced in cache during retrieval"
251                        );
252                    }
253                }
254                Ok(None) => {
255                    trace!(
256                        %piece_index,
257                        %piece_cache_id,
258                        %piece_cache_offset,
259                        "Piece cache didn't have piece at offset"
260                    );
261                }
262                Err(error) => {
263                    debug!(
264                        %piece_index,
265                        %piece_cache_id,
266                        %piece_cache_offset,
267                        %error,
268                        "Retrieving piece from cache failed"
269                    );
270                }
271            }
272        } else {
273            trace!(%piece_index, "Piece not found in cache");
274        }
275
276        Ok(self
277            .nats_client
278            .request(&ClusterControllerPieceRequest { piece_index }, None)
279            .await?)
280    }
281
    /// Retrieves multiple pieces, returning a stream of `(piece index, result)` pairs.
    ///
    /// Pieces are looked up in the configured cache group first and read from the caches that
    /// reported them; whatever is still missing afterwards is requested from a controller. Pieces
    /// that couldn't be retrieved either way are reported with an error.
    ///
    /// Locating pieces in caches happens before this method returns (and is the only step that
    /// can fail the call itself), while the actual retrieval is driven lazily by polling the
    /// returned stream.
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        // Results are pushed into `tx` by the future below and streamed back to the caller from
        // `rx`
        let (tx, mut rx) = mpsc::unbounded();

        // Set of pieces that were not retrieved yet (duplicates in the input are collapsed), it is
        // shared between futures reading from different caches
        let piece_indices_to_get =
            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());

        // Offsets of the pieces found in caches, grouped by the cache that stores them
        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();

        {
            let mut cached_pieces = self
                .nats_client
                .stream_request(
                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
                    Some(&self.cache_group),
                )
                .await?;

            // Piece index is ignored here, caches report it back together with the piece itself
            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
                cached_pieces.next().await
            {
                cached_pieces_by_cache_id
                    .entry(piece_cache_id)
                    .or_default()
                    .push(piece_cache_offset);
            }
        }

        let fut = async move {
            // Borrow the sender so that it can be shared by the `async move` blocks below
            let tx = &tx;

            // Read from all caches concurrently, one stream request per cache
            cached_pieces_by_cache_id
                .into_iter()
                .map(|(piece_cache_id, offsets)| {
                    let piece_indices_to_get = &piece_indices_to_get;

                    async move {
                        let mut pieces_stream = match self
                            .nats_client
                            .stream_request(
                                &ClusterCacheReadPiecesRequest { offsets },
                                Some(&piece_cache_id.to_string()),
                            )
                            .await
                        {
                            Ok(pieces) => pieces,
                            Err(error) => {
                                warn!(
                                    %error,
                                    %piece_cache_id,
                                    "Failed to request pieces from cache"
                                );

                                // Pieces of this cache remain in `piece_indices_to_get` and will
                                // be requested from the controller
                                return;
                            }
                        };

                        while let Some(piece_result) = pieces_stream.next().await {
                            let (piece_offset, maybe_piece) = match piece_result {
                                Ok(result) => result,
                                Err(error) => {
                                    warn!(%error, "Failed to get piece from cache");
                                    continue;
                                }
                            };

                            if let Some((piece_index, piece)) = maybe_piece {
                                // Piece is accounted for, no need to ask the controller for it
                                piece_indices_to_get.lock().remove(&piece_index);

                                // Both this future and the receiver are owned by the stream
                                // returned from this method, hence the receiver is alive for as
                                // long as this code is polled
                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
                                    "This future isn't polled after receiver is dropped; qed",
                                );
                            } else {
                                warn!(
                                    %piece_cache_id,
                                    %piece_offset,
                                    "Failed to get piece from cache, it was missing or already gone"
                                );
                            }
                        }
                    }
                })
                .collect::<FuturesUnordered<_>>()
                // Simply drain everything
                .for_each(|()| async {})
                .await;

            // All cache futures are done, exclusive access to the set is possible now
            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
            if piece_indices_to_get.is_empty() {
                return;
            }

            // Request whatever caches didn't provide from the controller
            let mut pieces_from_controller = match self
                .nats_client
                .stream_request(
                    &ClusterControllerPiecesRequest {
                        piece_indices: piece_indices_to_get.iter().copied().collect(),
                    },
                    None,
                )
                .await
            {
                Ok(pieces_from_controller) => pieces_from_controller,
                Err(error) => {
                    error!(%error, "Failed to get pieces from controller");

                    // Every remaining piece is reported as failed
                    for piece_index in piece_indices_to_get {
                        tx.unbounded_send((
                            piece_index,
                            Err(anyhow::anyhow!("Failed to get piece from controller")),
                        ))
                        .expect("This future isn't polled after receiver is dropped; qed");
                    }
                    return;
                }
            };

            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
                piece_indices_to_get.remove(&piece_index);
                tx.unbounded_send((piece_index, Ok(Some(piece))))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }

            // Controller only returns pieces it has found, the rest is reported as failed
            for piece_index in piece_indices_to_get {
                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }
        };
        // Fused so that `is_terminated()` can be used to avoid polling after completion
        let mut fut = Box::pin(fut.fuse());

        // Drive above future and stream back any pieces that were downloaded so far
        Ok(Box::new(stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                // Result doesn't matter, we'll need to poll stream below anyway
                let _ = fut.poll_unpin(cx);
            }

            // Once the future above completes, the only sender (owned by the future) is dropped,
            // which terminates the receiver after all buffered results are yielded
            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            // Exit will be done by the stream above
            Poll::Pending
        })))
    }
431}
432
433impl ClusterPieceGetter {
434    /// Create a new instance
435    #[inline]
436    pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
437        Self {
438            nats_client,
439            cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
440        }
441    }
442}
443
/// [`NodeClient`] used in cluster environment that connects to node through a controller instead
/// of to the node directly
#[derive(Debug, Clone)]
pub struct ClusterNodeClient {
    /// Client used for all requests, notifications and subscriptions
    nats_client: NatsClient,
    // Store last slot info instance that can be used to send solution response to (some instances
    // may be not synced and not able to receive solution responses)
    //
    // Shared behind `Arc` because the slot info subscription stream updates it while being
    // `'static`, i.e. without borrowing `self`
    last_slot_info_instance: Arc<Mutex<String>>,
}
453
454impl ClusterNodeClient {
455    /// Create a new instance
456    pub fn new(nats_client: NatsClient) -> Self {
457        Self {
458            nats_client,
459            last_slot_info_instance: Arc::default(),
460        }
461    }
462}
463
464#[async_trait]
465impl NodeClient for ClusterNodeClient {
466    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
467        Ok(self
468            .nats_client
469            .request(&ClusterControllerFarmerAppInfoRequest, None)
470            .await?
471            .map_err(anyhow::Error::msg)?)
472    }
473
474    async fn subscribe_slot_info(
475        &self,
476    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
477        let subscription = self
478            .nats_client
479            .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
480            .await?
481            .filter_map({
482                let mut last_slot_number = None;
483                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
484
485                move |broadcast| {
486                    let slot_info = broadcast.slot_info;
487
488                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
489                        && last_slot_number >= slot_info.slot_number
490                    {
491                        None
492                    } else {
493                        last_slot_number.replace(slot_info.slot_number);
494                        *last_slot_info_instance.lock() = broadcast.instance;
495
496                        Some(slot_info)
497                    };
498
499                    async move { maybe_slot_info }
500                }
501            });
502
503        Ok(Box::pin(subscription))
504    }
505
506    async fn submit_solution_response(
507        &self,
508        solution_response: SolutionResponse,
509    ) -> anyhow::Result<()> {
510        let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
511        Ok(self
512            .nats_client
513            .notification(
514                &ClusterControllerSolutionNotification { solution_response },
515                Some(&last_slot_info_instance),
516            )
517            .await?)
518    }
519
520    async fn subscribe_block_sealing(
521        &self,
522    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
523        let subscription = self
524            .nats_client
525            .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
526            .await?
527            .map(|broadcast| broadcast.block_sealing_info);
528
529        Ok(Box::pin(subscription))
530    }
531
532    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
533        Ok(self
534            .nats_client
535            .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
536            .await?)
537    }
538
539    async fn subscribe_archived_segment_headers(
540        &self,
541    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
542        let subscription = self
543            .nats_client
544            .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
545            .await?
546            .filter_map({
547                let mut last_archived_segment_index = None;
548
549                move |broadcast| {
550                    let archived_segment_header = broadcast.archived_segment_header;
551                    let segment_index = archived_segment_header.segment_index();
552
553                    let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
554                        last_archived_segment_index
555                        && last_archived_segment_index >= segment_index
556                    {
557                        None
558                    } else {
559                        last_archived_segment_index.replace(segment_index);
560
561                        Some(archived_segment_header)
562                    };
563
564                    async move { maybe_archived_segment_header }
565                }
566            });
567
568        Ok(Box::pin(subscription))
569    }
570
571    async fn segment_headers(
572        &self,
573        segment_indices: Vec<SegmentIndex>,
574    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
575        Ok(self
576            .nats_client
577            .request(
578                &ClusterControllerSegmentHeadersRequest { segment_indices },
579                None,
580            )
581            .await?)
582    }
583
584    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
585        Ok(self
586            .nats_client
587            .request(&ClusterControllerPieceRequest { piece_index }, None)
588            .await?)
589    }
590
591    async fn acknowledge_archived_segment_header(
592        &self,
593        _segment_index: SegmentIndex,
594    ) -> anyhow::Result<()> {
595        // Acknowledgement is unnecessary/unsupported
596        Ok(())
597    }
598}
599
600/// Create controller service that handles things like broadcasting information (for example slot
601/// notifications) as well as responding to incoming requests (like piece requests).
602///
603/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
604/// per controller instance in order to parallelize more work across threads if needed.
605pub async fn controller_service<NC, PG>(
606    nats_client: &NatsClient,
607    node_client: &NC,
608    piece_getter: &PG,
609    farmer_caches: &[(&str, &FarmerCache)],
610    instance: &str,
611    primary_instance: bool,
612) -> anyhow::Result<()>
613where
614    NC: NodeClient,
615    PG: PieceGetter + Sync,
616{
617    if primary_instance {
618        select! {
619            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
620                result
621            },
622            result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
623                result
624            },
625            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
626                result
627            },
628            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
629                result
630            },
631            result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
632                result
633            },
634            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
635                result
636            },
637            result = segment_headers_responder(nats_client, node_client).fuse() => {
638                result
639            },
640            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
641                result
642            },
643            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
644                result
645            },
646            result = piece_responder(nats_client, piece_getter).fuse() => {
647                result
648            },
649            result = pieces_responder(nats_client, piece_getter).fuse() => {
650                result
651            },
652        }
653    } else {
654        select! {
655            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
656                result
657            },
658            result = segment_headers_responder(nats_client, node_client).fuse() => {
659                result
660            },
661            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
662                result
663            },
664            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
665                result
666            },
667            result = piece_responder(nats_client, piece_getter).fuse() => {
668                result
669            },
670            result = pieces_responder(nats_client, piece_getter).fuse() => {
671                result
672            },
673        }
674    }
675}
676
677async fn slot_info_broadcaster<NC>(
678    nats_client: &NatsClient,
679    node_client: &NC,
680    instance: &str,
681) -> anyhow::Result<()>
682where
683    NC: NodeClient,
684{
685    let mut slot_info_notifications = node_client
686        .subscribe_slot_info()
687        .await
688        .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
689
690    while let Some(slot_info) = slot_info_notifications.next().await {
691        debug!(?slot_info, "New slot");
692
693        let slot = slot_info.slot_number;
694
695        if let Err(error) = nats_client
696            .broadcast(
697                &ClusterControllerSlotInfoBroadcast {
698                    slot_info,
699                    instance: instance.to_string(),
700                },
701                instance,
702            )
703            .await
704        {
705            warn!(%slot, %error, "Failed to broadcast slot info");
706        }
707    }
708
709    Ok(())
710}
711
712async fn block_sealing_broadcaster<NC>(
713    nats_client: &NatsClient,
714    node_client: &NC,
715    instance: &str,
716) -> anyhow::Result<()>
717where
718    NC: NodeClient,
719{
720    let mut block_sealing_subscription = node_client
721        .subscribe_block_sealing()
722        .await
723        .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
724
725    while let Some(block_sealing_info) = block_sealing_subscription.next().await {
726        trace!(?block_sealing_info, "New block sealing notification");
727
728        if let Err(error) = nats_client
729            .broadcast(
730                &ClusterControllerBlockSealingBroadcast { block_sealing_info },
731                instance,
732            )
733            .await
734        {
735            warn!(%error, "Failed to broadcast block sealing info");
736        }
737    }
738
739    Ok(())
740}
741
742async fn archived_segment_headers_broadcaster<NC>(
743    nats_client: &NatsClient,
744    node_client: &NC,
745    instance: &str,
746) -> anyhow::Result<()>
747where
748    NC: NodeClient,
749{
750    let mut archived_segments_notifications = node_client
751        .subscribe_archived_segment_headers()
752        .await
753        .map_err(|error| {
754            anyhow!("Failed to subscribe to archived segment header notifications: {error}")
755        })?;
756
757    while let Some(archived_segment_header) = archived_segments_notifications.next().await {
758        trace!(
759            ?archived_segment_header,
760            "New archived archived segment header notification"
761        );
762
763        node_client
764            .acknowledge_archived_segment_header(archived_segment_header.segment_index())
765            .await
766            .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
767
768        if let Err(error) = nats_client
769            .broadcast(
770                &ClusterControllerArchivedSegmentHeaderBroadcast {
771                    archived_segment_header,
772                },
773                instance,
774            )
775            .await
776        {
777            warn!(%error, "Failed to broadcast archived segment header info");
778        }
779    }
780
781    Ok(())
782}
783
784async fn solution_response_forwarder<NC>(
785    nats_client: &NatsClient,
786    node_client: &NC,
787    instance: &str,
788) -> anyhow::Result<()>
789where
790    NC: NodeClient,
791{
792    let mut subscription = nats_client
793        .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
794            Some(instance),
795            Some(instance.to_string()),
796        )
797        .await
798        .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
799
800    while let Some(notification) = subscription.next().await {
801        debug!(?notification, "Solution notification");
802
803        let slot = notification.solution_response.slot_number;
804        let public_key_hash = notification.solution_response.solution.public_key_hash;
805        let sector_index = notification.solution_response.solution.sector_index;
806
807        if let Err(error) = node_client
808            .submit_solution_response(notification.solution_response)
809            .await
810        {
811            warn!(
812                %error,
813                %slot,
814                %public_key_hash,
815                %sector_index,
816                "Failed to send solution response"
817            );
818        }
819    }
820
821    Ok(())
822}
823
824async fn block_seal_forwarder<NC>(
825    nats_client: &NatsClient,
826    node_client: &NC,
827    instance: &str,
828) -> anyhow::Result<()>
829where
830    NC: NodeClient,
831{
832    let mut subscription = nats_client
833        .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
834            None,
835            Some(instance.to_string()),
836        )
837        .await
838        .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
839
840    while let Some(notification) = subscription.next().await {
841        debug!(?notification, "Block seal notification");
842
843        if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
844            warn!(%error, "Failed to send block seal");
845        }
846    }
847
848    Ok(())
849}
850
851async fn farmer_app_info_responder<NC>(
852    nats_client: &NatsClient,
853    node_client: &NC,
854) -> anyhow::Result<()>
855where
856    NC: NodeClient,
857{
858    nats_client
859        .request_responder(
860            None,
861            Some("ab.controller".to_string()),
862            |_: ClusterControllerFarmerAppInfoRequest| async move {
863                Some(
864                    node_client
865                        .farmer_app_info()
866                        .await
867                        .map_err(|error| error.to_string()),
868                )
869            },
870        )
871        .await
872}
873
874async fn segment_headers_responder<NC>(
875    nats_client: &NatsClient,
876    node_client: &NC,
877) -> anyhow::Result<()>
878where
879    NC: NodeClient,
880{
881    nats_client
882        .request_responder(
883            None,
884            Some("ab.controller".to_string()),
885            |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
886                node_client
887                    .segment_headers(segment_indices.clone())
888                    .await
889                    .inspect_err(|error| {
890                        warn!(%error, ?segment_indices, "Failed to get segment headers");
891                    })
892                    .ok()
893            },
894        )
895        .await
896}
897
898async fn find_piece_responder(
899    nats_client: &NatsClient,
900    farmer_caches: &[(&str, &FarmerCache)],
901) -> anyhow::Result<()> {
902    futures::future::try_join(
903        farmer_caches
904            .iter()
905            .map(|(cache_group, farmer_cache)| {
906                nats_client.request_responder(
907                    Some(cache_group),
908                    Some("ab.controller".to_string()),
909                    move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
910                        Some(farmer_cache.find_piece(piece_index).await)
911                    },
912                )
913            })
914            .collect::<FuturesUnordered<_>>()
915            .next()
916            .map(|result| result.unwrap_or(Ok(()))),
917        nats_client.request_responder(
918            Some(GLOBAL_CACHE_GROUP),
919            Some("ab.controller".to_string()),
920            |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
921                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
922                Some(farmer_cache.find_piece(piece_index).await)
923            },
924        ),
925    )
926    .await
927    .map(|((), ())| ())
928}
929
930async fn find_pieces_responder(
931    nats_client: &NatsClient,
932    farmer_caches: &[(&str, &FarmerCache)],
933) -> anyhow::Result<()> {
934    futures::future::try_join(
935        farmer_caches
936            .iter()
937            .map(|(cache_group, farmer_cache)| {
938                nats_client.stream_request_responder(
939                    Some(cache_group),
940                    Some("ab.controller".to_string()),
941                    move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
942                        Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
943                    },
944                )
945            })
946            .collect::<FuturesUnordered<_>>()
947            .next()
948            .map(|result| result.unwrap_or(Ok(()))),
949        nats_client.stream_request_responder(
950            Some(GLOBAL_CACHE_GROUP),
951            Some("ab.controller".to_string()),
952            |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
953                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
954                Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
955            },
956        ),
957    )
958    .await
959    .map(|((), ())| ())
960}
961
962async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
963where
964    PG: PieceGetter + Sync,
965{
966    nats_client
967        .request_responder(
968            None,
969            Some("ab.controller".to_string()),
970            |ClusterControllerPieceRequest { piece_index }| async move {
971                piece_getter
972                    .get_piece(piece_index)
973                    .await
974                    .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
975                    .ok()
976            },
977        )
978        .await
979}
980
981async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
982where
983    PG: PieceGetter + Sync,
984{
985    nats_client
986        .stream_request_responder(
987            None,
988            Some("ab.controller".to_string()),
989            |ClusterControllerPiecesRequest { piece_indices }| async move {
990                piece_getter
991                    .get_pieces(piece_indices)
992                    .await
993                    .map(|stream| {
994                        Box::pin(stream.filter_map(
995                            |(piece_index, maybe_piece_result)| async move {
996                                match maybe_piece_result {
997                                    Ok(Some(piece)) => Some((piece_index, piece)),
998                                    Ok(None) => None,
999                                    Err(error) => {
1000                                        warn!(%error, %piece_index, "Failed to get piece");
1001                                        None
1002                                    }
1003                                }
1004                            },
1005                        ))
1006                    })
1007                    .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1008                    .ok()
1009            },
1010        )
1011        .await
1012}