1pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15 GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{
22 SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
23};
24use ab_data_retrieval::piece_getter::PieceGetter;
25use ab_farmer_rpc_primitives::{
26 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
27 SolutionResponse,
28};
29use anyhow::anyhow;
30use async_nats::HeaderValue;
31use async_trait::async_trait;
32use futures::channel::mpsc;
33use futures::future::FusedFuture;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, Stream, StreamExt, select, stream};
36use parity_scale_codec::{Decode, Encode};
37use parking_lot::Mutex;
38use rand::prelude::*;
39use std::collections::{HashMap, HashSet};
40use std::pin::Pin;
41use std::sync::Arc;
42use std::task::Poll;
43use tracing::{debug, error, trace, warn};
44
/// Cache group name used when no specific cache group is configured (see
/// `ClusterPieceGetter::new`) and the group on which the "any cache" find-piece responders
/// listen.
const GLOBAL_CACHE_GROUP: &str = "_";
48
49#[derive(Debug, Copy, Clone, Encode, Decode)]
51pub struct ClusterControllerFarmerIdentifyBroadcast;
52
53impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
54 const SUBJECT: &'static str = "ab.controller.farmer-identify";
55}
56
57#[derive(Debug, Copy, Clone, Encode, Decode)]
59pub struct ClusterControllerCacheIdentifyBroadcast;
60
61impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
62 const SUBJECT: &'static str = "ab.controller.*.cache-identify";
64}
65
66#[derive(Debug, Clone, Encode, Decode)]
68struct ClusterControllerSlotInfoBroadcast {
69 slot_info: SlotInfo,
70 instance: String,
71}
72
73impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
74 const SUBJECT: &'static str = "ab.controller.slot-info";
75
76 fn deterministic_message_id(&self) -> Option<HeaderValue> {
77 Some(HeaderValue::from(format!(
80 "slot-info-{}",
81 self.slot_info.slot
82 )))
83 }
84}
85
86#[derive(Debug, Clone, Encode, Decode)]
88struct ClusterControllerBlockSealingBroadcast {
89 block_sealing_info: BlockSealInfo,
90}
91
92impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
93 const SUBJECT: &'static str = "ab.controller.block-sealing-info";
94}
95
96#[derive(Debug, Clone, Encode, Decode)]
98struct ClusterControllerArchivedSegmentHeaderBroadcast {
99 archived_segment_header: SegmentHeader,
100}
101
102impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
103 const SUBJECT: &'static str = "ab.controller.archived-segment-header";
104
105 fn deterministic_message_id(&self) -> Option<HeaderValue> {
106 Some(HeaderValue::from(format!(
109 "archived-segment-{}",
110 self.archived_segment_header.segment_index
111 )))
112 }
113}
114
115#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterControllerSolutionNotification {
118 solution_response: SolutionResponse,
119}
120
121impl GenericNotification for ClusterControllerSolutionNotification {
122 const SUBJECT: &'static str = "ab.controller.*.solution";
123}
124
125#[derive(Debug, Clone, Encode, Decode)]
127struct ClusterControllerBlockSealNotification {
128 block_seal: BlockSealResponse,
129}
130
131impl GenericNotification for ClusterControllerBlockSealNotification {
132 const SUBJECT: &'static str = "ab.controller.block-seal";
133}
134
135#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterControllerFarmerAppInfoRequest;
138
139impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
140 const SUBJECT: &'static str = "ab.controller.farmer-app-info";
141 type Response = Result<FarmerAppInfo, String>;
142}
143
144#[derive(Debug, Clone, Encode, Decode)]
146struct ClusterControllerSuperSegmentHeadersRequest {
147 super_segment_indices: Vec<SuperSegmentIndex>,
148}
149
150impl GenericRequest for ClusterControllerSuperSegmentHeadersRequest {
151 const SUBJECT: &'static str = "ab.controller.super-segment-headers";
152 type Response = Vec<Option<SuperSegmentHeader>>;
153}
154
155#[derive(Debug, Clone, Encode, Decode)]
157struct ClusterControllerSegmentHeadersRequest {
158 segment_indices: Vec<SegmentIndex>,
159}
160
161impl GenericRequest for ClusterControllerSegmentHeadersRequest {
162 const SUBJECT: &'static str = "ab.controller.segment-headers";
163 type Response = Vec<Option<SegmentHeader>>;
164}
165
166#[derive(Debug, Clone, Encode, Decode)]
168struct ClusterControllerFindPieceInCacheRequest {
169 piece_index: PieceIndex,
170}
171
172impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
173 const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
174 type Response = Option<(PieceCacheId, PieceCacheOffset)>;
175}
176
177#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerFindPiecesInCacheRequest {
180 piece_indices: Vec<PieceIndex>,
181}
182
183impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
184 const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
185 type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
187}
188
189#[derive(Debug, Clone, Encode, Decode)]
191struct ClusterControllerPieceRequest {
192 piece_index: PieceIndex,
193}
194
195impl GenericRequest for ClusterControllerPieceRequest {
196 const SUBJECT: &'static str = "ab.controller.piece";
197 type Response = Option<Piece>;
198}
199
200#[derive(Debug, Clone, Encode, Decode)]
202struct ClusterControllerPiecesRequest {
203 piece_indices: Vec<PieceIndex>,
204}
205
206impl GenericStreamRequest for ClusterControllerPiecesRequest {
207 const SUBJECT: &'static str = "ab.controller.pieces";
208 type Response = (PieceIndex, Piece);
210}
211
212#[derive(Debug, Clone)]
214pub struct ClusterPieceGetter {
215 nats_client: NatsClient,
216 cache_group: String,
217}
218
219#[async_trait]
220impl PieceGetter for ClusterPieceGetter {
221 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
222 if let Some((piece_cache_id, piece_cache_offset)) = self
223 .nats_client
224 .request(
225 &ClusterControllerFindPieceInCacheRequest { piece_index },
226 Some(&self.cache_group),
227 )
228 .await?
229 {
230 trace!(
231 %piece_index,
232 %piece_cache_id,
233 %piece_cache_offset,
234 "Found piece in cache, retrieving"
235 );
236
237 match self
238 .nats_client
239 .request(
240 &ClusterCacheReadPieceRequest {
241 offset: piece_cache_offset,
242 },
243 Some(&piece_cache_id.to_string()),
244 )
245 .await
246 .map_err(|error| error.to_string())
247 .flatten()
248 {
249 Ok(Some((retrieved_piece_index, piece))) => {
250 if retrieved_piece_index == piece_index {
251 trace!(
252 %piece_index,
253 %piece_cache_id,
254 %piece_cache_offset,
255 "Retrieved piece from cache successfully"
256 );
257
258 return Ok(Some(piece));
259 } else {
260 trace!(
261 %piece_index,
262 %piece_cache_id,
263 %piece_cache_offset,
264 "Retrieving piece was replaced in cache during retrieval"
265 );
266 }
267 }
268 Ok(None) => {
269 trace!(
270 %piece_index,
271 %piece_cache_id,
272 %piece_cache_offset,
273 "Piece cache didn't have piece at offset"
274 );
275 }
276 Err(error) => {
277 debug!(
278 %piece_index,
279 %piece_cache_id,
280 %piece_cache_offset,
281 %error,
282 "Retrieving piece from cache failed"
283 );
284 }
285 }
286 } else {
287 trace!(%piece_index, "Piece not found in cache");
288 }
289
290 Ok(self
291 .nats_client
292 .request(&ClusterControllerPieceRequest { piece_index }, None)
293 .await?)
294 }
295
296 async fn get_pieces<'a>(
297 &'a self,
298 piece_indices: Vec<PieceIndex>,
299 ) -> anyhow::Result<
300 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
301 > {
302 let (tx, mut rx) = mpsc::unbounded();
303
304 let piece_indices_to_get =
305 Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
306
307 let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
308
309 {
310 let mut cached_pieces = self
311 .nats_client
312 .stream_request(
313 &ClusterControllerFindPiecesInCacheRequest { piece_indices },
314 Some(&self.cache_group),
315 )
316 .await?;
317
318 while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
319 cached_pieces.next().await
320 {
321 cached_pieces_by_cache_id
322 .entry(piece_cache_id)
323 .or_default()
324 .push(piece_cache_offset);
325 }
326 }
327
328 let fut = async move {
329 let tx = &tx;
330
331 cached_pieces_by_cache_id
332 .into_iter()
333 .map(|(piece_cache_id, offsets)| {
334 let piece_indices_to_get = &piece_indices_to_get;
335
336 async move {
337 let mut pieces_stream = match self
338 .nats_client
339 .stream_request(
340 &ClusterCacheReadPiecesRequest { offsets },
341 Some(&piece_cache_id.to_string()),
342 )
343 .await
344 {
345 Ok(pieces) => pieces,
346 Err(error) => {
347 warn!(
348 %error,
349 %piece_cache_id,
350 "Failed to request pieces from cache"
351 );
352
353 return;
354 }
355 };
356
357 while let Some(piece_result) = pieces_stream.next().await {
358 let (piece_offset, maybe_piece) = match piece_result {
359 Ok(result) => result,
360 Err(error) => {
361 warn!(%error, "Failed to get piece from cache");
362 continue;
363 }
364 };
365
366 if let Some((piece_index, piece)) = maybe_piece {
367 piece_indices_to_get.lock().remove(&piece_index);
368
369 tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
370 "This future isn't polled after receiver is dropped; qed",
371 );
372 } else {
373 warn!(
374 %piece_cache_id,
375 %piece_offset,
376 "Failed to get piece from cache, it was missing or already gone"
377 );
378 }
379 }
380 }
381 })
382 .collect::<FuturesUnordered<_>>()
383 .for_each(|()| async {})
385 .await;
386
387 let mut piece_indices_to_get = piece_indices_to_get.into_inner();
388 if piece_indices_to_get.is_empty() {
389 return;
390 }
391
392 let mut pieces_from_controller = match self
393 .nats_client
394 .stream_request(
395 &ClusterControllerPiecesRequest {
396 piece_indices: piece_indices_to_get.iter().copied().collect(),
397 },
398 None,
399 )
400 .await
401 {
402 Ok(pieces_from_controller) => pieces_from_controller,
403 Err(error) => {
404 error!(%error, "Failed to get pieces from controller");
405
406 for piece_index in piece_indices_to_get {
407 tx.unbounded_send((
408 piece_index,
409 Err(anyhow::anyhow!("Failed to get piece from controller")),
410 ))
411 .expect("This future isn't polled after receiver is dropped; qed");
412 }
413 return;
414 }
415 };
416
417 while let Some((piece_index, piece)) = pieces_from_controller.next().await {
418 piece_indices_to_get.remove(&piece_index);
419 tx.unbounded_send((piece_index, Ok(Some(piece))))
420 .expect("This future isn't polled after receiver is dropped; qed");
421 }
422
423 for piece_index in piece_indices_to_get {
424 tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
425 .expect("This future isn't polled after receiver is dropped; qed");
426 }
427 };
428 let mut fut = Box::pin(fut.fuse());
429
430 Ok(Box::new(stream::poll_fn(move |cx| {
432 if !fut.is_terminated() {
433 let _ = fut.poll_unpin(cx);
435 }
436
437 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
438 return Poll::Ready(maybe_result);
439 }
440
441 Poll::Pending
443 })))
444 }
445}
446
447impl ClusterPieceGetter {
448 #[inline]
450 pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
451 Self {
452 nats_client,
453 cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
454 }
455 }
456}
457
458#[derive(Debug, Clone)]
461pub struct ClusterNodeClient {
462 nats_client: NatsClient,
463 last_slot_info_instance: Arc<Mutex<String>>,
466}
467
468impl ClusterNodeClient {
469 pub fn new(nats_client: NatsClient) -> Self {
471 Self {
472 nats_client,
473 last_slot_info_instance: Arc::default(),
474 }
475 }
476}
477
478#[async_trait]
479impl NodeClient for ClusterNodeClient {
480 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
481 Ok(self
482 .nats_client
483 .request(&ClusterControllerFarmerAppInfoRequest, None)
484 .await?
485 .map_err(anyhow::Error::msg)?)
486 }
487
488 async fn subscribe_slot_info(
489 &self,
490 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
491 let subscription = self
492 .nats_client
493 .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
494 .await?
495 .filter_map({
496 let mut last_slot_number = None;
497 let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
498
499 move |broadcast| {
500 let slot_info = broadcast.slot_info;
501
502 let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
503 && last_slot_number >= slot_info.slot
504 {
505 None
506 } else {
507 last_slot_number.replace(slot_info.slot);
508 *last_slot_info_instance.lock() = broadcast.instance;
509
510 Some(slot_info)
511 };
512
513 async move { maybe_slot_info }
514 }
515 });
516
517 Ok(Box::pin(subscription))
518 }
519
520 async fn submit_solution_response(
521 &self,
522 solution_response: SolutionResponse,
523 ) -> anyhow::Result<()> {
524 let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
525 Ok(self
526 .nats_client
527 .notification(
528 &ClusterControllerSolutionNotification { solution_response },
529 Some(&last_slot_info_instance),
530 )
531 .await?)
532 }
533
534 async fn subscribe_block_sealing(
535 &self,
536 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
537 let subscription = self
538 .nats_client
539 .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
540 .await?
541 .map(|broadcast| broadcast.block_sealing_info);
542
543 Ok(Box::pin(subscription))
544 }
545
546 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
547 Ok(self
548 .nats_client
549 .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
550 .await?)
551 }
552
553 async fn subscribe_archived_segment_headers(
554 &self,
555 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
556 let subscription = self
557 .nats_client
558 .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
559 .await?
560 .filter_map({
561 let mut last_archived_segment_index = None;
562
563 move |broadcast| {
564 let archived_segment_header = broadcast.archived_segment_header;
565 let segment_index = archived_segment_header.local_segment_index();
566
567 let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
568 last_archived_segment_index
569 && last_archived_segment_index >= segment_index
570 {
571 None
572 } else {
573 last_archived_segment_index.replace(segment_index);
574
575 Some(archived_segment_header)
576 };
577
578 async move { maybe_archived_segment_header }
579 }
580 });
581
582 Ok(Box::pin(subscription))
583 }
584
585 async fn super_segment_headers(
586 &self,
587 super_segment_indices: Vec<SuperSegmentIndex>,
588 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
589 Ok(self
590 .nats_client
591 .request(
592 &ClusterControllerSuperSegmentHeadersRequest {
593 super_segment_indices,
594 },
595 None,
596 )
597 .await?)
598 }
599
600 async fn segment_headers(
601 &self,
602 segment_indices: Vec<SegmentIndex>,
603 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
604 Ok(self
605 .nats_client
606 .request(
607 &ClusterControllerSegmentHeadersRequest { segment_indices },
608 None,
609 )
610 .await?)
611 }
612
613 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
614 Ok(self
615 .nats_client
616 .request(&ClusterControllerPieceRequest { piece_index }, None)
617 .await?)
618 }
619
620 async fn acknowledge_archived_segment_header(
621 &self,
622 _segment_index: SegmentIndex,
623 ) -> anyhow::Result<()> {
624 Ok(())
626 }
627
628 async fn update_shard_membership_info(
629 &self,
630 _info: FarmerShardMembershipInfo,
631 ) -> anyhow::Result<()> {
632 Ok(())
634 }
635}
636
637pub async fn controller_service<NC, PG>(
643 nats_client: &NatsClient,
644 node_client: &NC,
645 piece_getter: &PG,
646 farmer_caches: &[(&str, &FarmerCache)],
647 instance: &str,
648 primary_instance: bool,
649) -> anyhow::Result<()>
650where
651 NC: NodeClient,
652 PG: PieceGetter + Sync,
653{
654 if primary_instance {
655 select! {
656 result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
657 result
658 },
659 result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
660 result
661 },
662 result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
663 result
664 },
665 result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
666 result
667 },
668 result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
669 result
670 },
671 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
672 result
673 },
674 result = super_segment_headers_responder(nats_client, node_client).fuse() => {
675 result
676 },
677 result = segment_headers_responder(nats_client, node_client).fuse() => {
678 result
679 },
680 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
681 result
682 },
683 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
684 result
685 },
686 result = piece_responder(nats_client, piece_getter).fuse() => {
687 result
688 },
689 result = pieces_responder(nats_client, piece_getter).fuse() => {
690 result
691 },
692 }
693 } else {
694 select! {
695 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
696 result
697 },
698 result = super_segment_headers_responder(nats_client, node_client).fuse() => {
699 result
700 },
701 result = segment_headers_responder(nats_client, node_client).fuse() => {
702 result
703 },
704 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
705 result
706 },
707 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
708 result
709 },
710 result = piece_responder(nats_client, piece_getter).fuse() => {
711 result
712 },
713 result = pieces_responder(nats_client, piece_getter).fuse() => {
714 result
715 },
716 }
717 }
718}
719
720async fn slot_info_broadcaster<NC>(
721 nats_client: &NatsClient,
722 node_client: &NC,
723 instance: &str,
724) -> anyhow::Result<()>
725where
726 NC: NodeClient,
727{
728 let mut slot_info_notifications = node_client
729 .subscribe_slot_info()
730 .await
731 .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
732
733 while let Some(slot_info) = slot_info_notifications.next().await {
734 debug!(?slot_info, "New slot");
735
736 let slot = slot_info.slot;
737
738 if let Err(error) = nats_client
739 .broadcast(
740 &ClusterControllerSlotInfoBroadcast {
741 slot_info,
742 instance: instance.to_string(),
743 },
744 instance,
745 )
746 .await
747 {
748 warn!(%slot, %error, "Failed to broadcast slot info");
749 }
750 }
751
752 Ok(())
753}
754
755async fn block_sealing_broadcaster<NC>(
756 nats_client: &NatsClient,
757 node_client: &NC,
758 instance: &str,
759) -> anyhow::Result<()>
760where
761 NC: NodeClient,
762{
763 let mut block_sealing_subscription = node_client
764 .subscribe_block_sealing()
765 .await
766 .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
767
768 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
769 trace!(?block_sealing_info, "New block sealing notification");
770
771 if let Err(error) = nats_client
772 .broadcast(
773 &ClusterControllerBlockSealingBroadcast { block_sealing_info },
774 instance,
775 )
776 .await
777 {
778 warn!(%error, "Failed to broadcast block sealing info");
779 }
780 }
781
782 Ok(())
783}
784
785async fn archived_segment_headers_broadcaster<NC>(
786 nats_client: &NatsClient,
787 node_client: &NC,
788 instance: &str,
789) -> anyhow::Result<()>
790where
791 NC: NodeClient,
792{
793 let mut archived_segments_notifications = node_client
794 .subscribe_archived_segment_headers()
795 .await
796 .map_err(|error| {
797 anyhow!("Failed to subscribe to archived segment header notifications: {error}")
798 })?;
799
800 while let Some(archived_segment_header) = archived_segments_notifications.next().await {
801 trace!(
802 ?archived_segment_header,
803 "New archived archived segment header notification"
804 );
805
806 node_client
807 .acknowledge_archived_segment_header(
808 archived_segment_header.local_segment_index().into(),
809 )
810 .await
811 .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
812
813 if let Err(error) = nats_client
814 .broadcast(
815 &ClusterControllerArchivedSegmentHeaderBroadcast {
816 archived_segment_header,
817 },
818 instance,
819 )
820 .await
821 {
822 warn!(%error, "Failed to broadcast archived segment header info");
823 }
824 }
825
826 Ok(())
827}
828
829async fn solution_response_forwarder<NC>(
830 nats_client: &NatsClient,
831 node_client: &NC,
832 instance: &str,
833) -> anyhow::Result<()>
834where
835 NC: NodeClient,
836{
837 let mut subscription = nats_client
838 .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
839 Some(instance),
840 Some(instance.to_string()),
841 )
842 .await
843 .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
844
845 while let Some(notification) = subscription.next().await {
846 debug!(?notification, "Solution notification");
847
848 let slot = notification.solution_response.slot_number;
849 let public_key_hash = notification.solution_response.solution.public_key_hash;
850 let sector_index = notification.solution_response.solution.sector_index;
851
852 if let Err(error) = node_client
853 .submit_solution_response(notification.solution_response)
854 .await
855 {
856 warn!(
857 %error,
858 %slot,
859 %public_key_hash,
860 %sector_index,
861 "Failed to send solution response"
862 );
863 }
864 }
865
866 Ok(())
867}
868
869async fn block_seal_forwarder<NC>(
870 nats_client: &NatsClient,
871 node_client: &NC,
872 instance: &str,
873) -> anyhow::Result<()>
874where
875 NC: NodeClient,
876{
877 let mut subscription = nats_client
878 .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
879 None,
880 Some(instance.to_string()),
881 )
882 .await
883 .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
884
885 while let Some(notification) = subscription.next().await {
886 debug!(?notification, "Block seal notification");
887
888 if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
889 warn!(%error, "Failed to send block seal");
890 }
891 }
892
893 Ok(())
894}
895
896async fn farmer_app_info_responder<NC>(
897 nats_client: &NatsClient,
898 node_client: &NC,
899) -> anyhow::Result<()>
900where
901 NC: NodeClient,
902{
903 nats_client
904 .request_responder(
905 None,
906 Some("ab.controller".to_string()),
907 |_: ClusterControllerFarmerAppInfoRequest| async move {
908 Some(
909 node_client
910 .farmer_app_info()
911 .await
912 .map_err(|error| error.to_string()),
913 )
914 },
915 )
916 .await
917}
918
919async fn super_segment_headers_responder<NC>(
920 nats_client: &NatsClient,
921 node_client: &NC,
922) -> anyhow::Result<()>
923where
924 NC: NodeClient,
925{
926 nats_client
927 .request_responder(
928 None,
929 Some("ab.controller".to_string()),
930 |ClusterControllerSuperSegmentHeadersRequest { super_segment_indices }| async move {
931 node_client
932 .super_segment_headers(super_segment_indices.clone())
933 .await
934 .inspect_err(|error| {
935 warn!(%error, ?super_segment_indices, "Failed to get super segment headers");
936 })
937 .ok()
938 },
939 )
940 .await
941}
942
943async fn segment_headers_responder<NC>(
944 nats_client: &NatsClient,
945 node_client: &NC,
946) -> anyhow::Result<()>
947where
948 NC: NodeClient,
949{
950 nats_client
951 .request_responder(
952 None,
953 Some("ab.controller".to_string()),
954 |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
955 node_client
956 .segment_headers(segment_indices.clone())
957 .await
958 .inspect_err(|error| {
959 warn!(%error, ?segment_indices, "Failed to get segment headers");
960 })
961 .ok()
962 },
963 )
964 .await
965}
966
967async fn find_piece_responder(
968 nats_client: &NatsClient,
969 farmer_caches: &[(&str, &FarmerCache)],
970) -> anyhow::Result<()> {
971 futures::future::try_join(
972 farmer_caches
973 .iter()
974 .map(|(cache_group, farmer_cache)| {
975 nats_client.request_responder(
976 Some(cache_group),
977 Some("ab.controller".to_string()),
978 move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
979 Some(farmer_cache.find_piece(piece_index).await)
980 },
981 )
982 })
983 .collect::<FuturesUnordered<_>>()
984 .next()
985 .map(|result| result.unwrap_or(Ok(()))),
986 nats_client.request_responder(
987 Some(GLOBAL_CACHE_GROUP),
988 Some("ab.controller".to_string()),
989 |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
990 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
991 Some(farmer_cache.find_piece(piece_index).await)
992 },
993 ),
994 )
995 .await
996 .map(|((), ())| ())
997}
998
999async fn find_pieces_responder(
1000 nats_client: &NatsClient,
1001 farmer_caches: &[(&str, &FarmerCache)],
1002) -> anyhow::Result<()> {
1003 futures::future::try_join(
1004 farmer_caches
1005 .iter()
1006 .map(|(cache_group, farmer_cache)| {
1007 nats_client.stream_request_responder(
1008 Some(cache_group),
1009 Some("ab.controller".to_string()),
1010 move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1011 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1012 },
1013 )
1014 })
1015 .collect::<FuturesUnordered<_>>()
1016 .next()
1017 .map(|result| result.unwrap_or(Ok(()))),
1018 nats_client.stream_request_responder(
1019 Some(GLOBAL_CACHE_GROUP),
1020 Some("ab.controller".to_string()),
1021 |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1022 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
1023 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1024 },
1025 ),
1026 )
1027 .await
1028 .map(|((), ())| ())
1029}
1030
1031async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1032where
1033 PG: PieceGetter + Sync,
1034{
1035 nats_client
1036 .request_responder(
1037 None,
1038 Some("ab.controller".to_string()),
1039 |ClusterControllerPieceRequest { piece_index }| async move {
1040 piece_getter
1041 .get_piece(piece_index)
1042 .await
1043 .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
1044 .ok()
1045 },
1046 )
1047 .await
1048}
1049
1050async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1051where
1052 PG: PieceGetter + Sync,
1053{
1054 nats_client
1055 .stream_request_responder(
1056 None,
1057 Some("ab.controller".to_string()),
1058 |ClusterControllerPiecesRequest { piece_indices }| async move {
1059 piece_getter
1060 .get_pieces(piece_indices)
1061 .await
1062 .map(|stream| {
1063 Box::pin(stream.filter_map(
1064 |(piece_index, maybe_piece_result)| async move {
1065 match maybe_piece_result {
1066 Ok(Some(piece)) => Some((piece_index, piece)),
1067 Ok(None) => None,
1068 Err(error) => {
1069 warn!(%error, %piece_index, "Failed to get piece");
1070 None
1071 }
1072 }
1073 },
1074 ))
1075 })
1076 .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1077 .ok()
1078 },
1079 )
1080 .await
1081}