1pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15 GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{
22 SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
23};
24use ab_data_retrieval::piece_getter::PieceGetter;
25use ab_farmer_rpc_primitives::{
26 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
27 SolutionResponse,
28};
29use anyhow::anyhow;
30use async_nats::HeaderValue;
31use async_trait::async_trait;
32use futures::channel::mpsc;
33use futures::future::FusedFuture;
34use futures::stream::FuturesUnordered;
35use futures::{FutureExt, Stream, StreamExt, select, stream};
36use parity_scale_codec::{Decode, Encode};
37use parking_lot::Mutex;
38use rand::prelude::*;
39use std::collections::{HashMap, HashSet};
40use std::pin::Pin;
41use std::sync::Arc;
42use std::task::Poll;
43use tracing::{debug, error, trace, warn};
44
45const GLOBAL_CACHE_GROUP: &str = "_";
48
49#[derive(Debug, Copy, Clone, Encode, Decode)]
51pub struct ClusterControllerFarmerIdentifyBroadcast;
52
53impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
54 const SUBJECT: &'static str = "ab.controller.farmer-identify";
55}
56
57#[derive(Debug, Copy, Clone, Encode, Decode)]
59pub struct ClusterControllerCacheIdentifyBroadcast;
60
61impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
62 const SUBJECT: &'static str = "ab.controller.*.cache-identify";
64}
65
66#[derive(Debug, Clone, Encode, Decode)]
68struct ClusterControllerSlotInfoBroadcast {
69 slot_info: SlotInfo,
70 instance: String,
71}
72
73impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
74 const SUBJECT: &'static str = "ab.controller.slot-info";
75
76 fn deterministic_message_id(&self) -> Option<HeaderValue> {
77 Some(HeaderValue::from(format!(
80 "slot-info-{}",
81 self.slot_info.slot
82 )))
83 }
84}
85
86#[derive(Debug, Clone, Encode, Decode)]
88struct ClusterControllerBlockSealingBroadcast {
89 block_sealing_info: BlockSealInfo,
90}
91
92impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
93 const SUBJECT: &'static str = "ab.controller.block-sealing-info";
94}
95
96#[derive(Debug, Clone, Encode, Decode)]
98struct ClusterControllerNewSuperSegmentHeaderBroadcast {
99 new_super_segment_header: SuperSegmentHeader,
100}
101
102impl GenericBroadcast for ClusterControllerNewSuperSegmentHeaderBroadcast {
103 const SUBJECT: &'static str = "ab.controller.new-super-segment-header";
104
105 fn deterministic_message_id(&self) -> Option<HeaderValue> {
106 Some(HeaderValue::from(format!(
109 "new-super-segment-{}",
110 self.new_super_segment_header.index
111 )))
112 }
113}
114
115#[derive(Debug, Clone, Encode, Decode)]
117struct ClusterControllerSolutionNotification {
118 solution_response: SolutionResponse,
119}
120
121impl GenericNotification for ClusterControllerSolutionNotification {
122 const SUBJECT: &'static str = "ab.controller.*.solution";
123}
124
125#[derive(Debug, Clone, Encode, Decode)]
127struct ClusterControllerBlockSealNotification {
128 block_seal: BlockSealResponse,
129}
130
131impl GenericNotification for ClusterControllerBlockSealNotification {
132 const SUBJECT: &'static str = "ab.controller.block-seal";
133}
134
135#[derive(Debug, Clone, Encode, Decode)]
137struct ClusterControllerFarmerAppInfoRequest;
138
139impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
140 const SUBJECT: &'static str = "ab.controller.farmer-app-info";
141 type Response = Result<FarmerAppInfo, String>;
142}
143
144#[derive(Debug, Clone, Encode, Decode)]
146struct ClusterControllerSuperSegmentHeadersRequest {
147 super_segment_indices: Vec<SuperSegmentIndex>,
148}
149
150impl GenericRequest for ClusterControllerSuperSegmentHeadersRequest {
151 const SUBJECT: &'static str = "ab.controller.super-segment-headers";
152 type Response = Vec<Option<SuperSegmentHeader>>;
153}
154
155#[derive(Debug, Clone, Encode, Decode)]
157struct ClusterControllerSuperSegmentRootForSegmentIndexRequest {
158 segment_index: SegmentIndex,
159}
160
161impl GenericRequest for ClusterControllerSuperSegmentRootForSegmentIndexRequest {
162 const SUBJECT: &'static str = "ab.controller.super-segment-root-for-segment-index";
163 type Response = Option<SuperSegmentRoot>;
164}
165
166#[derive(Debug, Clone, Encode, Decode)]
168struct ClusterControllerFindPieceInCacheRequest {
169 piece_index: PieceIndex,
170}
171
172impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
173 const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
174 type Response = Option<(PieceCacheId, PieceCacheOffset)>;
175}
176
177#[derive(Debug, Clone, Encode, Decode)]
179struct ClusterControllerFindPiecesInCacheRequest {
180 piece_indices: Vec<PieceIndex>,
181}
182
183impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
184 const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
185 type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
187}
188
189#[derive(Debug, Clone, Encode, Decode)]
191struct ClusterControllerPieceRequest {
192 piece_index: PieceIndex,
193}
194
195impl GenericRequest for ClusterControllerPieceRequest {
196 const SUBJECT: &'static str = "ab.controller.piece";
197 type Response = Option<Piece>;
198}
199
200#[derive(Debug, Clone, Encode, Decode)]
202struct ClusterControllerPiecesRequest {
203 piece_indices: Vec<PieceIndex>,
204}
205
206impl GenericStreamRequest for ClusterControllerPiecesRequest {
207 const SUBJECT: &'static str = "ab.controller.pieces";
208 type Response = (PieceIndex, Piece);
210}
211
212#[derive(Debug, Clone)]
214pub struct ClusterPieceGetter {
215 nats_client: NatsClient,
216 cache_group: String,
217}
218
219#[async_trait]
220impl PieceGetter for ClusterPieceGetter {
221 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
222 if let Some((piece_cache_id, piece_cache_offset)) = self
223 .nats_client
224 .request(
225 &ClusterControllerFindPieceInCacheRequest { piece_index },
226 Some(&self.cache_group),
227 )
228 .await?
229 {
230 trace!(
231 %piece_index,
232 %piece_cache_id,
233 %piece_cache_offset,
234 "Found piece in cache, retrieving"
235 );
236
237 match self
238 .nats_client
239 .request(
240 &ClusterCacheReadPieceRequest {
241 offset: piece_cache_offset,
242 },
243 Some(&piece_cache_id.to_string()),
244 )
245 .await
246 .map_err(|error| error.to_string())
247 .flatten()
248 {
249 Ok(Some((retrieved_piece_index, piece))) => {
250 if retrieved_piece_index == piece_index {
251 trace!(
252 %piece_index,
253 %piece_cache_id,
254 %piece_cache_offset,
255 "Retrieved piece from cache successfully"
256 );
257
258 return Ok(Some(piece));
259 } else {
260 trace!(
261 %piece_index,
262 %piece_cache_id,
263 %piece_cache_offset,
264 "Retrieving piece was replaced in cache during retrieval"
265 );
266 }
267 }
268 Ok(None) => {
269 trace!(
270 %piece_index,
271 %piece_cache_id,
272 %piece_cache_offset,
273 "Piece cache didn't have piece at offset"
274 );
275 }
276 Err(error) => {
277 debug!(
278 %piece_index,
279 %piece_cache_id,
280 %piece_cache_offset,
281 %error,
282 "Retrieving piece from cache failed"
283 );
284 }
285 }
286 } else {
287 trace!(%piece_index, "Piece not found in cache");
288 }
289
290 Ok(self
291 .nats_client
292 .request(&ClusterControllerPieceRequest { piece_index }, None)
293 .await?)
294 }
295
296 async fn get_pieces<'a>(
297 &'a self,
298 piece_indices: Vec<PieceIndex>,
299 ) -> anyhow::Result<
300 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
301 > {
302 let (tx, mut rx) = mpsc::unbounded();
303
304 let piece_indices_to_get =
305 Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
306
307 let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
308
309 {
310 let mut cached_pieces = self
311 .nats_client
312 .stream_request(
313 &ClusterControllerFindPiecesInCacheRequest { piece_indices },
314 Some(&self.cache_group),
315 )
316 .await?;
317
318 while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
319 cached_pieces.next().await
320 {
321 cached_pieces_by_cache_id
322 .entry(piece_cache_id)
323 .or_default()
324 .push(piece_cache_offset);
325 }
326 }
327
328 let fut = async move {
329 let tx = &tx;
330
331 cached_pieces_by_cache_id
332 .into_iter()
333 .map(|(piece_cache_id, offsets)| {
334 let piece_indices_to_get = &piece_indices_to_get;
335
336 async move {
337 let mut pieces_stream = match self
338 .nats_client
339 .stream_request(
340 &ClusterCacheReadPiecesRequest { offsets },
341 Some(&piece_cache_id.to_string()),
342 )
343 .await
344 {
345 Ok(pieces) => pieces,
346 Err(error) => {
347 warn!(
348 %error,
349 %piece_cache_id,
350 "Failed to request pieces from cache"
351 );
352
353 return;
354 }
355 };
356
357 while let Some(piece_result) = pieces_stream.next().await {
358 let (piece_offset, maybe_piece) = match piece_result {
359 Ok(result) => result,
360 Err(error) => {
361 warn!(%error, "Failed to get piece from cache");
362 continue;
363 }
364 };
365
366 if let Some((piece_index, piece)) = maybe_piece {
367 piece_indices_to_get.lock().remove(&piece_index);
368
369 tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
370 "This future isn't polled after receiver is dropped; qed",
371 );
372 } else {
373 warn!(
374 %piece_cache_id,
375 %piece_offset,
376 "Failed to get piece from cache, it was missing or already gone"
377 );
378 }
379 }
380 }
381 })
382 .collect::<FuturesUnordered<_>>()
383 .for_each(|()| async {})
385 .await;
386
387 let mut piece_indices_to_get = piece_indices_to_get.into_inner();
388 if piece_indices_to_get.is_empty() {
389 return;
390 }
391
392 let mut pieces_from_controller = match self
393 .nats_client
394 .stream_request(
395 &ClusterControllerPiecesRequest {
396 piece_indices: piece_indices_to_get.iter().copied().collect(),
397 },
398 None,
399 )
400 .await
401 {
402 Ok(pieces_from_controller) => pieces_from_controller,
403 Err(error) => {
404 error!(%error, "Failed to get pieces from controller");
405
406 for piece_index in piece_indices_to_get {
407 tx.unbounded_send((
408 piece_index,
409 Err(anyhow::anyhow!("Failed to get piece from controller")),
410 ))
411 .expect("This future isn't polled after receiver is dropped; qed");
412 }
413 return;
414 }
415 };
416
417 while let Some((piece_index, piece)) = pieces_from_controller.next().await {
418 piece_indices_to_get.remove(&piece_index);
419 tx.unbounded_send((piece_index, Ok(Some(piece))))
420 .expect("This future isn't polled after receiver is dropped; qed");
421 }
422
423 for piece_index in piece_indices_to_get {
424 tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
425 .expect("This future isn't polled after receiver is dropped; qed");
426 }
427 };
428 let mut fut = Box::pin(fut.fuse());
429
430 Ok(Box::new(stream::poll_fn(move |cx| {
432 if !fut.is_terminated() {
433 let _ = fut.poll_unpin(cx);
435 }
436
437 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
438 return Poll::Ready(maybe_result);
439 }
440
441 Poll::Pending
443 })))
444 }
445}
446
447impl ClusterPieceGetter {
448 #[inline]
450 pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
451 Self {
452 nats_client,
453 cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
454 }
455 }
456}
457
458#[derive(Debug, Clone)]
461pub struct ClusterNodeClient {
462 nats_client: NatsClient,
463 last_slot_info_instance: Arc<Mutex<String>>,
466}
467
468impl ClusterNodeClient {
469 pub fn new(nats_client: NatsClient) -> Self {
471 Self {
472 nats_client,
473 last_slot_info_instance: Arc::default(),
474 }
475 }
476}
477
478#[async_trait]
479impl NodeClient for ClusterNodeClient {
480 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
481 Ok(self
482 .nats_client
483 .request(&ClusterControllerFarmerAppInfoRequest, None)
484 .await?
485 .map_err(anyhow::Error::msg)?)
486 }
487
488 async fn subscribe_slot_info(
489 &self,
490 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
491 let subscription = self
492 .nats_client
493 .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
494 .await?
495 .filter_map({
496 let mut last_slot_number = None;
497 let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
498
499 move |broadcast| {
500 let slot_info = broadcast.slot_info;
501
502 let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
503 && last_slot_number >= slot_info.slot
504 {
505 None
506 } else {
507 last_slot_number.replace(slot_info.slot);
508 *last_slot_info_instance.lock() = broadcast.instance;
509
510 Some(slot_info)
511 };
512
513 async move { maybe_slot_info }
514 }
515 });
516
517 Ok(Box::pin(subscription))
518 }
519
520 async fn submit_solution_response(
521 &self,
522 solution_response: SolutionResponse,
523 ) -> anyhow::Result<()> {
524 let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
525 Ok(self
526 .nats_client
527 .notification(
528 &ClusterControllerSolutionNotification { solution_response },
529 Some(&last_slot_info_instance),
530 )
531 .await?)
532 }
533
534 async fn subscribe_block_sealing(
535 &self,
536 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
537 let subscription = self
538 .nats_client
539 .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
540 .await?
541 .map(|broadcast| broadcast.block_sealing_info);
542
543 Ok(Box::pin(subscription))
544 }
545
546 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
547 Ok(self
548 .nats_client
549 .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
550 .await?)
551 }
552
553 async fn subscribe_new_super_segment_headers(
554 &self,
555 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
556 let subscription = self
557 .nats_client
558 .subscribe_to_broadcasts::<ClusterControllerNewSuperSegmentHeaderBroadcast>(None, None)
559 .await?
560 .filter_map({
561 let mut last_super_segment_index = None;
562
563 move |broadcast| {
564 let new_super_segment_header = broadcast.new_super_segment_header;
565 let super_segment_index = new_super_segment_header.index.as_inner();
566
567 let maybe_super_segment_header = if let Some(last_super_segment_index) =
568 last_super_segment_index
569 && last_super_segment_index >= super_segment_index
570 {
571 None
572 } else {
573 last_super_segment_index.replace(super_segment_index);
574
575 Some(new_super_segment_header)
576 };
577
578 async move { maybe_super_segment_header }
579 }
580 });
581
582 Ok(Box::pin(subscription))
583 }
584
585 async fn super_segment_headers(
586 &self,
587 super_segment_indices: Vec<SuperSegmentIndex>,
588 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
589 Ok(self
590 .nats_client
591 .request(
592 &ClusterControllerSuperSegmentHeadersRequest {
593 super_segment_indices,
594 },
595 None,
596 )
597 .await?)
598 }
599
600 async fn super_segment_root_for_segment_index(
601 &self,
602 segment_index: SegmentIndex,
603 ) -> anyhow::Result<Option<SuperSegmentRoot>> {
604 Ok(self
605 .nats_client
606 .request(
607 &ClusterControllerSuperSegmentRootForSegmentIndexRequest { segment_index },
608 None,
609 )
610 .await?)
611 }
612
613 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
614 Ok(self
615 .nats_client
616 .request(&ClusterControllerPieceRequest { piece_index }, None)
617 .await?)
618 }
619
620 async fn update_shard_membership_info(
621 &self,
622 _info: FarmerShardMembershipInfo,
623 ) -> anyhow::Result<()> {
624 Ok(())
626 }
627}
628
629pub async fn controller_service<NC, PG>(
635 nats_client: &NatsClient,
636 node_client: &NC,
637 piece_getter: &PG,
638 farmer_caches: &[(&str, &FarmerCache)],
639 instance: &str,
640 primary_instance: bool,
641) -> anyhow::Result<()>
642where
643 NC: NodeClient,
644 PG: PieceGetter + Sync,
645{
646 if primary_instance {
647 select! {
648 result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
649 result
650 },
651 result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
652 result
653 },
654 result = new_super_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
655 result
656 },
657 result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
658 result
659 },
660 result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
661 result
662 },
663 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
664 result
665 },
666 result = super_segment_headers_responder(nats_client, node_client).fuse() => {
667 result
668 },
669 result = super_segment_root_for_segment_index_responder(nats_client, node_client).fuse() => {
670 result
671 },
672 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
673 result
674 },
675 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
676 result
677 },
678 result = piece_responder(nats_client, piece_getter).fuse() => {
679 result
680 },
681 result = pieces_responder(nats_client, piece_getter).fuse() => {
682 result
683 },
684 }
685 } else {
686 select! {
687 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
688 result
689 },
690 result = super_segment_headers_responder(nats_client, node_client).fuse() => {
691 result
692 },
693 result = super_segment_root_for_segment_index_responder(nats_client, node_client).fuse() => {
694 result
695 },
696 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
697 result
698 },
699 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
700 result
701 },
702 result = piece_responder(nats_client, piece_getter).fuse() => {
703 result
704 },
705 result = pieces_responder(nats_client, piece_getter).fuse() => {
706 result
707 },
708 }
709 }
710}
711
712async fn slot_info_broadcaster<NC>(
713 nats_client: &NatsClient,
714 node_client: &NC,
715 instance: &str,
716) -> anyhow::Result<()>
717where
718 NC: NodeClient,
719{
720 let mut slot_info_notifications = node_client
721 .subscribe_slot_info()
722 .await
723 .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
724
725 while let Some(slot_info) = slot_info_notifications.next().await {
726 debug!(?slot_info, "New slot");
727
728 let slot = slot_info.slot;
729
730 if let Err(error) = nats_client
731 .broadcast(
732 &ClusterControllerSlotInfoBroadcast {
733 slot_info,
734 instance: instance.to_string(),
735 },
736 instance,
737 )
738 .await
739 {
740 warn!(%slot, %error, "Failed to broadcast slot info");
741 }
742 }
743
744 Ok(())
745}
746
747async fn block_sealing_broadcaster<NC>(
748 nats_client: &NatsClient,
749 node_client: &NC,
750 instance: &str,
751) -> anyhow::Result<()>
752where
753 NC: NodeClient,
754{
755 let mut block_sealing_subscription = node_client
756 .subscribe_block_sealing()
757 .await
758 .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
759
760 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
761 trace!(?block_sealing_info, "New block sealing notification");
762
763 if let Err(error) = nats_client
764 .broadcast(
765 &ClusterControllerBlockSealingBroadcast { block_sealing_info },
766 instance,
767 )
768 .await
769 {
770 warn!(%error, "Failed to broadcast block sealing info");
771 }
772 }
773
774 Ok(())
775}
776
777async fn new_super_segment_headers_broadcaster<NC>(
778 nats_client: &NatsClient,
779 node_client: &NC,
780 instance: &str,
781) -> anyhow::Result<()>
782where
783 NC: NodeClient,
784{
785 let mut new_super_segments_notifications = node_client
786 .subscribe_new_super_segment_headers()
787 .await
788 .map_err(|error| {
789 anyhow!("Failed to subscribe to new super segment header notifications: {error}")
790 })?;
791
792 while let Some(new_super_segment_header) = new_super_segments_notifications.next().await {
793 trace!(
794 ?new_super_segment_header,
795 "New archived new super segment header notification"
796 );
797
798 if let Err(error) = nats_client
799 .broadcast(
800 &ClusterControllerNewSuperSegmentHeaderBroadcast {
801 new_super_segment_header,
802 },
803 instance,
804 )
805 .await
806 {
807 warn!(%error, "Failed to broadcast new super segment header info");
808 }
809 }
810
811 Ok(())
812}
813
814async fn solution_response_forwarder<NC>(
815 nats_client: &NatsClient,
816 node_client: &NC,
817 instance: &str,
818) -> anyhow::Result<()>
819where
820 NC: NodeClient,
821{
822 let mut subscription = nats_client
823 .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
824 Some(instance),
825 Some(instance.to_string()),
826 )
827 .await
828 .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
829
830 while let Some(notification) = subscription.next().await {
831 debug!(?notification, "Solution notification");
832
833 let slot = notification.solution_response.slot_number;
834 let public_key_hash = notification.solution_response.solution.public_key_hash;
835 let sector_index = notification.solution_response.solution.sector_index;
836
837 if let Err(error) = node_client
838 .submit_solution_response(notification.solution_response)
839 .await
840 {
841 warn!(
842 %error,
843 %slot,
844 %public_key_hash,
845 %sector_index,
846 "Failed to send solution response"
847 );
848 }
849 }
850
851 Ok(())
852}
853
854async fn block_seal_forwarder<NC>(
855 nats_client: &NatsClient,
856 node_client: &NC,
857 instance: &str,
858) -> anyhow::Result<()>
859where
860 NC: NodeClient,
861{
862 let mut subscription = nats_client
863 .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
864 None,
865 Some(instance.to_string()),
866 )
867 .await
868 .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
869
870 while let Some(notification) = subscription.next().await {
871 debug!(?notification, "Block seal notification");
872
873 if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
874 warn!(%error, "Failed to send block seal");
875 }
876 }
877
878 Ok(())
879}
880
881async fn farmer_app_info_responder<NC>(
882 nats_client: &NatsClient,
883 node_client: &NC,
884) -> anyhow::Result<()>
885where
886 NC: NodeClient,
887{
888 nats_client
889 .request_responder(
890 None,
891 Some("ab.controller".to_string()),
892 |_: ClusterControllerFarmerAppInfoRequest| async move {
893 Some(
894 node_client
895 .farmer_app_info()
896 .await
897 .map_err(|error| error.to_string()),
898 )
899 },
900 )
901 .await
902}
903
904async fn super_segment_headers_responder<NC>(
905 nats_client: &NatsClient,
906 node_client: &NC,
907) -> anyhow::Result<()>
908where
909 NC: NodeClient,
910{
911 nats_client
912 .request_responder(
913 None,
914 Some("ab.controller".to_string()),
915 |ClusterControllerSuperSegmentHeadersRequest { super_segment_indices }| async move {
916 node_client
917 .super_segment_headers(super_segment_indices.clone())
918 .await
919 .inspect_err(|error| {
920 warn!(%error, ?super_segment_indices, "Failed to get super segment headers");
921 })
922 .ok()
923 },
924 )
925 .await
926}
927
928async fn super_segment_root_for_segment_index_responder<NC>(
929 nats_client: &NatsClient,
930 node_client: &NC,
931) -> anyhow::Result<()>
932where
933 NC: NodeClient,
934{
935 nats_client
936 .request_responder(
937 None,
938 Some("ab.controller".to_string()),
939 |ClusterControllerSuperSegmentRootForSegmentIndexRequest { segment_index }| async move {
940 node_client
941 .super_segment_root_for_segment_index(segment_index)
942 .await
943 .inspect_err(|error| {
944 warn!(
945 %error,
946 %segment_index,
947 "Failed to get super segment root for segment index"
948 );
949 })
950 .ok()
951 },
952 )
953 .await
954}
955
956async fn find_piece_responder(
957 nats_client: &NatsClient,
958 farmer_caches: &[(&str, &FarmerCache)],
959) -> anyhow::Result<()> {
960 futures::future::try_join(
961 farmer_caches
962 .iter()
963 .map(|(cache_group, farmer_cache)| {
964 nats_client.request_responder(
965 Some(cache_group),
966 Some("ab.controller".to_string()),
967 move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
968 Some(farmer_cache.find_piece(piece_index).await)
969 },
970 )
971 })
972 .collect::<FuturesUnordered<_>>()
973 .next()
974 .map(|result| result.unwrap_or(Ok(()))),
975 nats_client.request_responder(
976 Some(GLOBAL_CACHE_GROUP),
977 Some("ab.controller".to_string()),
978 |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
979 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
980 Some(farmer_cache.find_piece(piece_index).await)
981 },
982 ),
983 )
984 .await
985 .map(|((), ())| ())
986}
987
988async fn find_pieces_responder(
989 nats_client: &NatsClient,
990 farmer_caches: &[(&str, &FarmerCache)],
991) -> anyhow::Result<()> {
992 futures::future::try_join(
993 farmer_caches
994 .iter()
995 .map(|(cache_group, farmer_cache)| {
996 nats_client.stream_request_responder(
997 Some(cache_group),
998 Some("ab.controller".to_string()),
999 move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1000 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1001 },
1002 )
1003 })
1004 .collect::<FuturesUnordered<_>>()
1005 .next()
1006 .map(|result| result.unwrap_or(Ok(()))),
1007 nats_client.stream_request_responder(
1008 Some(GLOBAL_CACHE_GROUP),
1009 Some("ab.controller".to_string()),
1010 |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
1011 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
1012 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
1013 },
1014 ),
1015 )
1016 .await
1017 .map(|((), ())| ())
1018}
1019
1020async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1021where
1022 PG: PieceGetter + Sync,
1023{
1024 nats_client
1025 .request_responder(
1026 None,
1027 Some("ab.controller".to_string()),
1028 |ClusterControllerPieceRequest { piece_index }| async move {
1029 piece_getter
1030 .get_piece(piece_index)
1031 .await
1032 .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
1033 .ok()
1034 },
1035 )
1036 .await
1037}
1038
1039async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
1040where
1041 PG: PieceGetter + Sync,
1042{
1043 nats_client
1044 .stream_request_responder(
1045 None,
1046 Some("ab.controller".to_string()),
1047 |ClusterControllerPiecesRequest { piece_indices }| async move {
1048 piece_getter
1049 .get_pieces(piece_indices)
1050 .await
1051 .map(|stream| {
1052 Box::pin(stream.filter_map(
1053 |(piece_index, maybe_piece_result)| async move {
1054 match maybe_piece_result {
1055 Ok(Some(piece)) => Some((piece_index, piece)),
1056 Ok(None) => None,
1057 Err(error) => {
1058 warn!(%error, %piece_index, "Failed to get piece");
1059 None
1060 }
1061 }
1062 },
1063 ))
1064 })
1065 .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1066 .ok()
1067 },
1068 )
1069 .await
1070}