1use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use anyhow::anyhow;
17use async_trait::async_trait;
18use derive_more::{Display, From};
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, Stream, StreamExt, select, stream};
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use std::collections::BTreeSet;
23use std::pin::Pin;
24use std::task::Poll;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{Instrument, debug, error, info, info_span, trace, warn};
28use ulid::Ulid;
29
/// Minimum amount of time that has to pass since the last identification before another identify
/// broadcast request is answered; requests arriving sooner are skipped
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_millis(1_000);
31
/// Identifier of a cluster cache.
///
/// `Display` and `From<Ulid>` implementations are derived.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub enum ClusterCacheId {
    /// Identifier backed by a ULID
    Ulid(Ulid),
}
38
39impl Encode for ClusterCacheId {
40 #[inline]
41 fn size_hint(&self) -> usize {
42 1_usize
43 + match self {
44 ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45 }
46 }
47
48 #[inline]
49 fn encode_to<O>(&self, dest: &mut O)
50 where
51 O: Output + ?Sized,
52 {
53 match self {
54 ClusterCacheId::Ulid(ulid) => {
55 dest.push_byte(0);
56 Encode::encode_to(&ulid.0, dest);
57 }
58 }
59 }
60}
61
// Marker implementation without any methods of its own.
// NOTE(review): presumably needed so `ClusterCacheId` satisfies `EncodeLike` bounds of
// `parity_scale_codec` APIs — confirm against the crate documentation
impl EncodeLike for ClusterCacheId {}
63
64impl Decode for ClusterCacheId {
65 #[inline]
66 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
67 where
68 I: Input,
69 {
70 match input.read_byte().map_err(|e| {
71 e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
72 })? {
73 0 => u128::decode(input)
74 .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
75 .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
76 _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
77 }
78 }
79}
80
81#[expect(clippy::new_without_default)]
82impl ClusterCacheId {
83 #[inline]
85 pub fn new() -> Self {
86 Self::Ulid(Ulid::new())
87 }
88}
89
/// Broadcast with which a cluster cache identifies itself
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheIdentifyBroadcast {
    /// ID of the cluster cache that sent the broadcast
    pub cluster_cache_id: ClusterCacheId,
}

impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
    /// NOTE(review): `*` is presumably substituted with the cache group, which is what
    /// `send_identify_broadcast()` passes alongside this message — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.cache-identify";
}
101
/// Stream request for details of piece caches, answered with [`ClusterPieceCacheDetails`] items
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheDetailsRequest;

impl GenericStreamRequest for ClusterCacheDetailsRequest {
    /// NOTE(review): `*` is presumably substituted with the cluster cache ID, which is what
    /// `piece_cache_details_responder()` registers the responder with — confirm against
    /// `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.details";
    /// One item is sent per piece cache
    type Response = ClusterPieceCacheDetails;
}
111
/// Details about a single piece cache, sent in response to [`ClusterCacheDetailsRequest`]
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterPieceCacheDetails {
    /// ID of the piece cache
    pub piece_cache_id: PieceCacheId,
    /// Maximum number of elements as reported by the cache's `max_num_elements()`
    pub max_num_elements: u32,
}
120
/// Request to write a piece into a piece cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheWritePieceRequest {
    /// Offset in the cache to write at
    offset: PieceCacheOffset,
    /// Index of the piece being written
    piece_index: PieceIndex,
    /// Piece contents
    piece: Piece,
}

impl GenericRequest for ClusterCacheWritePieceRequest {
    /// NOTE(review): `*` is presumably substituted with the piece cache ID string that both
    /// `ClusterPieceCache` and `write_piece_responder()` supply — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.write-piece";
    /// Errors are transferred as strings
    type Response = Result<(), String>;
}
134
/// Request to read the piece index stored at an offset of a piece cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheReadPieceIndexRequest {
    /// Offset in the cache to read from
    offset: PieceCacheOffset,
}

impl GenericRequest for ClusterCacheReadPieceIndexRequest {
    /// NOTE(review): `*` is presumably substituted with the piece cache ID string that both
    /// `ClusterPieceCache` and `read_piece_index_responder()` supply — confirm against
    /// `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-piece-index";
    /// Errors are transferred as strings
    type Response = Result<Option<PieceIndex>, String>;
}
146
/// Request to read the piece stored at an offset of a piece cache
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPieceRequest {
    /// Offset in the cache to read from
    pub(super) offset: PieceCacheOffset,
}

impl GenericRequest for ClusterCacheReadPieceRequest {
    /// NOTE(review): `*` is presumably substituted with the piece cache ID string that both
    /// `ClusterPieceCache` and `read_piece_responder()` supply — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-piece";
    /// Errors are transferred as strings
    type Response = Result<Option<(PieceIndex, Piece)>, String>;
}
158
/// Stream request to read pieces stored at multiple offsets of a piece cache
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPiecesRequest {
    /// Offsets in the cache to read from
    pub(super) offsets: Vec<PieceCacheOffset>,
}

impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
    /// NOTE(review): `*` is presumably substituted with the piece cache ID string that both
    /// `ClusterPieceCache` and `read_pieces_responder()` supply — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-pieces";
    /// Each item carries the offset it belongs to; errors are transferred as strings
    type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
}
170
/// Stream request for the contents of a piece cache
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheContentsRequest;

impl GenericStreamRequest for ClusterCacheContentsRequest {
    /// NOTE(review): `*` is presumably substituted with the piece cache ID string that both
    /// `ClusterPieceCache` and `contents_responder()` supply — confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.contents";
    /// Each item is an offset with the piece index stored there (if any); errors are transferred
    /// as strings
    type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
}
180
/// [`PieceCache`] implementation that forwards every operation as a request through
/// [`NatsClient`]
#[derive(Debug)]
pub struct ClusterPieceCache {
    /// ID of the piece cache requests are addressed to
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, computed once and passed with every request
    piece_cache_id_string: String,
    /// Value returned from `max_num_elements()`
    max_num_elements: u32,
    /// Client used for sending requests
    nats_client: NatsClient,
}
189
190#[async_trait]
191impl PieceCache for ClusterPieceCache {
192 fn id(&self) -> &PieceCacheId {
193 &self.piece_cache_id
194 }
195
196 #[inline]
197 fn max_num_elements(&self) -> u32 {
198 self.max_num_elements
199 }
200
201 async fn contents(
202 &self,
203 ) -> Result<
204 Box<
205 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
206 + Unpin
207 + Send
208 + '_,
209 >,
210 FarmError,
211 > {
212 Ok(Box::new(
213 self.nats_client
214 .stream_request(
215 &ClusterCacheContentsRequest,
216 Some(&self.piece_cache_id_string),
217 )
218 .await?
219 .map(|response| response.map_err(FarmError::from)),
220 ))
221 }
222
223 async fn write_piece(
224 &self,
225 offset: PieceCacheOffset,
226 piece_index: PieceIndex,
227 piece: &Piece,
228 ) -> Result<(), FarmError> {
229 Ok(self
230 .nats_client
231 .request(
232 &ClusterCacheWritePieceRequest {
233 offset,
234 piece_index,
235 piece: piece.clone(),
236 },
237 Some(&self.piece_cache_id_string),
238 )
239 .await??)
240 }
241
242 async fn read_piece_index(
243 &self,
244 offset: PieceCacheOffset,
245 ) -> Result<Option<PieceIndex>, FarmError> {
246 Ok(self
247 .nats_client
248 .request(
249 &ClusterCacheReadPieceIndexRequest { offset },
250 Some(&self.piece_cache_id_string),
251 )
252 .await??)
253 }
254
255 async fn read_piece(
256 &self,
257 offset: PieceCacheOffset,
258 ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
259 Ok(self
260 .nats_client
261 .request(
262 &ClusterCacheReadPieceRequest { offset },
263 Some(&self.piece_cache_id_string),
264 )
265 .await??)
266 }
267
268 async fn read_pieces(
269 &self,
270 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
271 ) -> Result<
272 Box<
273 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
274 + Send
275 + Unpin
276 + '_,
277 >,
278 FarmError,
279 > {
280 let offsets = offsets.collect::<Vec<_>>();
281 let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
282 let mut stream = self
283 .nats_client
284 .stream_request(
285 &ClusterCacheReadPiecesRequest { offsets },
286 Some(&self.piece_cache_id_string),
287 )
288 .await?
289 .map(|response| response.map_err(FarmError::from))
290 .fuse();
291 Ok(Box::new(stream::poll_fn(move |cx| {
292 if !stream.is_done() {
293 match stream.poll_next_unpin(cx) {
294 Poll::Ready(Some(response)) => {
295 return Poll::Ready(Some(response.inspect(|(offset, _)| {
296 offsets_set.remove(offset);
297 })));
298 }
299 Poll::Ready(None) => {
300 }
302 Poll::Pending => {
303 return Poll::Pending;
304 }
305 }
306 }
307
308 match offsets_set.pop_first() {
311 Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
312 None => Poll::Ready(None),
313 }
314 })))
315 }
316}
317
318impl ClusterPieceCache {
319 #[inline]
322 pub fn new(
323 piece_cache_id: PieceCacheId,
324 max_num_elements: u32,
325 nats_client: NatsClient,
326 ) -> ClusterPieceCache {
327 Self {
328 piece_cache_id,
329 piece_cache_id_string: piece_cache_id.to_string(),
330 max_num_elements,
331 nats_client,
332 }
333 }
334}
335
/// Per-cache data shared by the responders of [`cache_service()`]
#[derive(Debug)]
struct CacheDetails<'a, C> {
    /// ID of the piece cache
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, computed once and used when registering responders
    piece_cache_id_string: String,
    /// The cache that requests are served from
    cache: &'a C,
}
342
/// Runs the cache service for `caches`.
///
/// A new [`ClusterCacheId`] is generated on every call. All instances serve write, read and
/// contents requests for each cache; when `primary_instance` is `true`, the service additionally
/// logs created caches, answers identification requests for `cache_group` (also announcing itself
/// every `identification_broadcast_interval`) and answers cache details requests.
///
/// The returned future resolves with the result of whichever responder finishes first.
pub async fn cache_service<C>(
    nats_client: NatsClient,
    caches: &[C],
    cache_group: &str,
    identification_broadcast_interval: Duration,
    primary_instance: bool,
) -> anyhow::Result<()>
where
    C: PieceCache,
{
    let cluster_cache_id = ClusterCacheId::new();
    let cluster_cache_id_string = cluster_cache_id.to_string();

    // Precompute string IDs once so that responders do not have to
    let caches_details = caches
        .iter()
        .map(|cache| {
            let piece_cache_id = *cache.id();

            // Log only on the primary instance
            if primary_instance {
                info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
            }

            CacheDetails {
                piece_cache_id,
                piece_cache_id_string: piece_cache_id.to_string(),
                cache,
            }
        })
        .collect::<Vec<_>>();

    if primary_instance {
        // Primary instance runs identification and details responders on top of piece responders
        select! {
            result = identify_responder(
                &nats_client,
                cluster_cache_id,
                cache_group,
                identification_broadcast_interval
            ).fuse() => {
                result
            },
            result = piece_cache_details_responder(
                &nats_client,
                &cluster_cache_id_string,
                &caches_details
            ).fuse() => {
                result
            },
            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = contents_responder(&nats_client, &caches_details).fuse() => {
                result
            },
        }
    } else {
        // Non-primary instances only serve piece requests
        select! {
            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
                result
            },
            result = contents_responder(&nats_client, &caches_details).fuse() => {
                result
            },
        }
    }
}
428
429async fn identify_responder(
435 nats_client: &NatsClient,
436 cluster_cache_id: ClusterCacheId,
437 cache_group: &str,
438 identification_broadcast_interval: Duration,
439) -> anyhow::Result<()> {
440 let mut subscription = nats_client
441 .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
442 .await
443 .map_err(|error| {
444 anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
445 })?
446 .fuse();
447
448 let mut interval = tokio::time::interval(identification_broadcast_interval);
450 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
451
452 let mut last_identification = Instant::now();
453
454 loop {
455 select! {
456 maybe_message = subscription.next() => {
457 let Some(message) = maybe_message else {
458 debug!("Identify broadcast stream ended");
459 break;
460 };
461
462 trace!(?message, "Cache received identify broadcast message");
463
464 if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
465 continue;
467 }
468
469 last_identification = Instant::now();
470 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
471 interval.reset();
472 }
473 _ = interval.tick().fuse() => {
474 last_identification = Instant::now();
475 trace!("Cache self-identification");
476
477 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
478 }
479 }
480 }
481
482 Ok(())
483}
484
485async fn send_identify_broadcast(
486 nats_client: &NatsClient,
487 cluster_cache_id: ClusterCacheId,
488 cache_group: &str,
489) {
490 if let Err(error) = nats_client
491 .broadcast(
492 &ClusterCacheIdentifyBroadcast { cluster_cache_id },
493 cache_group,
494 )
495 .await
496 {
497 warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
498 }
499}
500
501async fn piece_cache_details_responder<C>(
502 nats_client: &NatsClient,
503 cluster_cache_id_string: &str,
504 caches_details: &[CacheDetails<'_, C>],
505) -> anyhow::Result<()>
506where
507 C: PieceCache,
508{
509 nats_client
510 .stream_request_responder(
511 Some(cluster_cache_id_string),
512 Some(cluster_cache_id_string.to_string()),
513 |_request: ClusterCacheDetailsRequest| async {
514 Some(stream::iter(caches_details.iter().map(|cache_details| {
515 ClusterPieceCacheDetails {
516 piece_cache_id: cache_details.piece_cache_id,
517 max_num_elements: cache_details.cache.max_num_elements(),
518 }
519 })))
520 },
521 )
522 .await
523}
524
525async fn write_piece_responder<C>(
526 nats_client: &NatsClient,
527 caches_details: &[CacheDetails<'_, C>],
528) -> anyhow::Result<()>
529where
530 C: PieceCache,
531{
532 caches_details
533 .iter()
534 .map(|cache_details| async move {
535 nats_client
536 .request_responder(
537 Some(cache_details.piece_cache_id_string.as_str()),
538 Some(cache_details.piece_cache_id_string.clone()),
539 |request: ClusterCacheWritePieceRequest| async move {
540 Some(
541 cache_details
542 .cache
543 .write_piece(request.offset, request.piece_index, &request.piece)
544 .await
545 .map_err(|error| error.to_string()),
546 )
547 },
548 )
549 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
550 .await
551 })
552 .collect::<FuturesUnordered<_>>()
553 .next()
554 .await
555 .ok_or_else(|| anyhow!("No caches"))?
556}
557
558async fn read_piece_index_responder<C>(
559 nats_client: &NatsClient,
560 caches_details: &[CacheDetails<'_, C>],
561) -> anyhow::Result<()>
562where
563 C: PieceCache,
564{
565 caches_details
566 .iter()
567 .map(|cache_details| async move {
568 nats_client
569 .request_responder(
570 Some(cache_details.piece_cache_id_string.as_str()),
571 Some(cache_details.piece_cache_id_string.clone()),
572 |request: ClusterCacheReadPieceIndexRequest| async move {
573 Some(
574 cache_details
575 .cache
576 .read_piece_index(request.offset)
577 .await
578 .map_err(|error| error.to_string()),
579 )
580 },
581 )
582 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
583 .await
584 })
585 .collect::<FuturesUnordered<_>>()
586 .next()
587 .await
588 .ok_or_else(|| anyhow!("No caches"))?
589}
590
591async fn read_piece_responder<C>(
592 nats_client: &NatsClient,
593 caches_details: &[CacheDetails<'_, C>],
594) -> anyhow::Result<()>
595where
596 C: PieceCache,
597{
598 caches_details
599 .iter()
600 .map(|cache_details| async move {
601 nats_client
602 .request_responder(
603 Some(cache_details.piece_cache_id_string.as_str()),
604 Some(cache_details.piece_cache_id_string.clone()),
605 |request: ClusterCacheReadPieceRequest| async move {
606 Some(
607 cache_details
608 .cache
609 .read_piece(request.offset)
610 .await
611 .map_err(|error| error.to_string()),
612 )
613 },
614 )
615 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
616 .await
617 })
618 .collect::<FuturesUnordered<_>>()
619 .next()
620 .await
621 .ok_or_else(|| anyhow!("No caches"))?
622}
623
624async fn read_pieces_responder<C>(
625 nats_client: &NatsClient,
626 caches_details: &[CacheDetails<'_, C>],
627) -> anyhow::Result<()>
628where
629 C: PieceCache,
630{
631 caches_details
632 .iter()
633 .map(|cache_details| async move {
634 nats_client
635 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
636 Some(cache_details.piece_cache_id_string.as_str()),
637 Some(cache_details.piece_cache_id_string.clone()),
638 |ClusterCacheReadPiecesRequest { offsets }| async move {
639 Some(
640 match cache_details
641 .cache
642 .read_pieces(Box::new(offsets.into_iter()))
643 .await
644 {
645 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
646 maybe_cache_element.map_err(|error| error.to_string())
647 })) as _,
648 Err(error) => {
649 error!(%error, "Failed to read pieces");
650
651 Box::pin(stream::once(async move {
652 Err(format!("Failed to read pieces: {error}"))
653 })) as _
654 }
655 },
656 )
657 },
658 )
659 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
660 .await
661 })
662 .collect::<FuturesUnordered<_>>()
663 .next()
664 .await
665 .ok_or_else(|| anyhow!("No caches"))?
666}
667
668async fn contents_responder<C>(
669 nats_client: &NatsClient,
670 caches_details: &[CacheDetails<'_, C>],
671) -> anyhow::Result<()>
672where
673 C: PieceCache,
674{
675 caches_details
676 .iter()
677 .map(|cache_details| async move {
678 nats_client
679 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
680 Some(cache_details.piece_cache_id_string.as_str()),
681 Some(cache_details.piece_cache_id_string.clone()),
682 |_request: ClusterCacheContentsRequest| async move {
683 Some(match cache_details.cache.contents().await {
684 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
685 maybe_cache_element.map_err(|error| error.to_string())
686 })) as _,
687 Err(error) => {
688 error!(%error, "Failed to get contents");
689
690 Box::pin(stream::once(async move {
691 Err(format!("Failed to get contents: {error}"))
692 })) as _
693 }
694 })
695 },
696 )
697 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
698 .await
699 })
700 .collect::<FuturesUnordered<_>>()
701 .next()
702 .await
703 .ok_or_else(|| anyhow!("No caches"))?
704}