Skip to main content

ab_farmer/cluster/
cache.rs

1//! Farming cluster cache
2//!
3//! Cache is responsible for caching pieces within allocated space to accelerate plotting and serve
4//! pieces in response to DSN requests.
5//!
//! This module exposes data structures for NATS communication, a custom piece cache
//! implementation designed to work with the cluster cache, and a service function that drives
//! the backend part of the cache.
9
10use crate::cluster::controller::ClusterControllerCacheIdentifyBroadcast;
11use crate::cluster::nats_client::{
12    GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient,
13};
14use crate::farm::{FarmError, PieceCache, PieceCacheId, PieceCacheOffset};
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use anyhow::anyhow;
17use async_trait::async_trait;
18use derive_more::{Display, From};
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, Stream, StreamExt, select, stream};
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use std::collections::BTreeSet;
23use std::pin::Pin;
24use std::task::Poll;
25use std::time::{Duration, Instant};
26use tokio::time::MissedTickBehavior;
27use tracing::{Instrument, debug, error, info, info_span, trace, warn};
28use ulid::Ulid;
29
/// Identification requests from the controller that arrive sooner than this after the previous
/// identification (requested or periodic) are ignored.
const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_millis(1_000);
31
32/// An identifier for a cluster cache, can be used for in logs, thread names, etc.
33#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Display, From)]
34pub enum ClusterCacheId {
35    /// Cache ID
36    Ulid(Ulid),
37}
38
39impl Encode for ClusterCacheId {
40    #[inline]
41    fn size_hint(&self) -> usize {
42        1_usize
43            + match self {
44                ClusterCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
45            }
46    }
47
48    #[inline]
49    fn encode_to<O>(&self, dest: &mut O)
50    where
51        O: Output + ?Sized,
52    {
53        match self {
54            ClusterCacheId::Ulid(ulid) => {
55                dest.push_byte(0);
56                Encode::encode_to(&ulid.0, dest);
57            }
58        }
59    }
60}
61
62impl EncodeLike for ClusterCacheId {}
63
64impl Decode for ClusterCacheId {
65    #[inline]
66    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
67    where
68        I: Input,
69    {
70        match input.read_byte().map_err(|e| {
71            e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
72        })? {
73            0 => u128::decode(input)
74                .map(|ulid| ClusterCacheId::Ulid(Ulid(ulid)))
75                .map_err(|e| e.chain("Could not decode `ClusterCacheId::Ulid.0`")),
76            _ => Err("Could not decode `ClusterCacheId`, variant doesn't exist".into()),
77        }
78    }
79}
80
81#[expect(
82    clippy::new_without_default,
83    reason = "Default has different semantics"
84)]
85impl ClusterCacheId {
86    /// Creates new ID
87    #[inline]
88    pub fn new() -> Self {
89        Self::Ulid(Ulid::new())
90    }
91}
92
93/// Broadcast with identification details by caches
94#[derive(Debug, Clone, Encode, Decode)]
95pub struct ClusterCacheIdentifyBroadcast {
96    /// Cache ID
97    pub cluster_cache_id: ClusterCacheId,
98}
99
100impl GenericBroadcast for ClusterCacheIdentifyBroadcast {
101    /// `*` here stands for cache group
102    const SUBJECT: &'static str = "ab.cache.*.cache-identify";
103}
104
105/// Request cache details from cache
106#[derive(Debug, Clone, Encode, Decode)]
107pub struct ClusterCacheDetailsRequest;
108
109impl GenericStreamRequest for ClusterCacheDetailsRequest {
110    /// `*` here stands for piece cache ID
111    const SUBJECT: &'static str = "ab.cache.*.details";
112    type Response = ClusterPieceCacheDetails;
113}
114
115/// Cache details
116#[derive(Debug, Clone, Encode, Decode)]
117pub struct ClusterPieceCacheDetails {
118    /// Piece Cache ID
119    pub piece_cache_id: PieceCacheId,
120    /// Max number of elements in this cache
121    pub max_num_elements: u32,
122}
123
124/// Write piece into cache
125#[derive(Debug, Clone, Encode, Decode)]
126struct ClusterCacheWritePieceRequest {
127    offset: PieceCacheOffset,
128    piece_index: PieceIndex,
129    piece: Piece,
130}
131
132impl GenericRequest for ClusterCacheWritePieceRequest {
133    /// `*` here stands for piece cache ID
134    const SUBJECT: &'static str = "ab.cache.*.write-piece";
135    type Response = Result<(), String>;
136}
137
138/// Read piece index from cache
139#[derive(Debug, Clone, Encode, Decode)]
140struct ClusterCacheReadPieceIndexRequest {
141    offset: PieceCacheOffset,
142}
143
144impl GenericRequest for ClusterCacheReadPieceIndexRequest {
145    /// `*` here stands for piece cache ID
146    const SUBJECT: &'static str = "ab.cache.*.read-piece-index";
147    type Response = Result<Option<PieceIndex>, String>;
148}
149
150/// Read piece from cache
151#[derive(Debug, Clone, Encode, Decode)]
152pub(super) struct ClusterCacheReadPieceRequest {
153    pub(super) offset: PieceCacheOffset,
154}
155
156impl GenericRequest for ClusterCacheReadPieceRequest {
157    /// `*` here stands for piece cache ID
158    const SUBJECT: &'static str = "ab.cache.*.read-piece";
159    type Response = Result<Option<(PieceIndex, Piece)>, String>;
160}
161
162/// Read piece from cache
163#[derive(Debug, Clone, Encode, Decode)]
164pub(super) struct ClusterCacheReadPiecesRequest {
165    pub(super) offsets: Vec<PieceCacheOffset>,
166}
167
168impl GenericStreamRequest for ClusterCacheReadPiecesRequest {
169    /// `*` here stands for piece cache ID
170    const SUBJECT: &'static str = "ab.cache.*.read-pieces";
171    type Response = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), String>;
172}
173
174/// Collect plotted pieces from farmer
175#[derive(Debug, Clone, Encode, Decode)]
176struct ClusterCacheContentsRequest;
177
178impl GenericStreamRequest for ClusterCacheContentsRequest {
179    /// `*` here stands for piece cache ID
180    const SUBJECT: &'static str = "ab.cache.*.contents";
181    type Response = Result<(PieceCacheOffset, Option<PieceIndex>), String>;
182}
183
184/// Cluster cache implementation
185#[derive(Debug)]
186pub struct ClusterPieceCache {
187    piece_cache_id: PieceCacheId,
188    piece_cache_id_string: String,
189    max_num_elements: u32,
190    nats_client: NatsClient,
191}
192
193#[async_trait]
194impl PieceCache for ClusterPieceCache {
195    fn id(&self) -> &PieceCacheId {
196        &self.piece_cache_id
197    }
198
199    #[inline]
200    fn max_num_elements(&self) -> u32 {
201        self.max_num_elements
202    }
203
204    async fn contents(
205        &self,
206    ) -> Result<
207        Box<
208            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
209                + Unpin
210                + Send
211                + '_,
212        >,
213        FarmError,
214    > {
215        Ok(Box::new(
216            self.nats_client
217                .stream_request(
218                    &ClusterCacheContentsRequest,
219                    Some(&self.piece_cache_id_string),
220                )
221                .await?
222                .map(|response| response.map_err(FarmError::from)),
223        ))
224    }
225
226    async fn write_piece(
227        &self,
228        offset: PieceCacheOffset,
229        piece_index: PieceIndex,
230        piece: &Piece,
231    ) -> Result<(), FarmError> {
232        Ok(self
233            .nats_client
234            .request(
235                &ClusterCacheWritePieceRequest {
236                    offset,
237                    piece_index,
238                    piece: piece.clone(),
239                },
240                Some(&self.piece_cache_id_string),
241            )
242            .await??)
243    }
244
245    async fn read_piece_index(
246        &self,
247        offset: PieceCacheOffset,
248    ) -> Result<Option<PieceIndex>, FarmError> {
249        Ok(self
250            .nats_client
251            .request(
252                &ClusterCacheReadPieceIndexRequest { offset },
253                Some(&self.piece_cache_id_string),
254            )
255            .await??)
256    }
257
258    async fn read_piece(
259        &self,
260        offset: PieceCacheOffset,
261    ) -> Result<Option<(PieceIndex, Piece)>, FarmError> {
262        Ok(self
263            .nats_client
264            .request(
265                &ClusterCacheReadPieceRequest { offset },
266                Some(&self.piece_cache_id_string),
267            )
268            .await??)
269    }
270
271    async fn read_pieces(
272        &self,
273        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
274    ) -> Result<
275        Box<
276            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
277                + Send
278                + Unpin
279                + '_,
280        >,
281        FarmError,
282    > {
283        let offsets = offsets.collect::<Vec<_>>();
284        let mut offsets_set = BTreeSet::from_iter(offsets.iter().copied());
285        let mut stream = self
286            .nats_client
287            .stream_request(
288                &ClusterCacheReadPiecesRequest { offsets },
289                Some(&self.piece_cache_id_string),
290            )
291            .await?
292            .map(|response| response.map_err(FarmError::from))
293            .fuse();
294        Ok(Box::new(stream::poll_fn(move |cx| {
295            if !stream.is_done() {
296                match stream.poll_next_unpin(cx) {
297                    Poll::Ready(Some(response)) => {
298                        return Poll::Ready(Some(response.inspect(|(offset, _)| {
299                            offsets_set.remove(offset);
300                        })));
301                    }
302                    Poll::Ready(None) => {
303                        // Handled as a general case below
304                    }
305                    Poll::Pending => {
306                        return Poll::Pending;
307                    }
308                }
309            }
310
311            // Uphold invariant of the trait that some result should be returned for every unique
312            // provided offset
313            match offsets_set.pop_first() {
314                Some(offset) => Poll::Ready(Some(Ok((offset, None)))),
315                None => Poll::Ready(None),
316            }
317        })))
318    }
319}
320
321impl ClusterPieceCache {
322    /// Create a new instance using information from previously received
323    /// [`ClusterCacheIdentifyBroadcast`]
324    #[inline]
325    pub fn new(
326        piece_cache_id: PieceCacheId,
327        max_num_elements: u32,
328        nats_client: NatsClient,
329    ) -> ClusterPieceCache {
330        Self {
331            piece_cache_id,
332            piece_cache_id_string: piece_cache_id.to_string(),
333            max_num_elements,
334            nats_client,
335        }
336    }
337}
338
339#[derive(Debug)]
340struct CacheDetails<'a, C> {
341    piece_cache_id: PieceCacheId,
342    piece_cache_id_string: String,
343    cache: &'a C,
344}
345
346/// Create cache service for specified caches that will be processing incoming requests and send
347/// periodic identify notifications
348pub async fn cache_service<C>(
349    nats_client: NatsClient,
350    caches: &[C],
351    cache_group: &str,
352    identification_broadcast_interval: Duration,
353    primary_instance: bool,
354) -> anyhow::Result<()>
355where
356    C: PieceCache,
357{
358    let cluster_cache_id = ClusterCacheId::new();
359    let cluster_cache_id_string = cluster_cache_id.to_string();
360
361    let caches_details = caches
362        .iter()
363        .map(|cache| {
364            let piece_cache_id = *cache.id();
365
366            if primary_instance {
367                info!(%piece_cache_id, max_num_elements = %cache.max_num_elements(), "Created piece cache");
368            }
369
370            CacheDetails {
371                piece_cache_id,
372                piece_cache_id_string: piece_cache_id.to_string(),
373                cache,
374            }
375        })
376        .collect::<Vec<_>>();
377
378    if primary_instance {
379        select! {
380            result = identify_responder(
381                &nats_client,
382                cluster_cache_id,
383                cache_group,
384                identification_broadcast_interval
385            ).fuse() => {
386                result
387            },
388            result = piece_cache_details_responder(
389                &nats_client,
390                &cluster_cache_id_string,
391                &caches_details
392            ).fuse() => {
393                result
394            },
395            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
396                result
397            },
398            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
399                result
400            },
401            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
402                result
403            },
404            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
405                result
406            },
407            result = contents_responder(&nats_client, &caches_details).fuse() => {
408                result
409            },
410        }
411    } else {
412        select! {
413            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
414                result
415            },
416            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
417                result
418            },
419            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
420                result
421            },
422            result = read_pieces_responder(&nats_client, &caches_details).fuse() => {
423                result
424            },
425            result = contents_responder(&nats_client, &caches_details).fuse() => {
426                result
427            },
428        }
429    }
430}
431
432/// Listen for cache identification broadcast from controller and publish identification
433/// broadcast in response, also send periodic notifications reminding that cache exists.
434///
435/// Implementation is using concurrency with multiple tokio tasks, but can be started multiple times
436/// per controller instance in order to parallelize more work across threads if needed.
437async fn identify_responder(
438    nats_client: &NatsClient,
439    cluster_cache_id: ClusterCacheId,
440    cache_group: &str,
441    identification_broadcast_interval: Duration,
442) -> anyhow::Result<()> {
443    let mut subscription = nats_client
444        .subscribe_to_broadcasts::<ClusterControllerCacheIdentifyBroadcast>(Some(cache_group), None)
445        .await
446        .map_err(|error| {
447            anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
448        })?
449        .fuse();
450
451    // Also send periodic updates in addition to the subscription response
452    let mut interval = tokio::time::interval(identification_broadcast_interval);
453    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
454
455    let mut last_identification = Instant::now();
456
457    loop {
458        select! {
459            maybe_message = subscription.next() => {
460                let Some(message) = maybe_message else {
461                    debug!("Identify broadcast stream ended");
462                    break;
463                };
464
465                trace!(?message, "Cache received identify broadcast message");
466
467                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
468                    // Skip too frequent identification requests
469                    continue;
470                }
471
472                last_identification = Instant::now();
473                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
474                interval.reset();
475            }
476            _ = interval.tick().fuse() => {
477                last_identification = Instant::now();
478                trace!("Cache self-identification");
479
480                send_identify_broadcast(nats_client, cluster_cache_id, cache_group).await;
481            }
482        }
483    }
484
485    Ok(())
486}
487
488async fn send_identify_broadcast(
489    nats_client: &NatsClient,
490    cluster_cache_id: ClusterCacheId,
491    cache_group: &str,
492) {
493    if let Err(error) = nats_client
494        .broadcast(
495            &ClusterCacheIdentifyBroadcast { cluster_cache_id },
496            cache_group,
497        )
498        .await
499    {
500        warn!(%cluster_cache_id, %error, "Failed to send cache identify notification");
501    }
502}
503
504async fn piece_cache_details_responder<C>(
505    nats_client: &NatsClient,
506    cluster_cache_id_string: &str,
507    caches_details: &[CacheDetails<'_, C>],
508) -> anyhow::Result<()>
509where
510    C: PieceCache,
511{
512    nats_client
513        .stream_request_responder(
514            Some(cluster_cache_id_string),
515            Some(cluster_cache_id_string.to_string()),
516            |_request: ClusterCacheDetailsRequest| async {
517                Some(stream::iter(caches_details.iter().map(|cache_details| {
518                    ClusterPieceCacheDetails {
519                        piece_cache_id: cache_details.piece_cache_id,
520                        max_num_elements: cache_details.cache.max_num_elements(),
521                    }
522                })))
523            },
524        )
525        .await
526}
527
528async fn write_piece_responder<C>(
529    nats_client: &NatsClient,
530    caches_details: &[CacheDetails<'_, C>],
531) -> anyhow::Result<()>
532where
533    C: PieceCache,
534{
535    caches_details
536        .iter()
537        .map(|cache_details| async move {
538            nats_client
539                .request_responder(
540                    Some(cache_details.piece_cache_id_string.as_str()),
541                    Some(cache_details.piece_cache_id_string.clone()),
542                    |request: ClusterCacheWritePieceRequest| async move {
543                        Some(
544                            cache_details
545                                .cache
546                                .write_piece(request.offset, request.piece_index, &request.piece)
547                                .await
548                                .map_err(|error| error.to_string()),
549                        )
550                    },
551                )
552                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
553                .await
554        })
555        .collect::<FuturesUnordered<_>>()
556        .next()
557        .await
558        .ok_or_else(|| anyhow!("No caches"))?
559}
560
561async fn read_piece_index_responder<C>(
562    nats_client: &NatsClient,
563    caches_details: &[CacheDetails<'_, C>],
564) -> anyhow::Result<()>
565where
566    C: PieceCache,
567{
568    caches_details
569        .iter()
570        .map(|cache_details| async move {
571            nats_client
572                .request_responder(
573                    Some(cache_details.piece_cache_id_string.as_str()),
574                    Some(cache_details.piece_cache_id_string.clone()),
575                    |request: ClusterCacheReadPieceIndexRequest| async move {
576                        Some(
577                            cache_details
578                                .cache
579                                .read_piece_index(request.offset)
580                                .await
581                                .map_err(|error| error.to_string()),
582                        )
583                    },
584                )
585                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
586                .await
587        })
588        .collect::<FuturesUnordered<_>>()
589        .next()
590        .await
591        .ok_or_else(|| anyhow!("No caches"))?
592}
593
594async fn read_piece_responder<C>(
595    nats_client: &NatsClient,
596    caches_details: &[CacheDetails<'_, C>],
597) -> anyhow::Result<()>
598where
599    C: PieceCache,
600{
601    caches_details
602        .iter()
603        .map(|cache_details| async move {
604            nats_client
605                .request_responder(
606                    Some(cache_details.piece_cache_id_string.as_str()),
607                    Some(cache_details.piece_cache_id_string.clone()),
608                    |request: ClusterCacheReadPieceRequest| async move {
609                        Some(
610                            cache_details
611                                .cache
612                                .read_piece(request.offset)
613                                .await
614                                .map_err(|error| error.to_string()),
615                        )
616                    },
617                )
618                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
619                .await
620        })
621        .collect::<FuturesUnordered<_>>()
622        .next()
623        .await
624        .ok_or_else(|| anyhow!("No caches"))?
625}
626
627async fn read_pieces_responder<C>(
628    nats_client: &NatsClient,
629    caches_details: &[CacheDetails<'_, C>],
630) -> anyhow::Result<()>
631where
632    C: PieceCache,
633{
634    caches_details
635        .iter()
636        .map(|cache_details| async move {
637            nats_client
638                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
639                    Some(cache_details.piece_cache_id_string.as_str()),
640                    Some(cache_details.piece_cache_id_string.clone()),
641                    |ClusterCacheReadPiecesRequest { offsets }| async move {
642                        Some(
643                            match cache_details
644                                .cache
645                                .read_pieces(Box::new(offsets.into_iter()))
646                                .await
647                            {
648                                Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
649                                    maybe_cache_element.map_err(|error| error.to_string())
650                                })) as _,
651                                Err(error) => {
652                                    error!(%error, "Failed to read pieces");
653
654                                    Box::pin(stream::once(async move {
655                                        Err(format!("Failed to read pieces: {error}"))
656                                    })) as _
657                                }
658                            },
659                        )
660                    },
661                )
662                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
663                .await
664        })
665        .collect::<FuturesUnordered<_>>()
666        .next()
667        .await
668        .ok_or_else(|| anyhow!("No caches"))?
669}
670
671async fn contents_responder<C>(
672    nats_client: &NatsClient,
673    caches_details: &[CacheDetails<'_, C>],
674) -> anyhow::Result<()>
675where
676    C: PieceCache,
677{
678    caches_details
679        .iter()
680        .map(|cache_details| async move {
681            nats_client
682                .stream_request_responder::<_, _, Pin<Box<dyn Stream<Item = _> + Send>>, _>(
683                    Some(cache_details.piece_cache_id_string.as_str()),
684                    Some(cache_details.piece_cache_id_string.clone()),
685                    |_request: ClusterCacheContentsRequest| async move {
686                        Some(match cache_details.cache.contents().await {
687                            Ok(contents) => Box::pin(contents.map(|maybe_cache_element| {
688                                maybe_cache_element.map_err(|error| error.to_string())
689                            })) as _,
690                            Err(error) => {
691                                error!(%error, "Failed to get contents");
692
693                                Box::pin(stream::once(async move {
694                                    Err(format!("Failed to get contents: {error}"))
695                                })) as _
696                            }
697                        })
698                    },
699                )
700                .instrument(info_span!("", piece_cache_id = %cache_details.piece_cache_id))
701                .await
702        })
703        .collect::<FuturesUnordered<_>>()
704        .next()
705        .await
706        .ok_or_else(|| anyhow!("No caches"))?
707}