Skip to main content

ab_farmer/cluster/
cache.rs

1//! Farming cluster cache
2//!
3//! Cache is responsible for caching pieces within allocated space to accelerate plotting and serve
4//! pieces in response to DSN requests.
5//!
6//! This module exposes some data structures for NATS communication, custom piece cache
7//! implementation designed to work with cluster cache and a service function to drive the backend
8//! part of the cache.
9
10use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use anyhow::anyhow;
17use async_trait::async_trait;
18use derive_more::{Display, From};
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, Stream, StreamExt, select, stream};
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use std::collections::BTreeSet;
23use std::pin::Pin;
24use std::task::Poll;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{Instrument, debug, error, info, info_span, trace, warn};
28use ulid::Ulid;
29
/// Minimum time that must have passed since the last identification before the cache answers
/// another identify broadcast request; requests arriving sooner are skipped
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
31
/// An identifier for a cluster cache, can be used in logs, thread names, etc.
///
/// The SCALE encoding is implemented by hand for this type: a variant index byte followed by the
/// inner value of the ULID.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
pub enum ClusterCacheId {
    /// Cache ID
    Ulid(Ulid),
}
38
39impl Encode for ClusterCacheId {
40    #[inline]
41    fn size_hint(&self) -> usize {
42        1_usize
43            + match self {
44                ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45            }
46    }
47
48    #[inline]
49    fn encode_to<O>(&self, dest: &mut O)
50    where
51        O: Output + ?Sized,
52    {
53        match self {
54            ClusterCacheId::Ulid(ulid) => {
55                dest.push_byte(0);
56                Encode::encode_to(&ulid.0, dest);
57            }
58        }
59    }
60}
61
62impl EncodeLike for ClusterCacheId {}
63
64impl Decode for ClusterCacheId {
65    #[inline]
66    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
67    where
68        I: Input,
69    {
70        match input.read_byte().map_err(|e| {
71            e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
72        })? {
73            0 => u128::decode(input)
74                .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
75                .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
76            _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
77        }
78    }
79}
80
81#[expect(clippy::new_without_default)]
82impl ClusterCacheId {
83    /// Creates new ID
84    #[inline]
85    pub fn new() -> Self {
86        Self::Ulid(Ulid::new())
87    }
88}
89
/// Broadcast with identification details by caches
///
/// Published by a cache both in response to an identify broadcast from the controller and
/// periodically on its own.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheIdentifyBroadcast {
    /// Cache ID
    pub cluster_cache_id: ClusterCacheId,
}

impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
    /// `*` here stands for cache group
    const SUBJECT: &'static str = "ab.cache.*.cache-identify";
}
101
/// Request cache details from cache
///
/// The response is a stream with one [`ClusterPieceCacheDetails`] entry per piece cache.
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterCacheDetailsRequest;

impl GenericStreamRequest for ClusterCacheDetailsRequest {
    /// `*` here stands for cluster cache ID (the responder in this file registers with the string
    /// form of [`ClusterCacheId`], not with a piece cache ID)
    const SUBJECT: &'static str = "ab.cache.*.details";
    type Response = ClusterPieceCacheDetails;
}
111
/// Details of a single piece cache, sent in response to [`ClusterCacheDetailsRequest`]
#[derive(Debug, Clone, Encode, Decode)]
pub struct ClusterPieceCacheDetails {
    /// Piece Cache ID
    pub piece_cache_id: PieceCacheId,
    /// Max number of elements in this cache
    pub max_num_elements: u32,
}
120
/// Write piece into cache
///
/// The response carries the error rendered as a string if the write failed.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheWritePieceRequest {
    /// Offset in the cache to write the piece at
    offset: PieceCacheOffset,
    /// Index of the piece being written
    piece_index: PieceIndex,
    /// Piece contents
    piece: Piece,
}

impl GenericRequest for ClusterCacheWritePieceRequest {
    /// `*` here stands for piece cache ID
    const SUBJECT: &'static str = "ab.cache.*.write-piece";
    type Response = Result<(), String>;
}
134
/// Read piece index from cache
///
/// The response carries the error rendered as a string if the read failed.
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheReadPieceIndexRequest {
    /// Offset in the cache to read the piece index from
    offset: PieceCacheOffset,
}

impl GenericRequest for ClusterCacheReadPieceIndexRequest {
    /// `*` here stands for piece cache ID
    const SUBJECT: &'static str = "ab.cache.*.read-piece-index";
    type Response = Result<Option<PieceIndex>, String>;
}
146
/// Read piece from cache
///
/// The response carries the error rendered as a string if the read failed.
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPieceRequest {
    /// Offset in the cache to read the piece from
    pub(super) offset: PieceCacheOffset,
}

impl GenericRequest for ClusterCacheReadPieceRequest {
    /// `*` here stands for piece cache ID
    const SUBJECT: &'static str = "ab.cache.*.read-piece";
    type Response = Result<Option<(PieceIndex, Piece)>, String>;
}
158
/// Read multiple pieces from cache
///
/// The response is a stream where each successful item names the offset it belongs to.
#[derive(Debug, Clone, Encode, Decode)]
pub(super) struct ClusterCacheReadPiecesRequest {
    /// Offsets in the cache to read pieces from
    pub(super) offsets: Vec<PieceCacheOffset>,
}

impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
    /// `*` here stands for piece cache ID
    const SUBJECT: &'static str = "ab.cache.*.read-pieces";
    type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
}
170
/// Request contents of the cache
///
/// The response is a stream of cache offsets, each with the piece index stored there (if any).
#[derive(Debug, Clone, Encode, Decode)]
struct ClusterCacheContentsRequest;

impl GenericStreamRequest for ClusterCacheContentsRequest {
    /// `*` here stands for piece cache ID
    const SUBJECT: &'static str = "ab.cache.*.contents";
    type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
}
180
/// Cluster cache implementation
///
/// Implements [`PieceCache`] by forwarding every operation as a NATS request addressed to the
/// piece cache with the stored ID.
#[derive(Debug)]
pub struct ClusterPieceCache {
    /// ID of the piece cache requests are addressed to
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, passed along with every request
    piece_cache_id_string: String,
    /// Max number of elements in the cache
    max_num_elements: u32,
    /// Client used to send requests
    nats_client: NatsClient,
}
189
190#[async_trait]
191impl PieceCache for ClusterPieceCache {
192    fn id(&self) -> &PieceCacheId {
193        &self.piece_cache_id
194    }
195
196    #[inline]
197    fn max_num_elements(&self) -> u32 {
198        self.max_num_elements
199    }
200
201    async fn contents(
202        &self,
203    ) -> Result<
204        Box<
205            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
206                + Unpin
207                + Send
208                + '_,
209        >,
210        FarmError,
211    > {
212        Ok(Box::new(
213            self.nats_client
214                .stream_request(
215                    &ClusterCacheContentsRequest,
216                    Some(&self.piece_cache_id_string),
217                )
218                .await?
219                .map(|response| response.map_err(FarmError::from)),
220        ))
221    }
222
223    async fn write_piece(
224        &self,
225        offset: PieceCacheOffset,
226        piece_index: PieceIndex,
227        piece: &Piece,
228    ) -> Result<(), FarmError> {
229        Ok(self
230            .nats_client
231            .request(
232                &ClusterCacheWritePieceRequest {
233                    offset,
234                    piece_index,
235                    piece: piece.clone(),
236                },
237                Some(&self.piece_cache_id_string),
238            )
239            .await??)
240    }
241
242    async fn read_piece_index(
243        &self,
244        offset: PieceCacheOffset,
245    ) -> Result<Option<PieceIndex>, FarmError> {
246        Ok(self
247            .nats_client
248            .request(
249                &ClusterCacheReadPieceIndexRequest { offset },
250                Some(&self.piece_cache_id_string),
251            )
252            .await??)
253    }
254
255    async fn read_piece(
256        &self,
257        offset: PieceCacheOffset,
258    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
259        Ok(self
260            .nats_client
261            .request(
262                &ClusterCacheReadPieceRequest { offset },
263                Some(&self.piece_cache_id_string),
264            )
265            .await??)
266    }
267
268    async fn read_pieces(
269        &self,
270        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
271    ) -> Result<
272        Box<
273            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
274                + Send
275                + Unpin
276                + '_,
277        >,
278        FarmError,
279    > {
280        let offsets = offsets.collect::<Vec<_>>();
281        let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
282        let mut stream = self
283            .nats_client
284            .stream_request(
285                &ClusterCacheReadPiecesRequest { offsets },
286                Some(&self.piece_cache_id_string),
287            )
288            .await?
289            .map(|response| response.map_err(FarmError::from))
290            .fuse();
291        Ok(Box::new(stream::poll_fn(move |cx| {
292            if !stream.is_done() {
293                match stream.poll_next_unpin(cx) {
294                    Poll::Ready(Some(response)) => {
295                        return Poll::Ready(Some(response.inspect(|(offset, _)| {
296                            offsets_set.remove(offset);
297                        })));
298                    }
299                    Poll::Ready(None) => {
300                        // Handled as a general case below
301                    }
302                    Poll::Pending => {
303                        return Poll::Pending;
304                    }
305                }
306            }
307
308            // Uphold invariant of the trait that some result should be returned for every unique
309            // provided offset
310            match offsets_set.pop_first() {
311                Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
312                None => Poll::Ready(None),
313            }
314        })))
315    }
316}
317
impl ClusterPieceCache {
    /// Create a new instance for the piece cache with the given ID and capacity
    ///
    /// Both values match the fields of [`ClusterPieceCacheDetails`]; the
    /// [`ClusterCacheIdentifyBroadcast`] defined in this file only carries the cluster cache ID.
    /// The string form of `piece_cache_id` is computed once here and stored.
    #[inline]
    pub fn new(
        piece_cache_id: PieceCacheId,
        max_num_elements: u32,
        nats_client: NatsClient,
    ) -> ClusterPieceCache {
        Self {
            piece_cache_id,
            piece_cache_id_string: piece_cache_id.to_string(),
            max_num_elements,
            nats_client,
        }
    }
}
335
/// A piece cache together with its ID, shared by the responders of the cache service
#[derive(Debug)]
struct CacheDetails<'a, C> {
    /// ID of the piece cache
    piece_cache_id: PieceCacheId,
    /// String form of `piece_cache_id`, computed once when the service starts
    piece_cache_id_string: String,
    /// The cache that serves the requests
    cache: &'a C,
}
342
343/// Create cache service for specified caches that will be processing incoming requests and send
344/// periodic identify notifications
345pub async fn cache_service<C>(
346    nats_client: NatsClient,
347    caches: &[C],
348    cache_group: &str,
349    identification_broadcast_interval: Duration,
350    primary_instance: bool,
351) -> anyhow::Result<()>
352where
353    C: PieceCache,
354{
355    let cluster_cache_id = ClusterCacheId::new();
356    let cluster_cache_id_string = cluster_cache_id.to_string();
357
358    let caches_details = caches
359        .iter()
360        .map(|cache| {
361            let piece_cache_id = *cache.id();
362
363            if primary_instance {
364                info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
365            }
366
367            CacheDetails {
368                piece_cache_id,
369                piece_cache_id_string: piece_cache_id.to_string(),
370                cache,
371            }
372        })
373        .collect::<Vec<_>>();
374
375    if primary_instance {
376        select! {
377            result = identify_responder(
378                &nats_client,
379                cluster_cache_id,
380                cache_group,
381                identification_broadcast_interval
382            ).fuse() => {
383                result
384            },
385            result = piece_cache_details_responder(
386                &nats_client,
387                &cluster_cache_id_string,
388                &caches_details
389            ).fuse() => {
390                result
391            },
392            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
393                result
394            },
395            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
396                result
397            },
398            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
399                result
400            },
401            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
402                result
403            },
404            result = contents_responder(&nats_client, &caches_details).fuse() => {
405                result
406            },
407        }
408    } else {
409        select! {
410            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
411                result
412            },
413            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
414                result
415            },
416            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
417                result
418            },
419            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
420                result
421            },
422            result = contents_responder(&nats_client, &caches_details).fuse() => {
423                result
424            },
425        }
426    }
427}
428
429/// Listen for cache identification broadcast from controller and publish identification
430/// broadcast in response, also send periodic notifications reminding that cache exists.
431///
432/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
433/// per controller instance in order to parallelize more work across threads if needed.
434async fn identify_responder(
435    nats_client: &NatsClient,
436    cluster_cache_id: ClusterCacheId,
437    cache_group: &str,
438    identification_broadcast_interval: Duration,
439) -> anyhow::Result<()> {
440    let mut subscription = nats_client
441        .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
442        .await
443        .map_err(|error| {
444            anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
445        })?
446        .fuse();
447
448    // Also send periodic updates in addition to the subscription response
449    let mut interval = tokio::time::interval(identification_broadcast_interval);
450    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
451
452    let mut last_identification = Instant::now();
453
454    loop {
455        select! {
456            maybe_message = subscription.next() => {
457                let Some(message) = maybe_message else {
458                    debug!("Identify broadcast stream ended");
459                    break;
460                };
461
462                trace!(?message, "Cache received identify broadcast message");
463
464                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
465                    // Skip too frequent identification requests
466                    continue;
467                }
468
469                last_identification = Instant::now();
470                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
471                interval.reset();
472            }
473            _ = interval.tick().fuse() => {
474                last_identification = Instant::now();
475                trace!("Cache self-identification");
476
477                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
478            }
479        }
480    }
481
482    Ok(())
483}
484
485async fn send_identify_broadcast(
486    nats_client: &NatsClient,
487    cluster_cache_id: ClusterCacheId,
488    cache_group: &str,
489) {
490    if let Err(error) = nats_client
491        .broadcast(
492            &ClusterCacheIdentifyBroadcast { cluster_cache_id },
493            cache_group,
494        )
495        .await
496    {
497        warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
498    }
499}
500
501async fn piece_cache_details_responder<C>(
502    nats_client: &NatsClient,
503    cluster_cache_id_string: &str,
504    caches_details: &[CacheDetails<'_, C>],
505) -> anyhow::Result<()>
506where
507    C: PieceCache,
508{
509    nats_client
510        .stream_request_responder(
511            Some(cluster_cache_id_string),
512            Some(cluster_cache_id_string.to_string()),
513            |_request: ClusterCacheDetailsRequest| async {
514                Some(stream::iter(caches_details.iter().map(|cache_details| {
515                    ClusterPieceCacheDetails {
516                        piece_cache_id: cache_details.piece_cache_id,
517                        max_num_elements: cache_details.cache.max_num_elements(),
518                    }
519                })))
520            },
521        )
522        .await
523}
524
525async fn write_piece_responder<C>(
526    nats_client: &NatsClient,
527    caches_details: &[CacheDetails<'_, C>],
528) -> anyhow::Result<()>
529where
530    C: PieceCache,
531{
532    caches_details
533        .iter()
534        .map(|cache_details| async move {
535            nats_client
536                .request_responder(
537                    Some(cache_details.piece_cache_id_string.as_str()),
538                    Some(cache_details.piece_cache_id_string.clone()),
539                    |request: ClusterCacheWritePieceRequest| async move {
540                        Some(
541                            cache_details
542                                .cache
543                                .write_piece(request.offset, request.piece_index, &request.piece)
544                                .await
545                                .map_err(|error| error.to_string()),
546                        )
547                    },
548                )
549                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
550                .await
551        })
552        .collect::<FuturesUnordered<_>>()
553        .next()
554        .await
555        .ok_or_else(|| anyhow!("No caches"))?
556}
557
558async fn read_piece_index_responder<C>(
559    nats_client: &NatsClient,
560    caches_details: &[CacheDetails<'_, C>],
561) -> anyhow::Result<()>
562where
563    C: PieceCache,
564{
565    caches_details
566        .iter()
567        .map(|cache_details| async move {
568            nats_client
569                .request_responder(
570                    Some(cache_details.piece_cache_id_string.as_str()),
571                    Some(cache_details.piece_cache_id_string.clone()),
572                    |request: ClusterCacheReadPieceIndexRequest| async move {
573                        Some(
574                            cache_details
575                                .cache
576                                .read_piece_index(request.offset)
577                                .await
578                                .map_err(|error| error.to_string()),
579                        )
580                    },
581                )
582                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
583                .await
584        })
585        .collect::<FuturesUnordered<_>>()
586        .next()
587        .await
588        .ok_or_else(|| anyhow!("No caches"))?
589}
590
591async fn read_piece_responder<C>(
592    nats_client: &NatsClient,
593    caches_details: &[CacheDetails<'_, C>],
594) -> anyhow::Result<()>
595where
596    C: PieceCache,
597{
598    caches_details
599        .iter()
600        .map(|cache_details| async move {
601            nats_client
602                .request_responder(
603                    Some(cache_details.piece_cache_id_string.as_str()),
604                    Some(cache_details.piece_cache_id_string.clone()),
605                    |request: ClusterCacheReadPieceRequest| async move {
606                        Some(
607                            cache_details
608                                .cache
609                                .read_piece(request.offset)
610                                .await
611                                .map_err(|error| error.to_string()),
612                        )
613                    },
614                )
615                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
616                .await
617        })
618        .collect::<FuturesUnordered<_>>()
619        .next()
620        .await
621        .ok_or_else(|| anyhow!("No caches"))?
622}
623
624async fn read_pieces_responder<C>(
625    nats_client: &NatsClient,
626    caches_details: &[CacheDetails<'_, C>],
627) -> anyhow::Result<()>
628where
629    C: PieceCache,
630{
631    caches_details
632        .iter()
633        .map(|cache_details| async move {
634            nats_client
635                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
636                    Some(cache_details.piece_cache_id_string.as_str()),
637                    Some(cache_details.piece_cache_id_string.clone()),
638                    |ClusterCacheReadPiecesRequest { offsets }| async move {
639                        Some(
640                            match cache_details
641                                .cache
642                                .read_pieces(Box::new(offsets.into_iter()))
643                                .await
644                            {
645                                Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
646                                    maybe_cache_element.map_err(|error| error.to_string())
647                                })) as _,
648                                Err(error) => {
649                                    error!(%error, "Failed to read pieces");
650
651                                    Box::pin(stream::once(async move {
652                                        Err(format!("Failed to read pieces: {error}"))
653                                    })) as _
654                                }
655                            },
656                        )
657                    },
658                )
659                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
660                .await
661        })
662        .collect::<FuturesUnordered<_>>()
663        .next()
664        .await
665        .ok_or_else(|| anyhow!("No caches"))?
666}
667
668async fn contents_responder<C>(
669    nats_client: &NatsClient,
670    caches_details: &[CacheDetails<'_, C>],
671) -> anyhow::Result<()>
672where
673    C: PieceCache,
674{
675    caches_details
676        .iter()
677        .map(|cache_details| async move {
678            nats_client
679                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
680                    Some(cache_details.piece_cache_id_string.as_str()),
681                    Some(cache_details.piece_cache_id_string.clone()),
682                    |_request: ClusterCacheContentsRequest| async move {
683                        Some(match cache_details.cache.contents().await {
684                            Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
685                                maybe_cache_element.map_err(|error| error.to_string())
686                            })) as _,
687                            Err(error) => {
688                                error!(%error, "Failed to get contents");
689
690                                Box::pin(stream::once(async move {
691                                    Err(format!("Failed to get contents: {error}"))
692                                })) as _
693                            }
694                        })
695                    },
696                )
697                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
698                .await
699        })
700        .collect::<FuturesUnordered<_>>()
701        .next()
702        .await
703        .ok_or_else(|| anyhow!("No caches"))?
704}