Skip to main content

ab_farmer/cluster/
controller.rs

1//! Farming cluster controller
2//!
3//! Controller is responsible for managing farming cluster.
4//!
5//! This module exposes some data structures for NATS communication, custom piece getter and node
6//! client implementations designed to work with cluster controller and a service function to drive
7//! the backend part of the controller.
8
9pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15    GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{
22    SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
23};
24use ab_data_retrieval::piece_getter::PieceGetter;
25use ab_farmer_rpc_primitives::{
26    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
27    SolutionResponse,
28};
29use anyhow::anyhow;
30use async_nats::HeaderValue;
31use async_trait::async_trait;
32use futures::channel::mpsc;
33use futures::future::FusedFuture;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, Stream, StreamExt, select, stream};
36use parity_scale_codec::{Decode, Encode};
37use parking_lot::Mutex;
38use rand::prelude::*;
39use std::collections::{HashMap, HashSet};
40use std::pin::Pin;
41use std::sync::Arc;
42use std::task::Poll;
43use tracing::{debug, error, trace, warn};
44
45/// Special "cache group" that all controllers subscribe to and that can be used to query any cache
46/// group. The cache group for each query is chosen at random.
47const GLOBAL_CACHE_GROUP: &str = "_";
48
49/// Broadcast sent by controllers requesting farmers to identify themselves
50#[derive(Debug, Copy, Clone, Encode, Decode)]
51pub struct ClusterControllerFarmerIdentifyBroadcast;
52
53impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
54    const SUBJECT: &'static str = "ab.controller.farmer-identify";
55}
56
57/// Broadcast sent by controllers requesting caches in cache group to identify themselves
58#[derive(Debug, Copy, Clone, Encode, Decode)]
59pub struct ClusterControllerCacheIdentifyBroadcast;
60
61impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
62    /// `*` here stands for cache group
63    const SUBJECT: &'static str = "ab.controller.*.cache-identify";
64}
65
66/// Broadcast with slot info sent by controllers
67#[derive(Debug, Clone, Encode, Decode)]
68struct ClusterControllerSlotInfoBroadcast {
69    slot_info: SlotInfo,
70    instance: String,
71}
72
73impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
74    const SUBJECT: &'static str = "ab.controller.slot-info";
75
76    fn deterministic_message_id(&self) -> Option<HeaderValue> {
77        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
78        //  be simplified to just a slot number
79        Some(HeaderValue::from(format!(
80            "slot-info-{}",
81            self.slot_info.slot
82        )))
83    }
84}
85
86/// Broadcast with block sealing info by controllers
87#[derive(Debug, Clone, Encode, Decode)]
88struct ClusterControllerBlockSealingBroadcast {
89    block_sealing_info: BlockSealInfo,
90}
91
92impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
93    const SUBJECT: &'static str = "ab.controller.block-sealing-info";
94}
95
96/// Broadcast with archived segment headers by controllers
97#[derive(Debug, Clone, Encode, Decode)]
98struct ClusterControllerArchivedSegmentHeaderBroadcast {
99    archived_segment_header: SegmentHeader,
100}
101
102impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
103    const SUBJECT: &'static str = "ab.controller.archived-segment-header";
104
105    fn deterministic_message_id(&self) -> Option<HeaderValue> {
106        // TODO: Depending on answer in `https://github.com/nats-io/nats.docs/issues/663` this might
107        //  be simplified to just a segment index
108        Some(HeaderValue::from(format!(
109            "archived-segment-{}",
110            self.archived_segment_header.segment_index
111        )))
112    }
113}
114
115/// Notification messages with solution by farmers
116#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterControllerSolutionNotification {
118    solution_response: SolutionResponse,
119}
120
121impl GenericNotification for ClusterControllerSolutionNotification {
122    const SUBJECT: &'static str = "ab.controller.*.solution";
123}
124
125/// Notification messages with block seal by farmers
126#[derive(Debug, Clone, Encode, Decode)]
127struct ClusterControllerBlockSealNotification {
128    block_seal: BlockSealResponse,
129}
130
131impl GenericNotification for ClusterControllerBlockSealNotification {
132    const SUBJECT: &'static str = "ab.controller.block-seal";
133}
134
135/// Request farmer app info from controller
136#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterControllerFarmerAppInfoRequest;
138
139impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
140    const SUBJECT: &'static str = "ab.controller.farmer-app-info";
141    type Response = Result<FarmerAppInfo, String>;
142}
143
144/// Request super segment headers with specified super segment indices
145#[derive(Debug, Clone, Encode, Decode)]
146struct ClusterControllerSuperSegmentHeadersRequest {
147    super_segment_indices: Vec<SuperSegmentIndex>,
148}
149
150impl GenericRequest for ClusterControllerSuperSegmentHeadersRequest {
151    const SUBJECT: &'static str = "ab.controller.super-segment-headers";
152    type Response = Vec<Option<SuperSegmentHeader>>;
153}
154
155/// Request segment headers with specified segment indices
156#[derive(Debug, Clone, Encode, Decode)]
157struct ClusterControllerSegmentHeadersRequest {
158    segment_indices: Vec<SegmentIndex>,
159}
160
161impl GenericRequest for ClusterControllerSegmentHeadersRequest {
162    const SUBJECT: &'static str = "ab.controller.segment-headers";
163    type Response = Vec<Option<SegmentHeader>>;
164}
165
166/// Find piece with specified index in cache
167#[derive(Debug, Clone, Encode, Decode)]
168struct ClusterControllerFindPieceInCacheRequest {
169    piece_index: PieceIndex,
170}
171
172impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
173    const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
174    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
175}
176
177/// Find pieces with specified indices in cache
178#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerFindPiecesInCacheRequest {
180    piece_indices: Vec<PieceIndex>,
181}
182
183impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
184    const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
185    /// Only pieces that were found are returned
186    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
187}
188
189/// Request piece with specified index
190#[derive(Debug, Clone, Encode, Decode)]
191struct ClusterControllerPieceRequest {
192    piece_index: PieceIndex,
193}
194
195impl GenericRequest for ClusterControllerPieceRequest {
196    const SUBJECT: &'static str = "ab.controller.piece";
197    type Response = Option<Piece>;
198}
199
200/// Request pieces with specified index
201#[derive(Debug, Clone, Encode, Decode)]
202struct ClusterControllerPiecesRequest {
203    piece_indices: Vec<PieceIndex>,
204}
205
206impl GenericStreamRequest for ClusterControllerPiecesRequest {
207    const SUBJECT: &'static str = "ab.controller.pieces";
208    /// Only pieces that were found are returned
209    type Response = (PieceIndex, Piece);
210}
211
212/// Cluster piece getter
213#[derive(Debug, Clone)]
214pub struct ClusterPieceGetter {
215    nats_client: NatsClient,
216    cache_group: String,
217}
218
219#[async_trait]
220impl PieceGetter for ClusterPieceGetter {
221    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
222        if let Some((piece_cache_id, piece_cache_offset)) = self
223            .nats_client
224            .request(
225                &ClusterControllerFindPieceInCacheRequest { piece_index },
226                Some(&self.cache_group),
227            )
228            .await?
229        {
230            trace!(
231                %piece_index,
232                %piece_cache_id,
233                %piece_cache_offset,
234                "Found piece in cache, retrieving"
235            );
236
237            match self
238                .nats_client
239                .request(
240                    &ClusterCacheReadPieceRequest {
241                        offset: piece_cache_offset,
242                    },
243                    Some(&piece_cache_id.to_string()),
244                )
245                .await
246                .map_err(|error| error.to_string())
247                .flatten()
248            {
249                Ok(Some((retrieved_piece_index, piece))) => {
250                    if retrieved_piece_index == piece_index {
251                        trace!(
252                            %piece_index,
253                            %piece_cache_id,
254                            %piece_cache_offset,
255                            "Retrieved piece from cache successfully"
256                        );
257
258                        return Ok(Some(piece));
259                    } else {
260                        trace!(
261                            %piece_index,
262                            %piece_cache_id,
263                            %piece_cache_offset,
264                            "Retrieving piece was replaced in cache during retrieval"
265                        );
266                    }
267                }
268                Ok(None) => {
269                    trace!(
270                        %piece_index,
271                        %piece_cache_id,
272                        %piece_cache_offset,
273                        "Piece cache didn't have piece at offset"
274                    );
275                }
276                Err(error) => {
277                    debug!(
278                        %piece_index,
279                        %piece_cache_id,
280                        %piece_cache_offset,
281                        %error,
282                        "Retrieving piece from cache failed"
283                    );
284                }
285            }
286        } else {
287            trace!(%piece_index, "Piece not found in cache");
288        }
289
290        Ok(self
291            .nats_client
292            .request(&ClusterControllerPieceRequest { piece_index }, None)
293            .await?)
294    }
295
296    async fn get_pieces<'a>(
297        &'a self,
298        piece_indices: Vec<PieceIndex>,
299    ) -> anyhow::Result<
300        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
301    > {
302        let (tx, mut rx) = mpsc::unbounded();
303
304        let piece_indices_to_get =
305            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
306
307        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
308
309        {
310            let mut cached_pieces = self
311                .nats_client
312                .stream_request(
313                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
314                    Some(&self.cache_group),
315                )
316                .await?;
317
318            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
319                cached_pieces.next().await
320            {
321                cached_pieces_by_cache_id
322                    .entry(piece_cache_id)
323                    .or_default()
324                    .push(piece_cache_offset);
325            }
326        }
327
328        let fut = async move {
329            let tx = &tx;
330
331            cached_pieces_by_cache_id
332                .into_iter()
333                .map(|(piece_cache_id, offsets)| {
334                    let piece_indices_to_get = &piece_indices_to_get;
335
336                    async move {
337                        let mut pieces_stream = match self
338                            .nats_client
339                            .stream_request(
340                                &ClusterCacheReadPiecesRequest { offsets },
341                                Some(&piece_cache_id.to_string()),
342                            )
343                            .await
344                        {
345                            Ok(pieces) => pieces,
346                            Err(error) => {
347                                warn!(
348                                    %error,
349                                    %piece_cache_id,
350                                    "Failed to request pieces from cache"
351                                );
352
353                                return;
354                            }
355                        };
356
357                        while let Some(piece_result) = pieces_stream.next().await {
358                            let (piece_offset, maybe_piece) = match piece_result {
359                                Ok(result) => result,
360                                Err(error) => {
361                                    warn!(%error, "Failed to get piece from cache");
362                                    continue;
363                                }
364                            };
365
366                            if let Some((piece_index, piece)) = maybe_piece {
367                                piece_indices_to_get.lock().remove(&piece_index);
368
369                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
370                                    "This future isn't polled after receiver is dropped; qed",
371                                );
372                            } else {
373                                warn!(
374                                    %piece_cache_id,
375                                    %piece_offset,
376                                    "Failed to get piece from cache, it was missing or already gone"
377                                );
378                            }
379                        }
380                    }
381                })
382                .collect::<FuturesUnordered<_>>()
383                // Simply drain everything
384                .for_each(|()| async {})
385                .await;
386
387            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
388            if piece_indices_to_get.is_empty() {
389                return;
390            }
391
392            let mut pieces_from_controller = match self
393                .nats_client
394                .stream_request(
395                    &ClusterControllerPiecesRequest {
396                        piece_indices: piece_indices_to_get.iter().copied().collect(),
397                    },
398                    None,
399                )
400                .await
401            {
402                Ok(pieces_from_controller) => pieces_from_controller,
403                Err(error) => {
404                    error!(%error, "Failed to get pieces from controller");
405
406                    for piece_index in piece_indices_to_get {
407                        tx.unbounded_send((
408                            piece_index,
409                            Err(anyhow::anyhow!("Failed to get piece from controller")),
410                        ))
411                        .expect("This future isn't polled after receiver is dropped; qed");
412                    }
413                    return;
414                }
415            };
416
417            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
418                piece_indices_to_get.remove(&piece_index);
419                tx.unbounded_send((piece_index, Ok(Some(piece))))
420                    .expect("This future isn't polled after receiver is dropped; qed");
421            }
422
423            for piece_index in piece_indices_to_get {
424                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
425                    .expect("This future isn't polled after receiver is dropped; qed");
426            }
427        };
428        let mut fut = Box::pin(fut.fuse());
429
430        // Drive above future and stream back any pieces that were downloaded so far
431        Ok(Box::new(stream::poll_fn(move |cx| {
432            if !fut.is_terminated() {
433                // Result doesn't matter, we'll need to poll stream below anyway
434                let _ = fut.poll_unpin(cx);
435            }
436
437            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
438                return Poll::Ready(maybe_result);
439            }
440
441            // Exit will be done by the stream above
442            Poll::Pending
443        })))
444    }
445}
446
447impl ClusterPieceGetter {
448    /// Create a new instance
449    #[inline]
450    pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
451        Self {
452            nats_client,
453            cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
454        }
455    }
456}
457
458/// [`NodeClient`] used in cluster environment that connects to node through a controller instead
459/// of to the node directly
460#[derive(Debug, Clone)]
461pub struct ClusterNodeClient {
462    nats_client: NatsClient,
463    // Store last slot info instance that can be used to send solution response to (some instances
464    // may be not synced and not able to receive solution responses)
465    last_slot_info_instance: Arc<Mutex<String>>,
466}
467
468impl ClusterNodeClient {
469    /// Create a new instance
470    pub fn new(nats_client: NatsClient) -> Self {
471        Self {
472            nats_client,
473            last_slot_info_instance: Arc::default(),
474        }
475    }
476}
477
478#[async_trait]
479impl NodeClient for ClusterNodeClient {
480    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
481        Ok(self
482            .nats_client
483            .request(&ClusterControllerFarmerAppInfoRequest, None)
484            .await?
485            .map_err(anyhow::Error::msg)?)
486    }
487
488    async fn subscribe_slot_info(
489        &self,
490    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
491        let subscription = self
492            .nats_client
493            .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
494            .await?
495            .filter_map({
496                let mut last_slot_number = None;
497                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
498
499                move |broadcast| {
500                    let slot_info = broadcast.slot_info;
501
502                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
503                        && last_slot_number >= slot_info.slot
504                    {
505                        None
506                    } else {
507                        last_slot_number.replace(slot_info.slot);
508                        *last_slot_info_instance.lock() = broadcast.instance;
509
510                        Some(slot_info)
511                    };
512
513                    async move { maybe_slot_info }
514                }
515            });
516
517        Ok(Box::pin(subscription))
518    }
519
520    async fn submit_solution_response(
521        &self,
522        solution_response: SolutionResponse,
523    ) -> anyhow::Result<()> {
524        let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
525        Ok(self
526            .nats_client
527            .notification(
528                &ClusterControllerSolutionNotification { solution_response },
529                Some(&last_slot_info_instance),
530            )
531            .await?)
532    }
533
534    async fn subscribe_block_sealing(
535        &self,
536    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
537        let subscription = self
538            .nats_client
539            .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
540            .await?
541            .map(|broadcast| broadcast.block_sealing_info);
542
543        Ok(Box::pin(subscription))
544    }
545
546    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
547        Ok(self
548            .nats_client
549            .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
550            .await?)
551    }
552
553    async fn subscribe_archived_segment_headers(
554        &self,
555    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
556        let subscription = self
557            .nats_client
558            .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
559            .await?
560            .filter_map({
561                let mut last_archived_segment_index = None;
562
563                move |broadcast| {
564                    let archived_segment_header = broadcast.archived_segment_header;
565                    let segment_index = archived_segment_header.local_segment_index();
566
567                    let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
568                        last_archived_segment_index
569                        && last_archived_segment_index >= segment_index
570                    {
571                        None
572                    } else {
573                        last_archived_segment_index.replace(segment_index);
574
575                        Some(archived_segment_header)
576                    };
577
578                    async move { maybe_archived_segment_header }
579                }
580            });
581
582        Ok(Box::pin(subscription))
583    }
584
585    async fn super_segment_headers(
586        &self,
587        super_segment_indices: Vec<SuperSegmentIndex>,
588    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
589        Ok(self
590            .nats_client
591            .request(
592                &ClusterControllerSuperSegmentHeadersRequest {
593                    super_segment_indices,
594                },
595                None,
596            )
597            .await?)
598    }
599
600    async fn segment_headers(
601        &self,
602        segment_indices: Vec<SegmentIndex>,
603    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
604        Ok(self
605            .nats_client
606            .request(
607                &ClusterControllerSegmentHeadersRequest { segment_indices },
608                None,
609            )
610            .await?)
611    }
612
613    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
614        Ok(self
615            .nats_client
616            .request(&ClusterControllerPieceRequest { piece_index }, None)
617            .await?)
618    }
619
620    async fn acknowledge_archived_segment_header(
621        &self,
622        _segment_index: SegmentIndex,
623    ) -> anyhow::Result<()> {
624        // Acknowledgement is unnecessary/unsupported
625        Ok(())
626    }
627
628    async fn update_shard_membership_info(
629        &self,
630        _info: FarmerShardMembershipInfo,
631    ) -> anyhow::Result<()> {
632        // Controller aggregates these on its own
633        Ok(())
634    }
635}
636
637/// Create controller service that handles things like broadcasting information (for example slot
638/// notifications) as well as responding to incoming requests (like piece requests).
639///
640/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
641/// per controller instance in order to parallelize more work across threads if needed.
642pub async fn controller_service<NC, PG>(
643    nats_client: &NatsClient,
644    node_client: &NC,
645    piece_getter: &PG,
646    farmer_caches: &[(&str, &FarmerCache)],
647    instance: &str,
648    primary_instance: bool,
649) -> anyhow::Result<()>
650where
651    NC: NodeClient,
652    PG: PieceGetter + Sync,
653{
654    if primary_instance {
655        select! {
656            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
657                result
658            },
659            result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
660                result
661            },
662            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
663                result
664            },
665            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
666                result
667            },
668            result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
669                result
670            },
671            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
672                result
673            },
674            result = super_segment_headers_responder(nats_client, node_client).fuse() => {
675                result
676            },
677            result = segment_headers_responder(nats_client, node_client).fuse() => {
678                result
679            },
680            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
681                result
682            },
683            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
684                result
685            },
686            result = piece_responder(nats_client, piece_getter).fuse() => {
687                result
688            },
689            result = pieces_responder(nats_client, piece_getter).fuse() => {
690                result
691            },
692        }
693    } else {
694        select! {
695            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
696                result
697            },
698            result = super_segment_headers_responder(nats_client, node_client).fuse() => {
699                result
700            },
701            result = segment_headers_responder(nats_client, node_client).fuse() => {
702                result
703            },
704            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
705                result
706            },
707            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
708                result
709            },
710            result = piece_responder(nats_client, piece_getter).fuse() => {
711                result
712            },
713            result = pieces_responder(nats_client, piece_getter).fuse() => {
714                result
715            },
716        }
717    }
718}
719
720async fn slot_info_broadcaster<NC>(
721    nats_client: &NatsClient,
722    node_client: &NC,
723    instance: &str,
724) -> anyhow::Result<()>
725where
726    NC: NodeClient,
727{
728    let mut slot_info_notifications = node_client
729        .subscribe_slot_info()
730        .await
731        .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
732
733    while let Some(slot_info) = slot_info_notifications.next().await {
734        debug!(?slot_info, "New slot");
735
736        let slot = slot_info.slot;
737
738        if let Err(error) = nats_client
739            .broadcast(
740                &ClusterControllerSlotInfoBroadcast {
741                    slot_info,
742                    instance: instance.to_string(),
743                },
744                instance,
745            )
746            .await
747        {
748            warn!(%slot, %error, "Failed to broadcast slot info");
749        }
750    }
751
752    Ok(())
753}
754
755async fn block_sealing_broadcaster<NC>(
756    nats_client: &NatsClient,
757    node_client: &NC,
758    instance: &str,
759) -> anyhow::Result<()>
760where
761    NC: NodeClient,
762{
763    let mut block_sealing_subscription = node_client
764        .subscribe_block_sealing()
765        .await
766        .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
767
768    while let Some(block_sealing_info) = block_sealing_subscription.next().await {
769        trace!(?block_sealing_info, "New block sealing notification");
770
771        if let Err(error) = nats_client
772            .broadcast(
773                &ClusterControllerBlockSealingBroadcast { block_sealing_info },
774                instance,
775            )
776            .await
777        {
778            warn!(%error, "Failed to broadcast block sealing info");
779        }
780    }
781
782    Ok(())
783}
784
785async fn archived_segment_headers_broadcaster<NC>(
786    nats_client: &NatsClient,
787    node_client: &NC,
788    instance: &str,
789) -> anyhow::Result<()>
790where
791    NC: NodeClient,
792{
793    let mut archived_segments_notifications = node_client
794        .subscribe_archived_segment_headers()
795        .await
796        .map_err(|error| {
797            anyhow!("Failed to subscribe to archived segment header notifications: {error}")
798        })?;
799
800    while let Some(archived_segment_header) = archived_segments_notifications.next().await {
801        trace!(
802            ?archived_segment_header,
803            "New archived archived segment header notification"
804        );
805
806        node_client
807            .acknowledge_archived_segment_header(
808                archived_segment_header.local_segment_index().into(),
809            )
810            .await
811            .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
812
813        if let Err(error) = nats_client
814            .broadcast(
815                &ClusterControllerArchivedSegmentHeaderBroadcast {
816                    archived_segment_header,
817                },
818                instance,
819            )
820            .await
821        {
822            warn!(%error, "Failed to broadcast archived segment header info");
823        }
824    }
825
826    Ok(())
827}
828
829async fn solution_response_forwarder<NC>(
830    nats_client: &NatsClient,
831    node_client: &NC,
832    instance: &str,
833) -> anyhow::Result<()>
834where
835    NC: NodeClient,
836{
837    let mut subscription = nats_client
838        .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
839            Some(instance),
840            Some(instance.to_string()),
841        )
842        .await
843        .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
844
845    while let Some(notification) = subscription.next().await {
846        debug!(?notification, "Solution notification");
847
848        let slot = notification.solution_response.slot_number;
849        let public_key_hash = notification.solution_response.solution.public_key_hash;
850        let sector_index = notification.solution_response.solution.sector_index;
851
852        if let Err(error) = node_client
853            .submit_solution_response(notification.solution_response)
854            .await
855        {
856            warn!(
857                %error,
858                %slot,
859                %public_key_hash,
860                %sector_index,
861                "Failed to send solution response"
862            );
863        }
864    }
865
866    Ok(())
867}
868
869async fn block_seal_forwarder<NC>(
870    nats_client: &NatsClient,
871    node_client: &NC,
872    instance: &str,
873) -> anyhow::Result<()>
874where
875    NC: NodeClient,
876{
877    let mut subscription = nats_client
878        .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
879            None,
880            Some(instance.to_string()),
881        )
882        .await
883        .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
884
885    while let Some(notification) = subscription.next().await {
886        debug!(?notification, "Block seal notification");
887
888        if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
889            warn!(%error, "Failed to send block seal");
890        }
891    }
892
893    Ok(())
894}
895
896async fn farmer_app_info_responder<NC>(
897    nats_client: &NatsClient,
898    node_client: &NC,
899) -> anyhow::Result<()>
900where
901    NC: NodeClient,
902{
903    nats_client
904        .request_responder(
905            None,
906            Some("ab.controller".to_string()),
907            |_: ClusterControllerFarmerAppInfoRequest| async move {
908                Some(
909                    node_client
910                        .farmer_app_info()
911                        .await
912                        .map_err(|error| error.to_string()),
913                )
914            },
915        )
916        .await
917}
918
919async fn super_segment_headers_responder<NC>(
920    nats_client: &NatsClient,
921    node_client: &NC,
922) -> anyhow::Result<()>
923where
924    NC: NodeClient,
925{
926    nats_client
927        .request_responder(
928            None,
929            Some("ab.controller".to_string()),
930            |ClusterControllerSuperSegmentHeadersRequest { super_segment_indices }| async move {
931                node_client
932                    .super_segment_headers(super_segment_indices.clone())
933                    .await
934                    .inspect_err(|error| {
935                        warn!(%error, ?super_segment_indices, "Failed to get super segment headers");
936                    })
937                    .ok()
938            },
939        )
940        .await
941}
942
943async fn segment_headers_responder<NC>(
944    nats_client: &NatsClient,
945    node_client: &NC,
946) -> anyhow::Result<()>
947where
948    NC: NodeClient,
949{
950    nats_client
951        .request_responder(
952            None,
953            Some("ab.controller".to_string()),
954            |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
955                node_client
956                    .segment_headers(segment_indices.clone())
957                    .await
958                    .inspect_err(|error| {
959                        warn!(%error, ?segment_indices, "Failed to get segment headers");
960                    })
961                    .ok()
962            },
963        )
964        .await
965}
966
967async fn find_piece_responder(
968    nats_client: &NatsClient,
969    farmer_caches: &[(&str, &FarmerCache)],
970) -> anyhow::Result<()> {
971    futures::future::try_join(
972        farmer_caches
973            .iter()
974            .map(|(cache_group, farmer_cache)| {
975                nats_client.request_responder(
976                    Some(cache_group),
977                    Some("ab.controller".to_string()),
978                    move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
979                        Some(farmer_cache.find_piece(piece_index).await)
980                    },
981                )
982            })
983            .collect::<FuturesUnordered<_>>()
984            .next()
985            .map(|result| result.unwrap_or(Ok(()))),
986        nats_client.request_responder(
987            Some(GLOBAL_CACHE_GROUP),
988            Some("ab.controller".to_string()),
989            |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
990                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
991                Some(farmer_cache.find_piece(piece_index).await)
992            },
993        ),
994    )
995    .await
996    .map(|((), ())| ())
997}
998
999async fn find_pieces_responder(
1000    nats_client: &NatsClient,
1001    farmer_caches: &[(&str, &FarmerCache)],
1002) -> anyhow::Result<()> {
1003    futures::future::try_join(
1004        farmer_caches
1005            .iter()
1006            .map(|(cache_group, farmer_cache)| {
1007                nats_client.stream_request_responder(
1008                    Some(cache_group),
1009                    Some("ab.controller".to_string()),
1010                    move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1011                        Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1012                    },
1013                )
1014            })
1015            .collect::<FuturesUnordered<_>>()
1016            .next()
1017            .map(|result| result.unwrap_or(Ok(()))),
1018        nats_client.stream_request_responder(
1019            Some(GLOBAL_CACHE_GROUP),
1020            Some("ab.controller".to_string()),
1021            |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1022                let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
1023                Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1024            },
1025        ),
1026    )
1027    .await
1028    .map(|((), ())| ())
1029}
1030
1031async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1032where
1033    PG: PieceGetter + Sync,
1034{
1035    nats_client
1036        .request_responder(
1037            None,
1038            Some("ab.controller".to_string()),
1039            |ClusterControllerPieceRequest { piece_index }| async move {
1040                piece_getter
1041                    .get_piece(piece_index)
1042                    .await
1043                    .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
1044                    .ok()
1045            },
1046        )
1047        .await
1048}
1049
1050async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1051where
1052    PG: PieceGetter + Sync,
1053{
1054    nats_client
1055        .stream_request_responder(
1056            None,
1057            Some("ab.controller".to_string()),
1058            |ClusterControllerPiecesRequest { piece_indices }| async move {
1059                piece_getter
1060                    .get_pieces(piece_indices)
1061                    .await
1062                    .map(|stream| {
1063                        Box::pin(stream.filter_map(
1064                            |(piece_index, maybe_piece_result)| async move {
1065                                match maybe_piece_result {
1066                                    Ok(Some(piece)) => Some((piece_index, piece)),
1067                                    Ok(None) => None,
1068                                    Err(error) => {
1069                                        warn!(%error, %piece_index, "Failed to get piece");
1070                                        None
1071                                    }
1072                                }
1073                            },
1074                        ))
1075                    })
1076                    .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1077                    .ok()
1078            },
1079        )
1080        .await
1081}