1use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12 GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use anyhow::anyhow;
17use async_trait::async_trait;
18use derive_more::{Display, From};
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, Stream, StreamExt, select, stream};
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use std::collections::BTreeSet;
23use std::pin::Pin;
24use std::task::Poll;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{Instrument, debug, error, info, info_span, trace, warn};
28use ulid::Ulid;
29
/// Minimum time that must have elapsed since the last identification before the cache answers
/// another identify broadcast request; requests arriving sooner are skipped.
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
31
/// Identifier of a cluster cache instance.
///
/// Its [`Display`] form is used in this file as the NATS instance string for cache details
/// requests and as a field in log messages.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub enum ClusterCacheId {
    /// Identifier backed by a ULID
    Ulid(Ulid),
}
38
39impl Encode for ClusterCacheId {
40 #[inline]
41 fn size_hint(&self) -> usize {
42 1_usize
43 + match self {
44 ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45 }
46 }
47
48 #[inline]
49 fn encode_to<O>(&self, dest: &mut O)
50 where
51 O: Output + ?Sized,
52 {
53 match self {
54 ClusterCacheId::Ulid(ulid) => {
55 dest.push_byte(0);
56 Encode::encode_to(&ulid.0, dest);
57 }
58 }
59 }
60}
61
// Marker impl accompanying the manual `Encode` implementation
impl EncodeLike for ClusterCacheId {}
63
64impl Decode for ClusterCacheId {
65 #[inline]
66 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
67 where
68 I: Input,
69 {
70 match input.read_byte().map_err(|e| {
71 e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
72 })? {
73 0 => u128::decode(input)
74 .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
75 .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
76 _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
77 }
78 }
79}
80
81#[expect(
82 clippy::new_without_default,
83 reason = "Default has different semantics"
84)]
85impl ClusterCacheId {
86 #[inline]
88 pub fn new() -> Self {
89 Self::Ulid(Ulid::new())
90 }
91}
92
/// Broadcast a cache sends to identify itself, both periodically and in response to identify
/// broadcasts from the controller.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheIdentifyBroadcast {
    /// ID of the cluster cache that is identifying itself
    pub cluster_cache_id: ClusterCacheId,
}
99
impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
    // NOTE(review): the `*` segment is presumably substituted by `NatsClient` with the cache
    // group this broadcast is sent with - confirm against `NatsClient::broadcast`
    const SUBJECT: &'static str = "ab.cache.*.cache-identify";
}
104
/// Stream request for the details of all piece caches served by a cluster cache instance.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheDetailsRequest;
108
impl GenericStreamRequest for ClusterCacheDetailsRequest {
    // NOTE(review): the `*` segment is presumably substituted with the cluster cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.details";
    /// One item per piece cache
    type Response = ClusterPieceCacheDetails;
}
114
/// Details of a single piece cache, sent in response to [`ClusterCacheDetailsRequest`].
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterPieceCacheDetails {
    /// ID of the piece cache
    pub piece_cache_id: PieceCacheId,
    /// Value reported by [`PieceCache::max_num_elements()`] for this cache
    pub max_num_elements: u32,
}
123
/// Request to write a piece into a cache; fields are forwarded to [`PieceCache::write_piece()`]
/// on the responder side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheWritePieceRequest {
    /// Offset in the cache to write to
    offset: PieceCacheOffset,
    /// Index of the piece being written
    piece_index: PieceIndex,
    /// Piece contents
    piece: Piece,
}
131
impl GenericRequest for ClusterCacheWritePieceRequest {
    // NOTE(review): the `*` segment is presumably substituted with the piece cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.write-piece";
    /// `Err` carries the stringified error of the underlying cache
    type Response = Result<(), String>;
}
137
/// Request to read the piece index stored at an offset; forwarded to
/// [`PieceCache::read_piece_index()`] on the responder side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheReadPieceIndexRequest {
    /// Offset in the cache to read from
    offset: PieceCacheOffset,
}
143
impl GenericRequest for ClusterCacheReadPieceIndexRequest {
    // NOTE(review): the `*` segment is presumably substituted with the piece cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-piece-index";
    /// `Err` carries the stringified error of the underlying cache
    type Response = Result<Option<PieceIndex>, String>;
}
149
/// Request to read a piece stored at an offset; forwarded to [`PieceCache::read_piece()`] on the
/// responder side.
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPieceRequest {
    /// Offset in the cache to read from
    pub(super) offset: PieceCacheOffset,
}
155
impl GenericRequest for ClusterCacheReadPieceRequest {
    // NOTE(review): the `*` segment is presumably substituted with the piece cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-piece";
    /// `Err` carries the stringified error of the underlying cache
    type Response = Result<Option<(PieceIndex, Piece)>, String>;
}
161
/// Stream request to read pieces stored at multiple offsets; forwarded to
/// [`PieceCache::read_pieces()`] on the responder side.
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPiecesRequest {
    /// Offsets in the cache to read from
    pub(super) offsets: Vec<PieceCacheOffset>,
}
167
impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
    // NOTE(review): the `*` segment is presumably substituted with the piece cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.read-pieces";
    /// One item per offset read; `Err` carries the stringified error of the underlying cache
    type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
}
173
/// Stream request for the contents of a cache; answered with [`PieceCache::contents()`] on the
/// responder side.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheContentsRequest;
177
impl GenericStreamRequest for ClusterCacheContentsRequest {
    // NOTE(review): the `*` segment is presumably substituted with the piece cache ID the
    // responder is registered under - confirm against `NatsClient`
    const SUBJECT: &'static str = "ab.cache.*.contents";
    /// One item per cache element; `Err` carries the stringified error of the underlying cache
    type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
}
183
/// [`PieceCache`] implementation that forwards every operation as a NATS request addressed to the
/// piece cache with the stored ID.
#[derive(Debug)]
pub struct ClusterPieceCache {
    /// ID of the piece cache requests are addressed to
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, computed once and passed with every request
    piece_cache_id_string: String,
    /// Value returned from [`PieceCache::max_num_elements()`]
    max_num_elements: u32,
    /// Client used for sending requests
    nats_client: NatsClient,
}
192
193#[async_trait]
194impl PieceCache for ClusterPieceCache {
195 fn id(&self) -> &PieceCacheId {
196 &self.piece_cache_id
197 }
198
199 #[inline]
200 fn max_num_elements(&self) -> u32 {
201 self.max_num_elements
202 }
203
204 async fn contents(
205 &self,
206 ) -> Result<
207 Box<
208 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
209 + Unpin
210 + Send
211 + '_,
212 >,
213 FarmError,
214 > {
215 Ok(Box::new(
216 self.nats_client
217 .stream_request(
218 &ClusterCacheContentsRequest,
219 Some(&self.piece_cache_id_string),
220 )
221 .await?
222 .map(|response| response.map_err(FarmError::from)),
223 ))
224 }
225
226 async fn write_piece(
227 &self,
228 offset: PieceCacheOffset,
229 piece_index: PieceIndex,
230 piece: &Piece,
231 ) -> Result<(), FarmError> {
232 Ok(self
233 .nats_client
234 .request(
235 &ClusterCacheWritePieceRequest {
236 offset,
237 piece_index,
238 piece: piece.clone(),
239 },
240 Some(&self.piece_cache_id_string),
241 )
242 .await??)
243 }
244
245 async fn read_piece_index(
246 &self,
247 offset: PieceCacheOffset,
248 ) -> Result<Option<PieceIndex>, FarmError> {
249 Ok(self
250 .nats_client
251 .request(
252 &ClusterCacheReadPieceIndexRequest { offset },
253 Some(&self.piece_cache_id_string),
254 )
255 .await??)
256 }
257
258 async fn read_piece(
259 &self,
260 offset: PieceCacheOffset,
261 ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
262 Ok(self
263 .nats_client
264 .request(
265 &ClusterCacheReadPieceRequest { offset },
266 Some(&self.piece_cache_id_string),
267 )
268 .await??)
269 }
270
271 async fn read_pieces(
272 &self,
273 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
274 ) -> Result<
275 Box<
276 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
277 + Send
278 + Unpin
279 + '_,
280 >,
281 FarmError,
282 > {
283 let offsets = offsets.collect::<Vec<_>>();
284 let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
285 let mut stream = self
286 .nats_client
287 .stream_request(
288 &ClusterCacheReadPiecesRequest { offsets },
289 Some(&self.piece_cache_id_string),
290 )
291 .await?
292 .map(|response| response.map_err(FarmError::from))
293 .fuse();
294 Ok(Box::new(stream::poll_fn(move |cx| {
295 if !stream.is_done() {
296 match stream.poll_next_unpin(cx) {
297 Poll::Ready(Some(response)) => {
298 return Poll::Ready(Some(response.inspect(|(offset, _)| {
299 offsets_set.remove(offset);
300 })));
301 }
302 Poll::Ready(None) => {
303 }
305 Poll::Pending => {
306 return Poll::Pending;
307 }
308 }
309 }
310
311 match offsets_set.pop_first() {
314 Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
315 None => Poll::Ready(None),
316 }
317 })))
318 }
319}
320
321impl ClusterPieceCache {
322 #[inline]
325 pub fn new(
326 piece_cache_id: PieceCacheId,
327 max_num_elements: u32,
328 nats_client: NatsClient,
329 ) -> ClusterPieceCache {
330 Self {
331 piece_cache_id,
332 piece_cache_id_string: piece_cache_id.to_string(),
333 max_num_elements,
334 nats_client,
335 }
336 }
337}
338
/// Per-cache data shared by all responders in this file.
#[derive(Debug)]
struct CacheDetails<'a, C> {
    /// ID of the piece cache
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, computed once and used when registering responders
    piece_cache_id_string: String,
    /// Cache that requests are served from
    cache: &'a C,
}
345
346pub async fn cache_service<C>(
349 nats_client: NatsClient,
350 caches: &[C],
351 cache_group: &str,
352 identification_broadcast_interval: Duration,
353 primary_instance: bool,
354) -> anyhow::Result<()>
355where
356 C: PieceCache,
357{
358 let cluster_cache_id = ClusterCacheId::new();
359 let cluster_cache_id_string = cluster_cache_id.to_string();
360
361 let caches_details = caches
362 .iter()
363 .map(|cache| {
364 let piece_cache_id = *cache.id();
365
366 if primary_instance {
367 info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
368 }
369
370 CacheDetails {
371 piece_cache_id,
372 piece_cache_id_string: piece_cache_id.to_string(),
373 cache,
374 }
375 })
376 .collect::<Vec<_>>();
377
378 if primary_instance {
379 select! {
380 result = identify_responder(
381 &nats_client,
382 cluster_cache_id,
383 cache_group,
384 identification_broadcast_interval
385 ).fuse() => {
386 result
387 },
388 result = piece_cache_details_responder(
389 &nats_client,
390 &cluster_cache_id_string,
391 &caches_details
392 ).fuse() => {
393 result
394 },
395 result = write_piece_responder(&nats_client, &caches_details).fuse() => {
396 result
397 },
398 result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
399 result
400 },
401 result = read_piece_responder(&nats_client, &caches_details).fuse() => {
402 result
403 },
404 result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
405 result
406 },
407 result = contents_responder(&nats_client, &caches_details).fuse() => {
408 result
409 },
410 }
411 } else {
412 select! {
413 result = write_piece_responder(&nats_client, &caches_details).fuse() => {
414 result
415 },
416 result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
417 result
418 },
419 result = read_piece_responder(&nats_client, &caches_details).fuse() => {
420 result
421 },
422 result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
423 result
424 },
425 result = contents_responder(&nats_client, &caches_details).fuse() => {
426 result
427 },
428 }
429 }
430}
431
432async fn identify_responder(
438 nats_client: &NatsClient,
439 cluster_cache_id: ClusterCacheId,
440 cache_group: &str,
441 identification_broadcast_interval: Duration,
442) -> anyhow::Result<()> {
443 let mut subscription = nats_client
444 .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
445 .await
446 .map_err(|error| {
447 anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
448 })?
449 .fuse();
450
451 let mut interval = tokio::time::interval(identification_broadcast_interval);
453 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
454
455 let mut last_identification = Instant::now();
456
457 loop {
458 select! {
459 maybe_message = subscription.next() => {
460 let Some(message) = maybe_message else {
461 debug!("Identify broadcast stream ended");
462 break;
463 };
464
465 trace!(?message, "Cache received identify broadcast message");
466
467 if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
468 continue;
470 }
471
472 last_identification = Instant::now();
473 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
474 interval.reset();
475 }
476 _ = interval.tick().fuse() => {
477 last_identification = Instant::now();
478 trace!("Cache self-identification");
479
480 send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
481 }
482 }
483 }
484
485 Ok(())
486}
487
488async fn send_identify_broadcast(
489 nats_client: &NatsClient,
490 cluster_cache_id: ClusterCacheId,
491 cache_group: &str,
492) {
493 if let Err(error) = nats_client
494 .broadcast(
495 &ClusterCacheIdentifyBroadcast { cluster_cache_id },
496 cache_group,
497 )
498 .await
499 {
500 warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
501 }
502}
503
504async fn piece_cache_details_responder<C>(
505 nats_client: &NatsClient,
506 cluster_cache_id_string: &str,
507 caches_details: &[CacheDetails<'_, C>],
508) -> anyhow::Result<()>
509where
510 C: PieceCache,
511{
512 nats_client
513 .stream_request_responder(
514 Some(cluster_cache_id_string),
515 Some(cluster_cache_id_string.to_string()),
516 |_request: ClusterCacheDetailsRequest| async {
517 Some(stream::iter(caches_details.iter().map(|cache_details| {
518 ClusterPieceCacheDetails {
519 piece_cache_id: cache_details.piece_cache_id,
520 max_num_elements: cache_details.cache.max_num_elements(),
521 }
522 })))
523 },
524 )
525 .await
526}
527
528async fn write_piece_responder<C>(
529 nats_client: &NatsClient,
530 caches_details: &[CacheDetails<'_, C>],
531) -> anyhow::Result<()>
532where
533 C: PieceCache,
534{
535 caches_details
536 .iter()
537 .map(|cache_details| async move {
538 nats_client
539 .request_responder(
540 Some(cache_details.piece_cache_id_string.as_str()),
541 Some(cache_details.piece_cache_id_string.clone()),
542 |request: ClusterCacheWritePieceRequest| async move {
543 Some(
544 cache_details
545 .cache
546 .write_piece(request.offset, request.piece_index, &request.piece)
547 .await
548 .map_err(|error| error.to_string()),
549 )
550 },
551 )
552 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
553 .await
554 })
555 .collect::<FuturesUnordered<_>>()
556 .next()
557 .await
558 .ok_or_else(|| anyhow!("No caches"))?
559}
560
561async fn read_piece_index_responder<C>(
562 nats_client: &NatsClient,
563 caches_details: &[CacheDetails<'_, C>],
564) -> anyhow::Result<()>
565where
566 C: PieceCache,
567{
568 caches_details
569 .iter()
570 .map(|cache_details| async move {
571 nats_client
572 .request_responder(
573 Some(cache_details.piece_cache_id_string.as_str()),
574 Some(cache_details.piece_cache_id_string.clone()),
575 |request: ClusterCacheReadPieceIndexRequest| async move {
576 Some(
577 cache_details
578 .cache
579 .read_piece_index(request.offset)
580 .await
581 .map_err(|error| error.to_string()),
582 )
583 },
584 )
585 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
586 .await
587 })
588 .collect::<FuturesUnordered<_>>()
589 .next()
590 .await
591 .ok_or_else(|| anyhow!("No caches"))?
592}
593
594async fn read_piece_responder<C>(
595 nats_client: &NatsClient,
596 caches_details: &[CacheDetails<'_, C>],
597) -> anyhow::Result<()>
598where
599 C: PieceCache,
600{
601 caches_details
602 .iter()
603 .map(|cache_details| async move {
604 nats_client
605 .request_responder(
606 Some(cache_details.piece_cache_id_string.as_str()),
607 Some(cache_details.piece_cache_id_string.clone()),
608 |request: ClusterCacheReadPieceRequest| async move {
609 Some(
610 cache_details
611 .cache
612 .read_piece(request.offset)
613 .await
614 .map_err(|error| error.to_string()),
615 )
616 },
617 )
618 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
619 .await
620 })
621 .collect::<FuturesUnordered<_>>()
622 .next()
623 .await
624 .ok_or_else(|| anyhow!("No caches"))?
625}
626
627async fn read_pieces_responder<C>(
628 nats_client: &NatsClient,
629 caches_details: &[CacheDetails<'_, C>],
630) -> anyhow::Result<()>
631where
632 C: PieceCache,
633{
634 caches_details
635 .iter()
636 .map(|cache_details| async move {
637 nats_client
638 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
639 Some(cache_details.piece_cache_id_string.as_str()),
640 Some(cache_details.piece_cache_id_string.clone()),
641 |ClusterCacheReadPiecesRequest { offsets }| async move {
642 Some(
643 match cache_details
644 .cache
645 .read_pieces(Box::new(offsets.into_iter()))
646 .await
647 {
648 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
649 maybe_cache_element.map_err(|error| error.to_string())
650 })) as _,
651 Err(error) => {
652 error!(%error, "Failed to read pieces");
653
654 Box::pin(stream::once(async move {
655 Err(format!("Failed to read pieces: {error}"))
656 })) as _
657 }
658 },
659 )
660 },
661 )
662 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
663 .await
664 })
665 .collect::<FuturesUnordered<_>>()
666 .next()
667 .await
668 .ok_or_else(|| anyhow!("No caches"))?
669}
670
671async fn contents_responder<C>(
672 nats_client: &NatsClient,
673 caches_details: &[CacheDetails<'_, C>],
674) -> anyhow::Result<()>
675where
676 C: PieceCache,
677{
678 caches_details
679 .iter()
680 .map(|cache_details| async move {
681 nats_client
682 .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
683 Some(cache_details.piece_cache_id_string.as_str()),
684 Some(cache_details.piece_cache_id_string.clone()),
685 |_request: ClusterCacheContentsRequest| async move {
686 Some(match cache_details.cache.contents().await {
687 Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
688 maybe_cache_element.map_err(|error| error.to_string())
689 })) as _,
690 Err(error) => {
691 error!(%error, "Failed to get contents");
692
693 Box::pin(stream::once(async move {
694 Err(format!("Failed to get contents: {error}"))
695 })) as _
696 }
697 })
698 },
699 )
700 .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
701 .await
702 })
703 .collect::<FuturesUnordered<_>>()
704 .next()
705 .await
706 .ok_or_else(|| anyhow!("No caches"))?
707}