1pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15 GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
22use ab_data_retrieval::piece_getter::PieceGetter;
23use ab_farmer_rpc_primitives::{
24 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
25 SolutionResponse,
26};
27use anyhow::anyhow;
28use async_nats::HeaderValue;
29use async_trait::async_trait;
30use futures::channel::mpsc;
31use futures::future::FusedFuture;
32use futures::stream::FuturesUnordered;
33use futures::{FutureExt, Stream, StreamExt, select, stream};
34use parity_scale_codec::{Decode, Encode};
35use parking_lot::Mutex;
36use rand::prelude::*;
37use std::collections::{HashMap, HashSet};
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::Poll;
41use tracing::{debug, error, trace, warn};
42
/// Cache group name used when no explicit cache group is given (see `ClusterPieceGetter::new`).
///
/// The cache lookup responders in this file are additionally registered under this group and
/// answer from a randomly chosen farmer cache.
const GLOBAL_CACHE_GROUP: &str = "_";
46
/// Payload-less broadcast published on the `ab.controller.farmer-identify` subject.
///
/// NOTE(review): judging by the name this asks farmers to identify themselves; neither the
/// sender nor the receiver is in this file — confirm.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ClusterControllerFarmerIdentifyBroadcast;

impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
    const SUBJECT: &'static str = "ab.controller.farmer-identify";
}
54
/// Payload-less broadcast published on the `ab.controller.*.cache-identify` subject.
///
/// NOTE(review): the `*` segment is presumably replaced with a cache group by `NatsClient`,
/// and the name suggests it asks caches to identify themselves; neither is visible in this
/// file — confirm.
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ClusterControllerCacheIdentifyBroadcast;

impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
    const SUBJECT: &'static str = "ab.controller.*.cache-identify";
}
63
64#[derive(Debug, Clone, Encode, Decode)]
66struct ClusterControllerSlotInfoBroadcast {
67 slot_info: SlotInfo,
68 instance: String,
69}
70
71impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
72 const SUBJECT: &'static str = "ab.controller.slot-info";
73
74 fn deterministic_message_id(&self) -> Option<HeaderValue> {
75 Some(HeaderValue::from(format!(
78 "slot-info-{}",
79 self.slot_info.slot
80 )))
81 }
82}
83
/// Broadcast carrying block sealing info, published on the `ab.controller.block-sealing-info`
/// subject.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerBlockSealingBroadcast {
    /// Block sealing info obtained from the node client subscription
    block_sealing_info: BlockSealInfo,
}

impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
    const SUBJECT: &'static str = "ab.controller.block-sealing-info";
}
93
94#[derive(Debug, Clone, Encode, Decode)]
96struct ClusterControllerArchivedSegmentHeaderBroadcast {
97 archived_segment_header: SegmentHeader,
98}
99
100impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
101 const SUBJECT: &'static str = "ab.controller.archived-segment-header";
102
103 fn deterministic_message_id(&self) -> Option<HeaderValue> {
104 Some(HeaderValue::from(format!(
107 "archived-segment-{}",
108 self.archived_segment_header.segment_index
109 )))
110 }
111}
112
/// Notification carrying a solution response for a controller.
///
/// In this file it is sent with the instance name remembered from the latest slot info
/// broadcast and received by a subscription created for the controller's own instance name.
/// NOTE(review): the `*` in the subject is presumably where `NatsClient` puts that instance
/// name — confirm.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerSolutionNotification {
    /// Solution response to be submitted to the node
    solution_response: SolutionResponse,
}

impl GenericNotification for ClusterControllerSolutionNotification {
    const SUBJECT: &'static str = "ab.controller.*.solution";
}
122
/// Notification carrying a block seal for a controller, published on the
/// `ab.controller.block-seal` subject.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerBlockSealNotification {
    /// Block seal to be submitted to the node
    block_seal: BlockSealResponse,
}

impl GenericNotification for ClusterControllerBlockSealNotification {
    const SUBJECT: &'static str = "ab.controller.block-seal";
}
132
/// Payload-less request for farmer app info.
///
/// The response is either the app info or a stringified error produced on the controller side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFarmerAppInfoRequest;

impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
    const SUBJECT: &'static str = "ab.controller.farmer-app-info";
    type Response = Result<FarmerAppInfo, String>;
}
141
/// Request for segment headers by segment index.
///
/// The response is a list of optional segment headers.
/// NOTE(review): presumably one entry per requested index, in request order, with `None` for
/// unknown segments — this depends on the node client implementation, confirm.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerSegmentHeadersRequest {
    /// Segment indices to look up
    segment_indices: Vec<SegmentIndex>,
}

impl GenericRequest for ClusterControllerSegmentHeadersRequest {
    const SUBJECT: &'static str = "ab.controller.segment-headers";
    type Response = Vec<Option<SegmentHeader>>;
}
152
/// Request to locate a single piece in a farmer cache.
///
/// The response is the cache ID and offset at which the piece is stored, or `None` when it
/// was not found. In this file the request is sent with a cache group.
/// NOTE(review): the `*` in the subject is presumably where `NatsClient` puts that cache
/// group — confirm.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFindPieceInCacheRequest {
    /// Index of the piece to locate
    piece_index: PieceIndex,
}

impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
    const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
    type Response = Option<(PieceCacheId, PieceCacheOffset)>;
}
163
/// Streaming request to locate multiple pieces in a farmer cache.
///
/// Each streamed response item is a piece index together with the cache ID and offset at
/// which that piece is stored. In this file the request is sent with a cache group.
/// NOTE(review): the `*` in the subject is presumably where `NatsClient` puts that cache
/// group — confirm.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerFindPiecesInCacheRequest {
    /// Indices of the pieces to locate
    piece_indices: Vec<PieceIndex>,
}

impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
    const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
    type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
}
175
/// Request for a single piece from a controller.
///
/// The response is the piece, or `None` when the controller could not provide it.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerPieceRequest {
    /// Index of the requested piece
    piece_index: PieceIndex,
}

impl GenericRequest for ClusterControllerPieceRequest {
    const SUBJECT: &'static str = "ab.controller.piece";
    type Response = Option<Piece>;
}
186
/// Streaming request for multiple pieces from a controller.
///
/// Each streamed response item is a piece index together with the piece itself; pieces the
/// controller could not provide are absent from the stream (see `pieces_responder`).
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterControllerPiecesRequest {
    /// Indices of the requested pieces
    piece_indices: Vec<PieceIndex>,
}

impl GenericStreamRequest for ClusterControllerPiecesRequest {
    const SUBJECT: &'static str = "ab.controller.pieces";
    type Response = (PieceIndex, Piece);
}
198
/// Piece getter that retrieves pieces through the cluster: first from farmer caches located
/// via a controller, then from a controller directly.
#[derive(Debug, Clone)]
pub struct ClusterPieceGetter {
    /// NATS client used for all requests
    nats_client: NatsClient,
    /// Cache group used for cache lookup requests
    cache_group: String,
}
205
#[async_trait]
impl PieceGetter for ClusterPieceGetter {
    /// Retrieves a single piece.
    ///
    /// The controller is first asked where the piece is cached within this getter's cache
    /// group. If a location is returned, the piece is read from that cache and returned when
    /// the index stored at that offset matches the requested one. In every other case (not
    /// cached, replaced in the meantime, missing at the offset, cache read error) the piece is
    /// requested from a controller directly.
    ///
    /// Returns an error if the cache lookup request or the final controller request fails.
    async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
        // Ask where (if anywhere) the piece is cached
        if let Some((piece_cache_id, piece_cache_offset)) = self
            .nats_client
            .request(
                &ClusterControllerFindPieceInCacheRequest { piece_index },
                Some(&self.cache_group),
            )
            .await?
        {
            trace!(
                %piece_index,
                %piece_cache_id,
                %piece_cache_offset,
                "Found piece in cache, retrieving"
            );

            // Request and cache-side errors are merged into a single stringified error so that
            // all failures are handled by one arm below
            match self
                .nats_client
                .request(
                    &ClusterCacheReadPieceRequest {
                        offset: piece_cache_offset,
                    },
                    Some(&piece_cache_id.to_string()),
                )
                .await
                .map_err(|error| error.to_string())
                .flatten()
            {
                Ok(Some((retrieved_piece_index, piece))) => {
                    // The offset may hold a different piece by now, so the index is verified
                    if retrieved_piece_index == piece_index {
                        trace!(
                            %piece_index,
                            %piece_cache_id,
                            %piece_cache_offset,
                            "Retrieved piece from cache successfully"
                        );

                        return Ok(Some(piece));
                    } else {
                        trace!(
                            %piece_index,
                            %piece_cache_id,
                            %piece_cache_offset,
                            "Retrieving piece was replaced in cache during retrieval"
                        );
                    }
                }
                Ok(None) => {
                    trace!(
                        %piece_index,
                        %piece_cache_id,
                        %piece_cache_offset,
                        "Piece cache didn't have piece at offset"
                    );
                }
                Err(error) => {
                    debug!(
                        %piece_index,
                        %piece_cache_id,
                        %piece_cache_offset,
                        %error,
                        "Retrieving piece from cache failed"
                    );
                }
            }
        } else {
            trace!(%piece_index, "Piece not found in cache");
        }

        // Fall back to requesting the piece from a controller
        Ok(self
            .nats_client
            .request(&ClusterControllerPieceRequest { piece_index }, None)
            .await?)
    }

    /// Retrieves multiple pieces as a stream of `(piece index, result)` items.
    ///
    /// Cache locations of the requested pieces are collected up front and grouped by cache ID.
    /// The returned stream then drives a background future that reads pieces from every cache
    /// concurrently and afterwards requests whatever is still missing from a controller.
    /// Pieces that are still missing at the end are reported as errors.
    ///
    /// Returns an error only if the initial cache lookup request fails; later failures are
    /// logged and/or reported through the stream items.
    async fn get_pieces<'a>(
        &'a self,
        piece_indices: Vec<PieceIndex>,
    ) -> anyhow::Result<
        Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
    > {
        // Results are produced by the future below and consumed by the returned stream
        let (tx, mut rx) = mpsc::unbounded();

        // Pieces that were not delivered yet, shared between concurrent cache readers
        let piece_indices_to_get =
            Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());

        let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();

        {
            let mut cached_pieces = self
                .nats_client
                .stream_request(
                    &ClusterControllerFindPiecesInCacheRequest { piece_indices },
                    Some(&self.cache_group),
                )
                .await?;

            // Group offsets by cache so that each cache is queried once
            while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
                cached_pieces.next().await
            {
                cached_pieces_by_cache_id
                    .entry(piece_cache_id)
                    .or_default()
                    .push(piece_cache_offset);
            }
        }

        let fut = async move {
            // Borrow the sender so that it can be shared by all per-cache futures
            let tx = &tx;

            cached_pieces_by_cache_id
                .into_iter()
                .map(|(piece_cache_id, offsets)| {
                    let piece_indices_to_get = &piece_indices_to_get;

                    async move {
                        let mut pieces_stream = match self
                            .nats_client
                            .stream_request(
                                &ClusterCacheReadPiecesRequest { offsets },
                                Some(&piece_cache_id.to_string()),
                            )
                            .await
                        {
                            Ok(pieces) => pieces,
                            Err(error) => {
                                warn!(
                                    %error,
                                    %piece_cache_id,
                                    "Failed to request pieces from cache"
                                );

                                // Pieces of this cache stay in `piece_indices_to_get` and are
                                // requested from a controller later
                                return;
                            }
                        };

                        while let Some(piece_result) = pieces_stream.next().await {
                            let (piece_offset, maybe_piece) = match piece_result {
                                Ok(result) => result,
                                Err(error) => {
                                    warn!(%error, "Failed to get piece from cache");
                                    continue;
                                }
                            };

                            if let Some((piece_index, piece)) = maybe_piece {
                                // NOTE(review): unlike `get_piece`, the index returned by the
                                // cache is not checked against the requested set here, so a
                                // piece that replaced the requested one at this offset is
                                // forwarded as well — confirm this is intended
                                piece_indices_to_get.lock().remove(&piece_index);

                                tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
                                    "This future isn't polled after receiver is dropped; qed",
                                );
                            } else {
                                warn!(
                                    %piece_cache_id,
                                    %piece_offset,
                                    "Failed to get piece from cache, it was missing or already gone"
                                );
                            }
                        }
                    }
                })
                .collect::<FuturesUnordered<_>>()
                // Drive all per-cache futures to completion, their output is `()`
                .for_each(|()| async {})
                .await;

            // All cache readers are done, so the mutex is no longer shared
            let mut piece_indices_to_get = piece_indices_to_get.into_inner();
            if piece_indices_to_get.is_empty() {
                return;
            }

            // Request everything caches did not provide from a controller
            let mut pieces_from_controller = match self
                .nats_client
                .stream_request(
                    &ClusterControllerPiecesRequest {
                        piece_indices: piece_indices_to_get.iter().copied().collect(),
                    },
                    None,
                )
                .await
            {
                Ok(pieces_from_controller) => pieces_from_controller,
                Err(error) => {
                    error!(%error, "Failed to get pieces from controller");

                    // Report every remaining piece as failed
                    for piece_index in piece_indices_to_get {
                        tx.unbounded_send((
                            piece_index,
                            Err(anyhow::anyhow!("Failed to get piece from controller")),
                        ))
                        .expect("This future isn't polled after receiver is dropped; qed");
                    }
                    return;
                }
            };

            while let Some((piece_index, piece)) = pieces_from_controller.next().await {
                piece_indices_to_get.remove(&piece_index);
                tx.unbounded_send((piece_index, Ok(Some(piece))))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }

            // Whatever is left was provided neither by caches nor by the controller
            for piece_index in piece_indices_to_get {
                tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
                    .expect("This future isn't polled after receiver is dropped; qed");
            }
        };
        let mut fut = Box::pin(fut.fuse());

        // The returned stream drives the future above and yields results as they arrive
        Ok(Box::new(stream::poll_fn(move |cx| {
            if !fut.is_terminated() {
                // The output is `()`, results are delivered through the channel instead
                let _ = fut.poll_unpin(cx);
            }

            if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
                return Poll::Ready(maybe_result);
            }

            // Once the future completes, the sender it owns is dropped and the receiver
            // yields `None` after the buffered items, which ends this stream
            Poll::Pending
        })))
    }
}
433
434impl ClusterPieceGetter {
435 #[inline]
437 pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
438 Self {
439 nats_client,
440 cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
441 }
442 }
443}
444
/// [`NodeClient`] implementation that talks to a controller over NATS instead of talking to
/// a node directly.
#[derive(Debug, Clone)]
pub struct ClusterNodeClient {
    /// NATS client used for all requests, notifications and subscriptions
    nats_client: NatsClient,
    /// Instance name taken from the most recently accepted slot info broadcast; solution
    /// responses are addressed to it. Shared between clones of this client.
    last_slot_info_instance: Arc<Mutex<String>>,
}
454
455impl ClusterNodeClient {
456 pub fn new(nats_client: NatsClient) -> Self {
458 Self {
459 nats_client,
460 last_slot_info_instance: Arc::default(),
461 }
462 }
463}
464
465#[async_trait]
466impl NodeClient for ClusterNodeClient {
467 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
468 Ok(self
469 .nats_client
470 .request(&ClusterControllerFarmerAppInfoRequest, None)
471 .await?
472 .map_err(anyhow::Error::msg)?)
473 }
474
475 async fn subscribe_slot_info(
476 &self,
477 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
478 let subscription = self
479 .nats_client
480 .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
481 .await?
482 .filter_map({
483 let mut last_slot_number = None;
484 let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
485
486 move |broadcast| {
487 let slot_info = broadcast.slot_info;
488
489 let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
490 && last_slot_number >= slot_info.slot
491 {
492 None
493 } else {
494 last_slot_number.replace(slot_info.slot);
495 *last_slot_info_instance.lock() = broadcast.instance;
496
497 Some(slot_info)
498 };
499
500 async move { maybe_slot_info }
501 }
502 });
503
504 Ok(Box::pin(subscription))
505 }
506
507 async fn submit_solution_response(
508 &self,
509 solution_response: SolutionResponse,
510 ) -> anyhow::Result<()> {
511 let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
512 Ok(self
513 .nats_client
514 .notification(
515 &ClusterControllerSolutionNotification { solution_response },
516 Some(&last_slot_info_instance),
517 )
518 .await?)
519 }
520
521 async fn subscribe_block_sealing(
522 &self,
523 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
524 let subscription = self
525 .nats_client
526 .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
527 .await?
528 .map(|broadcast| broadcast.block_sealing_info);
529
530 Ok(Box::pin(subscription))
531 }
532
533 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
534 Ok(self
535 .nats_client
536 .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
537 .await?)
538 }
539
540 async fn subscribe_archived_segment_headers(
541 &self,
542 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
543 let subscription = self
544 .nats_client
545 .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
546 .await?
547 .filter_map({
548 let mut last_archived_segment_index = None;
549
550 move |broadcast| {
551 let archived_segment_header = broadcast.archived_segment_header;
552 let segment_index = archived_segment_header.segment_index();
553
554 let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
555 last_archived_segment_index
556 && last_archived_segment_index >= segment_index
557 {
558 None
559 } else {
560 last_archived_segment_index.replace(segment_index);
561
562 Some(archived_segment_header)
563 };
564
565 async move { maybe_archived_segment_header }
566 }
567 });
568
569 Ok(Box::pin(subscription))
570 }
571
572 async fn segment_headers(
573 &self,
574 segment_indices: Vec<SegmentIndex>,
575 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
576 Ok(self
577 .nats_client
578 .request(
579 &ClusterControllerSegmentHeadersRequest { segment_indices },
580 None,
581 )
582 .await?)
583 }
584
585 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
586 Ok(self
587 .nats_client
588 .request(&ClusterControllerPieceRequest { piece_index }, None)
589 .await?)
590 }
591
592 async fn acknowledge_archived_segment_header(
593 &self,
594 _segment_index: SegmentIndex,
595 ) -> anyhow::Result<()> {
596 Ok(())
598 }
599
600 async fn update_shard_membership_info(
601 &self,
602 _info: FarmerShardMembershipInfo,
603 ) -> anyhow::Result<()> {
604 Ok(())
606 }
607}
608
/// Runs the controller services concurrently and returns as soon as any one of them exits,
/// with that service's result.
///
/// Every instance runs the request responders (farmer app info, segment headers, cache
/// lookups and piece retrieval). When `primary_instance` is `true`, the slot info, block
/// sealing and archived segment header broadcasters as well as the solution response and
/// block seal forwarders are run too.
///
/// `instance` is the name of this controller instance; it is attached to slot info
/// broadcasts and used for the notification subscriptions.
pub async fn controller_service<NC, PG>(
    nats_client: &NatsClient,
    node_client: &NC,
    piece_getter: &PG,
    farmer_caches: &[(&str, &FarmerCache)],
    instance: &str,
    primary_instance: bool,
) -> anyhow::Result<()>
where
    NC: NodeClient,
    PG: PieceGetter + Sync,
{
    if primary_instance {
        // Broadcasters and forwarders in addition to the responders
        select! {
            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
                result
            },
            result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
                result
            },
            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
                result
            },
            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
                result
            },
            result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
                result
            },
            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
                result
            },
            result = segment_headers_responder(nats_client, node_client).fuse() => {
                result
            },
            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
                result
            },
            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
                result
            },
            result = piece_responder(nats_client, piece_getter).fuse() => {
                result
            },
            result = pieces_responder(nats_client, piece_getter).fuse() => {
                result
            },
        }
    } else {
        // Responders only
        select! {
            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
                result
            },
            result = segment_headers_responder(nats_client, node_client).fuse() => {
                result
            },
            result = find_piece_responder(nats_client, farmer_caches).fuse() => {
                result
            },
            result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
                result
            },
            result = piece_responder(nats_client, piece_getter).fuse() => {
                result
            },
            result = pieces_responder(nats_client, piece_getter).fuse() => {
                result
            },
        }
    }
}
685
686async fn slot_info_broadcaster<NC>(
687 nats_client: &NatsClient,
688 node_client: &NC,
689 instance: &str,
690) -> anyhow::Result<()>
691where
692 NC: NodeClient,
693{
694 let mut slot_info_notifications = node_client
695 .subscribe_slot_info()
696 .await
697 .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
698
699 while let Some(slot_info) = slot_info_notifications.next().await {
700 debug!(?slot_info, "New slot");
701
702 let slot = slot_info.slot;
703
704 if let Err(error) = nats_client
705 .broadcast(
706 &ClusterControllerSlotInfoBroadcast {
707 slot_info,
708 instance: instance.to_string(),
709 },
710 instance,
711 )
712 .await
713 {
714 warn!(%slot, %error, "Failed to broadcast slot info");
715 }
716 }
717
718 Ok(())
719}
720
721async fn block_sealing_broadcaster<NC>(
722 nats_client: &NatsClient,
723 node_client: &NC,
724 instance: &str,
725) -> anyhow::Result<()>
726where
727 NC: NodeClient,
728{
729 let mut block_sealing_subscription = node_client
730 .subscribe_block_sealing()
731 .await
732 .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
733
734 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
735 trace!(?block_sealing_info, "New block sealing notification");
736
737 if let Err(error) = nats_client
738 .broadcast(
739 &ClusterControllerBlockSealingBroadcast { block_sealing_info },
740 instance,
741 )
742 .await
743 {
744 warn!(%error, "Failed to broadcast block sealing info");
745 }
746 }
747
748 Ok(())
749}
750
751async fn archived_segment_headers_broadcaster<NC>(
752 nats_client: &NatsClient,
753 node_client: &NC,
754 instance: &str,
755) -> anyhow::Result<()>
756where
757 NC: NodeClient,
758{
759 let mut archived_segments_notifications = node_client
760 .subscribe_archived_segment_headers()
761 .await
762 .map_err(|error| {
763 anyhow!("Failed to subscribe to archived segment header notifications: {error}")
764 })?;
765
766 while let Some(archived_segment_header) = archived_segments_notifications.next().await {
767 trace!(
768 ?archived_segment_header,
769 "New archived archived segment header notification"
770 );
771
772 node_client
773 .acknowledge_archived_segment_header(archived_segment_header.segment_index())
774 .await
775 .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
776
777 if let Err(error) = nats_client
778 .broadcast(
779 &ClusterControllerArchivedSegmentHeaderBroadcast {
780 archived_segment_header,
781 },
782 instance,
783 )
784 .await
785 {
786 warn!(%error, "Failed to broadcast archived segment header info");
787 }
788 }
789
790 Ok(())
791}
792
793async fn solution_response_forwarder<NC>(
794 nats_client: &NatsClient,
795 node_client: &NC,
796 instance: &str,
797) -> anyhow::Result<()>
798where
799 NC: NodeClient,
800{
801 let mut subscription = nats_client
802 .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
803 Some(instance),
804 Some(instance.to_string()),
805 )
806 .await
807 .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
808
809 while let Some(notification) = subscription.next().await {
810 debug!(?notification, "Solution notification");
811
812 let slot = notification.solution_response.slot_number;
813 let public_key_hash = notification.solution_response.solution.public_key_hash;
814 let sector_index = notification.solution_response.solution.sector_index;
815
816 if let Err(error) = node_client
817 .submit_solution_response(notification.solution_response)
818 .await
819 {
820 warn!(
821 %error,
822 %slot,
823 %public_key_hash,
824 %sector_index,
825 "Failed to send solution response"
826 );
827 }
828 }
829
830 Ok(())
831}
832
833async fn block_seal_forwarder<NC>(
834 nats_client: &NatsClient,
835 node_client: &NC,
836 instance: &str,
837) -> anyhow::Result<()>
838where
839 NC: NodeClient,
840{
841 let mut subscription = nats_client
842 .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
843 None,
844 Some(instance.to_string()),
845 )
846 .await
847 .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
848
849 while let Some(notification) = subscription.next().await {
850 debug!(?notification, "Block seal notification");
851
852 if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
853 warn!(%error, "Failed to send block seal");
854 }
855 }
856
857 Ok(())
858}
859
860async fn farmer_app_info_responder<NC>(
861 nats_client: &NatsClient,
862 node_client: &NC,
863) -> anyhow::Result<()>
864where
865 NC: NodeClient,
866{
867 nats_client
868 .request_responder(
869 None,
870 Some("ab.controller".to_string()),
871 |_: ClusterControllerFarmerAppInfoRequest| async move {
872 Some(
873 node_client
874 .farmer_app_info()
875 .await
876 .map_err(|error| error.to_string()),
877 )
878 },
879 )
880 .await
881}
882
883async fn segment_headers_responder<NC>(
884 nats_client: &NatsClient,
885 node_client: &NC,
886) -> anyhow::Result<()>
887where
888 NC: NodeClient,
889{
890 nats_client
891 .request_responder(
892 None,
893 Some("ab.controller".to_string()),
894 |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
895 node_client
896 .segment_headers(segment_indices.clone())
897 .await
898 .inspect_err(|error| {
899 warn!(%error, ?segment_indices, "Failed to get segment headers");
900 })
901 .ok()
902 },
903 )
904 .await
905}
906
907async fn find_piece_responder(
908 nats_client: &NatsClient,
909 farmer_caches: &[(&str, &FarmerCache)],
910) -> anyhow::Result<()> {
911 futures::future::try_join(
912 farmer_caches
913 .iter()
914 .map(|(cache_group, farmer_cache)| {
915 nats_client.request_responder(
916 Some(cache_group),
917 Some("ab.controller".to_string()),
918 move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
919 Some(farmer_cache.find_piece(piece_index).await)
920 },
921 )
922 })
923 .collect::<FuturesUnordered<_>>()
924 .next()
925 .map(|result| result.unwrap_or(Ok(()))),
926 nats_client.request_responder(
927 Some(GLOBAL_CACHE_GROUP),
928 Some("ab.controller".to_string()),
929 |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
930 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
931 Some(farmer_cache.find_piece(piece_index).await)
932 },
933 ),
934 )
935 .await
936 .map(|((), ())| ())
937}
938
939async fn find_pieces_responder(
940 nats_client: &NatsClient,
941 farmer_caches: &[(&str, &FarmerCache)],
942) -> anyhow::Result<()> {
943 futures::future::try_join(
944 farmer_caches
945 .iter()
946 .map(|(cache_group, farmer_cache)| {
947 nats_client.stream_request_responder(
948 Some(cache_group),
949 Some("ab.controller".to_string()),
950 move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
951 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
952 },
953 )
954 })
955 .collect::<FuturesUnordered<_>>()
956 .next()
957 .map(|result| result.unwrap_or(Ok(()))),
958 nats_client.stream_request_responder(
959 Some(GLOBAL_CACHE_GROUP),
960 Some("ab.controller".to_string()),
961 |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
962 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
963 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
964 },
965 ),
966 )
967 .await
968 .map(|((), ())| ())
969}
970
971async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
972where
973 PG: PieceGetter + Sync,
974{
975 nats_client
976 .request_responder(
977 None,
978 Some("ab.controller".to_string()),
979 |ClusterControllerPieceRequest { piece_index }| async move {
980 piece_getter
981 .get_piece(piece_index)
982 .await
983 .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
984 .ok()
985 },
986 )
987 .await
988}
989
990async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
991where
992 PG: PieceGetter + Sync,
993{
994 nats_client
995 .stream_request_responder(
996 None,
997 Some("ab.controller".to_string()),
998 |ClusterControllerPiecesRequest { piece_indices }| async move {
999 piece_getter
1000 .get_pieces(piece_indices)
1001 .await
1002 .map(|stream| {
1003 Box::pin(stream.filter_map(
1004 |(piece_index, maybe_piece_result)| async move {
1005 match maybe_piece_result {
1006 Ok(Some(piece)) => Some((piece_index, piece)),
1007 Ok(None) => None,
1008 Err(error) => {
1009 warn!(%error, %piece_index, "Failed to get piece");
1010 None
1011 }
1012 }
1013 },
1014 ))
1015 })
1016 .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1017 .ok()
1018 },
1019 )
1020 .await
1021}