1pub mod caches;
10pub mod farms;
11mod stream_map;
12
13use crate::cluster::cache::{ClusterCacheReadPieceRequest, ClusterCacheReadPiecesRequest};
14use crate::cluster::nats_client::{
15 GenericBroadcast, GenericNotification, GenericRequest, GenericStreamRequest, NatsClient,
16};
17use crate::farm::{PieceCacheId, PieceCacheOffset};
18use crate::farmer_cache::FarmerCache;
19use crate::node_client::NodeClient;
20use ab_core_primitives::pieces::{Piece, PieceIndex};
21use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
22use ab_data_retrieval::piece_getter::PieceGetter;
23use ab_farmer_rpc_primitives::{
24 BlockSealInfo, BlockSealResponse, FarmerAppInfo, SlotInfo, SolutionResponse,
25};
26use anyhow::anyhow;
27use async_nats::HeaderValue;
28use async_trait::async_trait;
29use futures::channel::mpsc;
30use futures::future::FusedFuture;
31use futures::stream::FuturesUnordered;
32use futures::{FutureExt, Stream, StreamExt, select, stream};
33use parity_scale_codec::{Decode, Encode};
34use parking_lot::Mutex;
35use rand::prelude::*;
36use std::collections::{HashMap, HashSet};
37use std::pin::Pin;
38use std::sync::Arc;
39use std::task::Poll;
40use tracing::{debug, error, trace, warn};
41
42const GLOBAL_CACHE_GROUP: &str = "_";
45
46#[derive(Debug, Copy, Clone, Encode, Decode)]
48pub struct ClusterControllerFarmerIdentifyBroadcast;
49
50impl GenericBroadcast for ClusterControllerFarmerIdentifyBroadcast {
51 const SUBJECT: &'static str = "ab.controller.farmer-identify";
52}
53
54#[derive(Debug, Copy, Clone, Encode, Decode)]
56pub struct ClusterControllerCacheIdentifyBroadcast;
57
58impl GenericBroadcast for ClusterControllerCacheIdentifyBroadcast {
59 const SUBJECT: &'static str = "ab.controller.*.cache-identify";
61}
62
63#[derive(Debug, Clone, Encode, Decode)]
65struct ClusterControllerSlotInfoBroadcast {
66 slot_info: SlotInfo,
67 instance: String,
68}
69
70impl GenericBroadcast for ClusterControllerSlotInfoBroadcast {
71 const SUBJECT: &'static str = "ab.controller.slot-info";
72
73 fn deterministic_message_id(&self) -> Option<HeaderValue> {
74 Some(HeaderValue::from(format!(
77 "slot-info-{}",
78 self.slot_info.slot_number
79 )))
80 }
81}
82
83#[derive(Debug, Clone, Encode, Decode)]
85struct ClusterControllerBlockSealingBroadcast {
86 block_sealing_info: BlockSealInfo,
87}
88
89impl GenericBroadcast for ClusterControllerBlockSealingBroadcast {
90 const SUBJECT: &'static str = "ab.controller.block-sealing-info";
91}
92
93#[derive(Debug, Clone, Encode, Decode)]
95struct ClusterControllerArchivedSegmentHeaderBroadcast {
96 archived_segment_header: SegmentHeader,
97}
98
99impl GenericBroadcast for ClusterControllerArchivedSegmentHeaderBroadcast {
100 const SUBJECT: &'static str = "ab.controller.archived-segment-header";
101
102 fn deterministic_message_id(&self) -> Option<HeaderValue> {
103 Some(HeaderValue::from(format!(
106 "archived-segment-{}",
107 self.archived_segment_header.segment_index
108 )))
109 }
110}
111
112#[derive(Debug, Clone, Encode, Decode)]
114struct ClusterControllerSolutionNotification {
115 solution_response: SolutionResponse,
116}
117
118impl GenericNotification for ClusterControllerSolutionNotification {
119 const SUBJECT: &'static str = "ab.controller.*.solution";
120}
121
122#[derive(Debug, Clone, Encode, Decode)]
124struct ClusterControllerBlockSealNotification {
125 block_seal: BlockSealResponse,
126}
127
128impl GenericNotification for ClusterControllerBlockSealNotification {
129 const SUBJECT: &'static str = "ab.controller.block-seal";
130}
131
132#[derive(Debug, Clone, Encode, Decode)]
134struct ClusterControllerFarmerAppInfoRequest;
135
136impl GenericRequest for ClusterControllerFarmerAppInfoRequest {
137 const SUBJECT: &'static str = "ab.controller.farmer-app-info";
138 type Response = Result<FarmerAppInfo, String>;
139}
140
141#[derive(Debug, Clone, Encode, Decode)]
143struct ClusterControllerSegmentHeadersRequest {
144 segment_indices: Vec<SegmentIndex>,
145}
146
147impl GenericRequest for ClusterControllerSegmentHeadersRequest {
148 const SUBJECT: &'static str = "ab.controller.segment-headers";
149 type Response = Vec<Option<SegmentHeader>>;
150}
151
152#[derive(Debug, Clone, Encode, Decode)]
154struct ClusterControllerFindPieceInCacheRequest {
155 piece_index: PieceIndex,
156}
157
158impl GenericRequest for ClusterControllerFindPieceInCacheRequest {
159 const SUBJECT: &'static str = "ab.controller.*.find-piece-in-cache";
160 type Response = Option<(PieceCacheId, PieceCacheOffset)>;
161}
162
163#[derive(Debug, Clone, Encode, Decode)]
165struct ClusterControllerFindPiecesInCacheRequest {
166 piece_indices: Vec<PieceIndex>,
167}
168
169impl GenericStreamRequest for ClusterControllerFindPiecesInCacheRequest {
170 const SUBJECT: &'static str = "ab.controller.*.find-pieces-in-cache";
171 type Response = (PieceIndex, PieceCacheId, PieceCacheOffset);
173}
174
175#[derive(Debug, Clone, Encode, Decode)]
177struct ClusterControllerPieceRequest {
178 piece_index: PieceIndex,
179}
180
181impl GenericRequest for ClusterControllerPieceRequest {
182 const SUBJECT: &'static str = "ab.controller.piece";
183 type Response = Option<Piece>;
184}
185
186#[derive(Debug, Clone, Encode, Decode)]
188struct ClusterControllerPiecesRequest {
189 piece_indices: Vec<PieceIndex>,
190}
191
192impl GenericStreamRequest for ClusterControllerPiecesRequest {
193 const SUBJECT: &'static str = "ab.controller.pieces";
194 type Response = (PieceIndex, Piece);
196}
197
198#[derive(Debug, Clone)]
200pub struct ClusterPieceGetter {
201 nats_client: NatsClient,
202 cache_group: String,
203}
204
205#[async_trait]
206impl PieceGetter for ClusterPieceGetter {
207 async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
208 if let Some((piece_cache_id, piece_cache_offset)) = self
209 .nats_client
210 .request(
211 &ClusterControllerFindPieceInCacheRequest { piece_index },
212 Some(&self.cache_group),
213 )
214 .await?
215 {
216 trace!(
217 %piece_index,
218 %piece_cache_id,
219 %piece_cache_offset,
220 "Found piece in cache, retrieving"
221 );
222
223 match self
224 .nats_client
225 .request(
226 &ClusterCacheReadPieceRequest {
227 offset: piece_cache_offset,
228 },
229 Some(&piece_cache_id.to_string()),
230 )
231 .await
232 .map_err(|error| error.to_string())
233 .flatten()
234 {
235 Ok(Some((retrieved_piece_index, piece))) => {
236 if retrieved_piece_index == piece_index {
237 trace!(
238 %piece_index,
239 %piece_cache_id,
240 %piece_cache_offset,
241 "Retrieved piece from cache successfully"
242 );
243
244 return Ok(Some(piece));
245 } else {
246 trace!(
247 %piece_index,
248 %piece_cache_id,
249 %piece_cache_offset,
250 "Retrieving piece was replaced in cache during retrieval"
251 );
252 }
253 }
254 Ok(None) => {
255 trace!(
256 %piece_index,
257 %piece_cache_id,
258 %piece_cache_offset,
259 "Piece cache didn't have piece at offset"
260 );
261 }
262 Err(error) => {
263 debug!(
264 %piece_index,
265 %piece_cache_id,
266 %piece_cache_offset,
267 %error,
268 "Retrieving piece from cache failed"
269 );
270 }
271 }
272 } else {
273 trace!(%piece_index, "Piece not found in cache");
274 }
275
276 Ok(self
277 .nats_client
278 .request(&ClusterControllerPieceRequest { piece_index }, None)
279 .await?)
280 }
281
282 async fn get_pieces<'a>(
283 &'a self,
284 piece_indices: Vec<PieceIndex>,
285 ) -> anyhow::Result<
286 Box<dyn Stream<Item = (PieceIndex, anyhow::Result<Option<Piece>>)> + Send + Unpin + 'a>,
287 > {
288 let (tx, mut rx) = mpsc::unbounded();
289
290 let piece_indices_to_get =
291 Mutex::new(piece_indices.iter().copied().collect::<HashSet<_>>());
292
293 let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();
294
295 {
296 let mut cached_pieces = self
297 .nats_client
298 .stream_request(
299 &ClusterControllerFindPiecesInCacheRequest { piece_indices },
300 Some(&self.cache_group),
301 )
302 .await?;
303
304 while let Some((_piece_index, piece_cache_id, piece_cache_offset)) =
305 cached_pieces.next().await
306 {
307 cached_pieces_by_cache_id
308 .entry(piece_cache_id)
309 .or_default()
310 .push(piece_cache_offset);
311 }
312 }
313
314 let fut = async move {
315 let tx = &tx;
316
317 cached_pieces_by_cache_id
318 .into_iter()
319 .map(|(piece_cache_id, offsets)| {
320 let piece_indices_to_get = &piece_indices_to_get;
321
322 async move {
323 let mut pieces_stream = match self
324 .nats_client
325 .stream_request(
326 &ClusterCacheReadPiecesRequest { offsets },
327 Some(&piece_cache_id.to_string()),
328 )
329 .await
330 {
331 Ok(pieces) => pieces,
332 Err(error) => {
333 warn!(
334 %error,
335 %piece_cache_id,
336 "Failed to request pieces from cache"
337 );
338
339 return;
340 }
341 };
342
343 while let Some(piece_result) = pieces_stream.next().await {
344 let (piece_offset, maybe_piece) = match piece_result {
345 Ok(result) => result,
346 Err(error) => {
347 warn!(%error, "Failed to get piece from cache");
348 continue;
349 }
350 };
351
352 if let Some((piece_index, piece)) = maybe_piece {
353 piece_indices_to_get.lock().remove(&piece_index);
354
355 tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
356 "This future isn't polled after receiver is dropped; qed",
357 );
358 } else {
359 warn!(
360 %piece_cache_id,
361 %piece_offset,
362 "Failed to get piece from cache, it was missing or already gone"
363 );
364 }
365 }
366 }
367 })
368 .collect::<FuturesUnordered<_>>()
369 .for_each(|()| async {})
371 .await;
372
373 let mut piece_indices_to_get = piece_indices_to_get.into_inner();
374 if piece_indices_to_get.is_empty() {
375 return;
376 }
377
378 let mut pieces_from_controller = match self
379 .nats_client
380 .stream_request(
381 &ClusterControllerPiecesRequest {
382 piece_indices: piece_indices_to_get.iter().copied().collect(),
383 },
384 None,
385 )
386 .await
387 {
388 Ok(pieces_from_controller) => pieces_from_controller,
389 Err(error) => {
390 error!(%error, "Failed to get pieces from controller");
391
392 for piece_index in piece_indices_to_get {
393 tx.unbounded_send((
394 piece_index,
395 Err(anyhow::anyhow!("Failed to get piece from controller")),
396 ))
397 .expect("This future isn't polled after receiver is dropped; qed");
398 }
399 return;
400 }
401 };
402
403 while let Some((piece_index, piece)) = pieces_from_controller.next().await {
404 piece_indices_to_get.remove(&piece_index);
405 tx.unbounded_send((piece_index, Ok(Some(piece))))
406 .expect("This future isn't polled after receiver is dropped; qed");
407 }
408
409 for piece_index in piece_indices_to_get {
410 tx.unbounded_send((piece_index, Err(anyhow::anyhow!("Failed to get piece"))))
411 .expect("This future isn't polled after receiver is dropped; qed");
412 }
413 };
414 let mut fut = Box::pin(fut.fuse());
415
416 Ok(Box::new(stream::poll_fn(move |cx| {
418 if !fut.is_terminated() {
419 let _ = fut.poll_unpin(cx);
421 }
422
423 if let Poll::Ready(maybe_result) = rx.poll_next_unpin(cx) {
424 return Poll::Ready(maybe_result);
425 }
426
427 Poll::Pending
429 })))
430 }
431}
432
433impl ClusterPieceGetter {
434 #[inline]
436 pub fn new(nats_client: NatsClient, cache_group: Option<String>) -> Self {
437 Self {
438 nats_client,
439 cache_group: cache_group.unwrap_or_else(|| GLOBAL_CACHE_GROUP.to_string()),
440 }
441 }
442}
443
444#[derive(Debug, Clone)]
447pub struct ClusterNodeClient {
448 nats_client: NatsClient,
449 last_slot_info_instance: Arc<Mutex<String>>,
452}
453
454impl ClusterNodeClient {
455 pub fn new(nats_client: NatsClient) -> Self {
457 Self {
458 nats_client,
459 last_slot_info_instance: Arc::default(),
460 }
461 }
462}
463
464#[async_trait]
465impl NodeClient for ClusterNodeClient {
466 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
467 Ok(self
468 .nats_client
469 .request(&ClusterControllerFarmerAppInfoRequest, None)
470 .await?
471 .map_err(anyhow::Error::msg)?)
472 }
473
474 async fn subscribe_slot_info(
475 &self,
476 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
477 let subscription = self
478 .nats_client
479 .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
480 .await?
481 .filter_map({
482 let mut last_slot_number = None;
483 let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
484
485 move |broadcast| {
486 let slot_info = broadcast.slot_info;
487
488 let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
489 && last_slot_number >= slot_info.slot_number
490 {
491 None
492 } else {
493 last_slot_number.replace(slot_info.slot_number);
494 *last_slot_info_instance.lock() = broadcast.instance;
495
496 Some(slot_info)
497 };
498
499 async move { maybe_slot_info }
500 }
501 });
502
503 Ok(Box::pin(subscription))
504 }
505
506 async fn submit_solution_response(
507 &self,
508 solution_response: SolutionResponse,
509 ) -> anyhow::Result<()> {
510 let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
511 Ok(self
512 .nats_client
513 .notification(
514 &ClusterControllerSolutionNotification { solution_response },
515 Some(&last_slot_info_instance),
516 )
517 .await?)
518 }
519
520 async fn subscribe_block_sealing(
521 &self,
522 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
523 let subscription = self
524 .nats_client
525 .subscribe_to_broadcasts::<ClusterControllerBlockSealingBroadcast>(None, None)
526 .await?
527 .map(|broadcast| broadcast.block_sealing_info);
528
529 Ok(Box::pin(subscription))
530 }
531
532 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
533 Ok(self
534 .nats_client
535 .notification(&ClusterControllerBlockSealNotification { block_seal }, None)
536 .await?)
537 }
538
539 async fn subscribe_archived_segment_headers(
540 &self,
541 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
542 let subscription = self
543 .nats_client
544 .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
545 .await?
546 .filter_map({
547 let mut last_archived_segment_index = None;
548
549 move |broadcast| {
550 let archived_segment_header = broadcast.archived_segment_header;
551 let segment_index = archived_segment_header.segment_index();
552
553 let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
554 last_archived_segment_index
555 && last_archived_segment_index >= segment_index
556 {
557 None
558 } else {
559 last_archived_segment_index.replace(segment_index);
560
561 Some(archived_segment_header)
562 };
563
564 async move { maybe_archived_segment_header }
565 }
566 });
567
568 Ok(Box::pin(subscription))
569 }
570
571 async fn segment_headers(
572 &self,
573 segment_indices: Vec<SegmentIndex>,
574 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
575 Ok(self
576 .nats_client
577 .request(
578 &ClusterControllerSegmentHeadersRequest { segment_indices },
579 None,
580 )
581 .await?)
582 }
583
584 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
585 Ok(self
586 .nats_client
587 .request(&ClusterControllerPieceRequest { piece_index }, None)
588 .await?)
589 }
590
591 async fn acknowledge_archived_segment_header(
592 &self,
593 _segment_index: SegmentIndex,
594 ) -> anyhow::Result<()> {
595 Ok(())
597 }
598}
599
600pub async fn controller_service<NC, PG>(
606 nats_client: &NatsClient,
607 node_client: &NC,
608 piece_getter: &PG,
609 farmer_caches: &[(&str, &FarmerCache)],
610 instance: &str,
611 primary_instance: bool,
612) -> anyhow::Result<()>
613where
614 NC: NodeClient,
615 PG: PieceGetter + Sync,
616{
617 if primary_instance {
618 select! {
619 result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
620 result
621 },
622 result = block_sealing_broadcaster(nats_client, node_client, instance).fuse() => {
623 result
624 },
625 result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
626 result
627 },
628 result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
629 result
630 },
631 result = block_seal_forwarder(nats_client, node_client, instance).fuse() => {
632 result
633 },
634 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
635 result
636 },
637 result = segment_headers_responder(nats_client, node_client).fuse() => {
638 result
639 },
640 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
641 result
642 },
643 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
644 result
645 },
646 result = piece_responder(nats_client, piece_getter).fuse() => {
647 result
648 },
649 result = pieces_responder(nats_client, piece_getter).fuse() => {
650 result
651 },
652 }
653 } else {
654 select! {
655 result = farmer_app_info_responder(nats_client, node_client).fuse() => {
656 result
657 },
658 result = segment_headers_responder(nats_client, node_client).fuse() => {
659 result
660 },
661 result = find_piece_responder(nats_client, farmer_caches).fuse() => {
662 result
663 },
664 result = find_pieces_responder(nats_client, farmer_caches).fuse() => {
665 result
666 },
667 result = piece_responder(nats_client, piece_getter).fuse() => {
668 result
669 },
670 result = pieces_responder(nats_client, piece_getter).fuse() => {
671 result
672 },
673 }
674 }
675}
676
677async fn slot_info_broadcaster<NC>(
678 nats_client: &NatsClient,
679 node_client: &NC,
680 instance: &str,
681) -> anyhow::Result<()>
682where
683 NC: NodeClient,
684{
685 let mut slot_info_notifications = node_client
686 .subscribe_slot_info()
687 .await
688 .map_err(|error| anyhow!("Failed to subscribe to slot info notifications: {error}"))?;
689
690 while let Some(slot_info) = slot_info_notifications.next().await {
691 debug!(?slot_info, "New slot");
692
693 let slot = slot_info.slot_number;
694
695 if let Err(error) = nats_client
696 .broadcast(
697 &ClusterControllerSlotInfoBroadcast {
698 slot_info,
699 instance: instance.to_string(),
700 },
701 instance,
702 )
703 .await
704 {
705 warn!(%slot, %error, "Failed to broadcast slot info");
706 }
707 }
708
709 Ok(())
710}
711
712async fn block_sealing_broadcaster<NC>(
713 nats_client: &NatsClient,
714 node_client: &NC,
715 instance: &str,
716) -> anyhow::Result<()>
717where
718 NC: NodeClient,
719{
720 let mut block_sealing_subscription = node_client
721 .subscribe_block_sealing()
722 .await
723 .map_err(|error| anyhow!("Failed to subscribe to block sealing notifications: {error}"))?;
724
725 while let Some(block_sealing_info) = block_sealing_subscription.next().await {
726 trace!(?block_sealing_info, "New block sealing notification");
727
728 if let Err(error) = nats_client
729 .broadcast(
730 &ClusterControllerBlockSealingBroadcast { block_sealing_info },
731 instance,
732 )
733 .await
734 {
735 warn!(%error, "Failed to broadcast block sealing info");
736 }
737 }
738
739 Ok(())
740}
741
742async fn archived_segment_headers_broadcaster<NC>(
743 nats_client: &NatsClient,
744 node_client: &NC,
745 instance: &str,
746) -> anyhow::Result<()>
747where
748 NC: NodeClient,
749{
750 let mut archived_segments_notifications = node_client
751 .subscribe_archived_segment_headers()
752 .await
753 .map_err(|error| {
754 anyhow!("Failed to subscribe to archived segment header notifications: {error}")
755 })?;
756
757 while let Some(archived_segment_header) = archived_segments_notifications.next().await {
758 trace!(
759 ?archived_segment_header,
760 "New archived archived segment header notification"
761 );
762
763 node_client
764 .acknowledge_archived_segment_header(archived_segment_header.segment_index())
765 .await
766 .map_err(|error| anyhow!("Failed to acknowledge archived segment header: {error}"))?;
767
768 if let Err(error) = nats_client
769 .broadcast(
770 &ClusterControllerArchivedSegmentHeaderBroadcast {
771 archived_segment_header,
772 },
773 instance,
774 )
775 .await
776 {
777 warn!(%error, "Failed to broadcast archived segment header info");
778 }
779 }
780
781 Ok(())
782}
783
784async fn solution_response_forwarder<NC>(
785 nats_client: &NatsClient,
786 node_client: &NC,
787 instance: &str,
788) -> anyhow::Result<()>
789where
790 NC: NodeClient,
791{
792 let mut subscription = nats_client
793 .subscribe_to_notifications::<ClusterControllerSolutionNotification>(
794 Some(instance),
795 Some(instance.to_string()),
796 )
797 .await
798 .map_err(|error| anyhow!("Failed to subscribe to solution notifications: {error}"))?;
799
800 while let Some(notification) = subscription.next().await {
801 debug!(?notification, "Solution notification");
802
803 let slot = notification.solution_response.slot_number;
804 let public_key_hash = notification.solution_response.solution.public_key_hash;
805 let sector_index = notification.solution_response.solution.sector_index;
806
807 if let Err(error) = node_client
808 .submit_solution_response(notification.solution_response)
809 .await
810 {
811 warn!(
812 %error,
813 %slot,
814 %public_key_hash,
815 %sector_index,
816 "Failed to send solution response"
817 );
818 }
819 }
820
821 Ok(())
822}
823
824async fn block_seal_forwarder<NC>(
825 nats_client: &NatsClient,
826 node_client: &NC,
827 instance: &str,
828) -> anyhow::Result<()>
829where
830 NC: NodeClient,
831{
832 let mut subscription = nats_client
833 .subscribe_to_notifications::<ClusterControllerBlockSealNotification>(
834 None,
835 Some(instance.to_string()),
836 )
837 .await
838 .map_err(|error| anyhow!("Failed to subscribe to block seal notifications: {error}"))?;
839
840 while let Some(notification) = subscription.next().await {
841 debug!(?notification, "Block seal notification");
842
843 if let Err(error) = node_client.submit_block_seal(notification.block_seal).await {
844 warn!(%error, "Failed to send block seal");
845 }
846 }
847
848 Ok(())
849}
850
851async fn farmer_app_info_responder<NC>(
852 nats_client: &NatsClient,
853 node_client: &NC,
854) -> anyhow::Result<()>
855where
856 NC: NodeClient,
857{
858 nats_client
859 .request_responder(
860 None,
861 Some("ab.controller".to_string()),
862 |_: ClusterControllerFarmerAppInfoRequest| async move {
863 Some(
864 node_client
865 .farmer_app_info()
866 .await
867 .map_err(|error| error.to_string()),
868 )
869 },
870 )
871 .await
872}
873
874async fn segment_headers_responder<NC>(
875 nats_client: &NatsClient,
876 node_client: &NC,
877) -> anyhow::Result<()>
878where
879 NC: NodeClient,
880{
881 nats_client
882 .request_responder(
883 None,
884 Some("ab.controller".to_string()),
885 |ClusterControllerSegmentHeadersRequest { segment_indices }| async move {
886 node_client
887 .segment_headers(segment_indices.clone())
888 .await
889 .inspect_err(|error| {
890 warn!(%error, ?segment_indices, "Failed to get segment headers");
891 })
892 .ok()
893 },
894 )
895 .await
896}
897
898async fn find_piece_responder(
899 nats_client: &NatsClient,
900 farmer_caches: &[(&str, &FarmerCache)],
901) -> anyhow::Result<()> {
902 futures::future::try_join(
903 farmer_caches
904 .iter()
905 .map(|(cache_group, farmer_cache)| {
906 nats_client.request_responder(
907 Some(cache_group),
908 Some("ab.controller".to_string()),
909 move |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
910 Some(farmer_cache.find_piece(piece_index).await)
911 },
912 )
913 })
914 .collect::<FuturesUnordered<_>>()
915 .next()
916 .map(|result| result.unwrap_or(Ok(()))),
917 nats_client.request_responder(
918 Some(GLOBAL_CACHE_GROUP),
919 Some("ab.controller".to_string()),
920 |ClusterControllerFindPieceInCacheRequest { piece_index }| async move {
921 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
922 Some(farmer_cache.find_piece(piece_index).await)
923 },
924 ),
925 )
926 .await
927 .map(|((), ())| ())
928}
929
930async fn find_pieces_responder(
931 nats_client: &NatsClient,
932 farmer_caches: &[(&str, &FarmerCache)],
933) -> anyhow::Result<()> {
934 futures::future::try_join(
935 farmer_caches
936 .iter()
937 .map(|(cache_group, farmer_cache)| {
938 nats_client.stream_request_responder(
939 Some(cache_group),
940 Some("ab.controller".to_string()),
941 move |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
942 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
943 },
944 )
945 })
946 .collect::<FuturesUnordered<_>>()
947 .next()
948 .map(|result| result.unwrap_or(Ok(()))),
949 nats_client.stream_request_responder(
950 Some(GLOBAL_CACHE_GROUP),
951 Some("ab.controller".to_string()),
952 |ClusterControllerFindPiecesInCacheRequest { piece_indices }| async move {
953 let (_cache_group, farmer_cache) = farmer_caches.iter().choose(&mut rand::rng())?;
954 Some(stream::iter(farmer_cache.find_pieces(piece_indices).await))
955 },
956 ),
957 )
958 .await
959 .map(|((), ())| ())
960}
961
962async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
963where
964 PG: PieceGetter + Sync,
965{
966 nats_client
967 .request_responder(
968 None,
969 Some("ab.controller".to_string()),
970 |ClusterControllerPieceRequest { piece_index }| async move {
971 piece_getter
972 .get_piece(piece_index)
973 .await
974 .inspect_err(|error| warn!(%error, %piece_index, "Failed to get piece"))
975 .ok()
976 },
977 )
978 .await
979}
980
981async fn pieces_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
982where
983 PG: PieceGetter + Sync,
984{
985 nats_client
986 .stream_request_responder(
987 None,
988 Some("ab.controller".to_string()),
989 |ClusterControllerPiecesRequest { piece_indices }| async move {
990 piece_getter
991 .get_pieces(piece_indices)
992 .await
993 .map(|stream| {
994 Box::pin(stream.filter_map(
995 |(piece_index, maybe_piece_result)| async move {
996 match maybe_piece_result {
997 Ok(Some(piece)) => Some((piece_index, piece)),
998 Ok(None) => None,
999 Err(error) => {
1000 warn!(%error, %piece_index, "Failed to get piece");
1001 None
1002 }
1003 }
1004 },
1005 ))
1006 })
1007 .inspect_err(|error| warn!(%error, "Failed to get pieces"))
1008 .ok()
1009 },
1010 )
1011 .await
1012}