Skip to main content

ab_farmer/
farm.rs

1//! Abstract farm API
2//!
3//! This module provides a bunch of traits and simple data structures that serve as a layer of
4//! abstraction that improves composition without having assumptions about implementation details.
5//!
6//! Implementations can be local (backed by local disk) and remote (connected via network in some
7//! way). This crate provides a few of such implementations, but more can be created externally as
8//! well if needed without modifying the library itself.
9
10use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
11use ab_core_primitives::sectors::SectorIndex;
12use ab_core_primitives::segments::SegmentIndex;
13use ab_farmer_components::auditing::AuditingError;
14use ab_farmer_components::plotting::PlottedSector;
15use ab_farmer_components::proving::ProvingError;
16use ab_farmer_rpc_primitives::SolutionResponse;
17use ab_networking::libp2p::kad::RecordKey;
18use async_trait::async_trait;
19use derive_more::{Display, From};
20use futures::Stream;
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use serde::{Deserialize, Serialize};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::time::Duration;
27use std::{fmt, io};
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
/// Type-erased, thread-safe error returned by the abstract APIs of this module
pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Shared, thread-safe callback that is invoked with a reference to the event payload `A`
pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38/// Getter for plotted sectors
39#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41    /// Get already plotted sectors
42    async fn get(
43        &self,
44    ) -> Result<
45        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46        FarmError,
47    >;
48}
49
50/// An identifier for a cache, can be used for in logs, thread names, etc.
51#[derive(
52    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56    /// Cache ID
57    Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61    #[inline]
62    fn size_hint(&self) -> usize {
63        1_usize
64            + match self {
65                PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66            }
67    }
68
69    #[inline]
70    fn encode_to<O>(&self, dest: &mut O)
71    where
72        O: Output + ?Sized,
73    {
74        match self {
75            PieceCacheId::Ulid(ulid) => {
76                dest.push_byte(0);
77                Encode::encode_to(&ulid.0, dest);
78            }
79        }
80    }
81}
82
83impl EncodeLike for PieceCacheId {}
84
85impl Decode for PieceCacheId {
86    #[inline]
87    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
88    where
89        I: Input,
90    {
91        match input
92            .read_byte()
93            .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
94        {
95            0 => u128::decode(input)
96                .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
97                .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
98            _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
99        }
100    }
101}
102
103#[expect(
104    clippy::new_without_default,
105    reason = "Default has different semantics"
106)]
107impl PieceCacheId {
108    /// Creates new ID
109    #[inline]
110    pub fn new() -> Self {
111        Self::Ulid(Ulid::new())
112    }
113}
114
115/// Offset wrapper for pieces in [`PieceCache`]
116#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
117#[repr(transparent)]
118pub struct PieceCacheOffset(pub(crate) u32);
119
120/// Abstract piece cache implementation.
121///
122/// Piece cache is a simple container that stores concatenated pieces in a flat file at specific
123/// offsets. Implementation doesn't have to be local though, cache can be remote somewhere on the
124/// network, APIs are intentionally async to account for that.
125#[async_trait]
126pub trait PieceCache: Send + Sync + fmt::Debug {
127    /// ID of this cache
128    fn id(&self) -> &PieceCacheId;
129
130    /// Max number of elements in this cache
131    fn max_num_elements(&self) -> u32;
132
133    /// Contents of this piece cache.
134    ///
135    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
136    /// doesn't happen for the same piece being accessed!
137    async fn contents(
138        &self,
139    ) -> Result<
140        Box<
141            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
142                + Unpin
143                + Send
144                + '_,
145        >,
146        FarmError,
147    >;
148
149    /// Store piece in cache at specified offset, replacing existing piece if there is one.
150    ///
151    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
152    /// doesn't happen for the same piece being accessed!
153    async fn write_piece(
154        &self,
155        offset: PieceCacheOffset,
156        piece_index: PieceIndex,
157        piece: &Piece,
158    ) -> Result<(), FarmError>;
159
160    /// Read piece index from cache at specified offset.
161    ///
162    /// Returns `None` if offset is out of range.
163    ///
164    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
165    /// doesn't happen for the same piece being accessed!
166    async fn read_piece_index(
167        &self,
168        offset: PieceCacheOffset,
169    ) -> Result<Option<PieceIndex>, FarmError>;
170
171    /// Read piece from cache at specified offset.
172    ///
173    /// Returns `None` if offset is out of range.
174    ///
175    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
176    /// doesn't happen for the same piece being accessed!
177    async fn read_piece(
178        &self,
179        offset: PieceCacheOffset,
180    ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
181
182    /// Read pieces from cache at specified offsets.
183    ///
184    /// Number of elements in returned stream is the same as number of unique `offsets`.
185    /// Returns `None` for offsets that are out of range.
186    ///
187    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
188    /// doesn't happen for the same pieces being accessed!
189    async fn read_pieces(
190        &self,
191        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
192    ) -> Result<
193        Box<
194            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
195                + Send
196                + Unpin
197                + '_,
198        >,
199        FarmError,
200    >;
201}
202
203/// Result of piece storing check
204#[derive(Debug, Copy, Clone, Encode, Decode)]
205pub enum MaybePieceStoredResult {
206    /// Piece is not stored already, and can't be added because the cache/plot is full.
207    No,
208    /// Cache might have a vacant slot to store this piece.
209    /// Vacant slots are not guaranteed, they can be overwritten by another piece or newly plotted
210    /// sector at any time.
211    Vacant,
212    /// Piece is already stored in the cache.
213    Yes,
214}
215
216/// Abstract plot cache implementation.
217///
218/// Plot cache is a cache that exploits space towards the end of the plot that is not yet occupied
219/// by sectors in order to increase effective caching space, which helps with plotting speed for
220/// small farmers since they don't need to retrieve the same pieces from the network over and over
221/// again, which is slower and uses a lot of Internet bandwidth.
222#[async_trait]
223pub trait PlotCache: Send + Sync + fmt::Debug {
224    /// Check if a piece is already stored in this cache, or it can be added to this cache.
225    /// The piece is not guaranteed to be stored, because it might be overwritten with a new
226    /// sector any time.
227    async fn is_piece_maybe_stored(
228        &self,
229        key: &RecordKey,
230    ) -> Result<MaybePieceStoredResult, FarmError>;
231
232    /// Store piece in cache if there is free space, and return `Ok(true)`.
233    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
234    async fn try_store_piece(
235        &self,
236        piece_index: PieceIndex,
237        piece: &Piece,
238    ) -> Result<bool, FarmError>;
239
240    /// Read piece from cache.
241    ///
242    /// Returns `None` if not cached.
243    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
244}
245
246/// Auditing details
247#[derive(Debug, Copy, Clone, Encode, Decode)]
248pub struct AuditingDetails {
249    /// Number of sectors that were audited
250    pub sectors_count: u16,
251    /// Audit duration
252    pub time: Duration,
253}
254
255/// Result of the proving
256#[derive(Debug, Copy, Clone, Encode, Decode)]
257pub enum ProvingResult {
258    /// Proved successfully and accepted by the node
259    Success,
260    /// Proving took too long
261    Timeout,
262    /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
263    /// end
264    Rejected,
265    /// Proving failed altogether
266    Failed,
267}
268
269impl fmt::Display for ProvingResult {
270    #[inline]
271    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272        f.write_str(match self {
273            Self::Success => "Success",
274            Self::Timeout => "Timeout",
275            Self::Rejected => "Rejected",
276            Self::Failed => "Failed",
277        })
278    }
279}
280
281/// Proving details
282#[derive(Debug, Copy, Clone, Encode, Decode)]
283pub struct ProvingDetails {
284    /// Whether proving ended up being successful
285    pub result: ProvingResult,
286    /// Audit duration
287    pub time: Duration,
288}
289
290/// Special decoded farming error
291#[derive(Debug, Encode, Decode)]
292pub struct DecodedFarmingError {
293    /// String representation of an error
294    error: String,
295    /// Whether error is fatal
296    is_fatal: bool,
297}
298
299impl fmt::Display for DecodedFarmingError {
300    #[inline]
301    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302        fmt::Display::fmt(&self.error, f)
303    }
304}
305
306/// Errors that happen during farming
307#[derive(Debug, Error)]
308pub enum FarmingError {
309    /// Failed to subscribe to slot info notifications
310    #[error("Failed to subscribe to slot info notifications: {error}")]
311    FailedToSubscribeSlotInfo {
312        /// Lower-level error
313        error: anyhow::Error,
314    },
315    /// Failed to retrieve farmer info
316    #[error("Failed to retrieve farmer info: {error}")]
317    FailedToGetFarmerInfo {
318        /// Lower-level error
319        error: anyhow::Error,
320    },
321    /// Slot info notification stream ended
322    #[error("Slot info notification stream ended")]
323    SlotNotificationStreamEnded,
324    /// Low-level auditing error
325    #[error("Low-level auditing error: {0}")]
326    LowLevelAuditing(#[from] AuditingError),
327    /// Low-level proving error
328    #[error("Low-level proving error: {0}")]
329    LowLevelProving(#[from] ProvingError),
330    /// I/O error occurred
331    #[error("Farming I/O error: {0}")]
332    Io(#[from] io::Error),
333    /// Decoded farming error
334    #[error("Decoded farming error {0}")]
335    Decoded(DecodedFarmingError),
336}
337
338impl Encode for FarmingError {
339    #[inline]
340    fn encode_to<O>(&self, dest: &mut O)
341    where
342        O: Output + ?Sized,
343    {
344        let error = DecodedFarmingError {
345            error: self.to_string(),
346            is_fatal: self.is_fatal(),
347        };
348
349        error.encode_to(dest);
350    }
351}
352
353impl Decode for FarmingError {
354    #[inline]
355    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
356    where
357        I: Input,
358    {
359        DecodedFarmingError::decode(input).map(FarmingError::Decoded)
360    }
361}
362
363impl FarmingError {
364    /// String variant of the error, primarily for monitoring purposes
365    #[inline]
366    pub fn str_variant(&self) -> &str {
367        match self {
368            FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
369            FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
370            FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
371            FarmingError::LowLevelProving(_) => "LowLevelProving",
372            FarmingError::Io(_) => "Io",
373            FarmingError::Decoded(_) => "Decoded",
374            FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
375        }
376    }
377
378    /// Whether this error is fatal and makes farm unusable
379    pub fn is_fatal(&self) -> bool {
380        match self {
381            FarmingError::FailedToSubscribeSlotInfo { .. } => true,
382            FarmingError::FailedToGetFarmerInfo { .. } => true,
383            FarmingError::LowLevelAuditing(_) => true,
384            FarmingError::LowLevelProving(error) => error.is_fatal(),
385            FarmingError::Io(_) => true,
386            FarmingError::Decoded(error) => error.is_fatal,
387            FarmingError::SlotNotificationStreamEnded => true,
388        }
389    }
390}
391
392/// Various farming notifications
393#[derive(Debug, Clone, Encode, Decode)]
394pub enum FarmingNotification {
395    /// Auditing
396    Auditing(AuditingDetails),
397    /// Proving
398    Proving(ProvingDetails),
399    /// Non-fatal farming error
400    NonFatalError(Arc<FarmingError>),
401}
402
403/// Details about sector currently being plotted
404#[derive(Debug, Clone, Encode, Decode)]
405pub enum SectorPlottingDetails {
406    /// Starting plotting of a sector
407    Starting {
408        /// Progress so far in % (not including this sector)
409        progress: f32,
410        /// Whether sector is being replotted
411        replotting: bool,
412        /// Whether this is the last sector queued so far
413        last_queued: bool,
414    },
415    /// Downloading sector pieces
416    Downloading,
417    /// Downloaded sector pieces
418    Downloaded(Duration),
419    /// Encoding sector pieces
420    Encoding,
421    /// Encoded sector pieces
422    Encoded(Duration),
423    /// Writing sector
424    Writing,
425    /// Written sector
426    Written(Duration),
427    /// Finished plotting
428    Finished {
429        /// Information about plotted sector
430        plotted_sector: PlottedSector,
431        /// Information about old plotted sector that was replaced
432        old_plotted_sector: Option<PlottedSector>,
433        /// How much time it took to plot a sector
434        time: Duration,
435    },
436    /// Plotting failed
437    Error(String),
438}
439
440/// Details about sector expiration
441#[derive(Debug, Clone, Encode, Decode)]
442pub enum SectorExpirationDetails {
443    /// Sector expiration became known
444    Determined {
445        /// Segment index at which sector expires
446        expires_at: SegmentIndex,
447    },
448    /// Sector will expire at the next segment index and should be replotted
449    AboutToExpire,
450    /// Sector already expired
451    Expired,
452}
453
454/// Various sector updates
455#[derive(Debug, Clone, Encode, Decode)]
456pub enum SectorUpdate {
457    /// Sector is being plotted
458    Plotting(SectorPlottingDetails),
459    /// Sector expiration information updated
460    Expiration(SectorExpirationDetails),
461}
462
463/// Abstract piece reader implementation
464#[async_trait]
465pub trait PieceReader: Send + Sync + fmt::Debug {
466    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
467    /// reader was shut down
468    async fn read_piece(
469        &self,
470        sector_index: SectorIndex,
471        piece_offset: PieceOffset,
472    ) -> Result<Option<Piece>, FarmError>;
473}
474
/// Opaque ID of an event handler, the handler is removed automatically once the ID is dropped
pub trait HandlerId: fmt::Debug + Send + Sync {
    /// Prevents the handler from being removed automatically.
    ///
    /// Note that this takes `&self`, so the [`HandlerId`] itself is not consumed by the call.
    fn detach(&self);
}
480
481impl HandlerId for event_listener_primitives::HandlerId {
482    #[inline]
483    fn detach(&self) {
484        self.detach();
485    }
486}
487
488/// An identifier for a farm, can be used for in logs, thread names, etc.
489#[derive(
490    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
491)]
492#[serde(untagged)]
493pub enum FarmId {
494    /// Farm ID
495    Ulid(Ulid),
496}
497
498impl Encode for FarmId {
499    #[inline]
500    fn size_hint(&self) -> usize {
501        1_usize
502            + match self {
503                FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
504            }
505    }
506
507    #[inline]
508    fn encode_to<O>(&self, dest: &mut O)
509    where
510        O: Output + ?Sized,
511    {
512        match self {
513            FarmId::Ulid(ulid) => {
514                dest.push_byte(0);
515                Encode::encode_to(&ulid.0, dest);
516            }
517        }
518    }
519}
520
521impl EncodeLike for FarmId {}
522
523impl Decode for FarmId {
524    #[inline]
525    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
526    where
527        I: Input,
528    {
529        match input
530            .read_byte()
531            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
532        {
533            0 => u128::decode(input)
534                .map(|ulid| FarmId::Ulid(Ulid(ulid)))
535                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
536            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
537        }
538    }
539}
540
541#[expect(
542    clippy::new_without_default,
543    reason = "Default has different semantics"
544)]
545impl FarmId {
546    /// Creates new ID
547    #[inline]
548    pub fn new() -> Self {
549        Self::Ulid(Ulid::new())
550    }
551}
552
553/// Abstract farm implementation
554#[async_trait(?Send)]
555pub trait Farm {
556    /// ID of this farm
557    fn id(&self) -> &FarmId;
558
559    /// Number of sectors in this farm
560    fn total_sectors_count(&self) -> u16;
561
562    /// Get plotted sectors instance
563    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
564
565    /// Get piece reader to read plotted pieces later
566    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
567
568    /// Subscribe to sector updates
569    fn on_sector_update(
570        &self,
571        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
572    ) -> Box<dyn HandlerId>;
573
574    /// Subscribe to farming notifications
575    fn on_farming_notification(
576        &self,
577        callback: HandlerFn<FarmingNotification>,
578    ) -> Box<dyn HandlerId>;
579
580    /// Subscribe to new solution notification
581    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
582
583    /// Run and wait for background threads to exit or return an error
584    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
585}
586
587#[async_trait]
588impl<T> Farm for Box<T>
589where
590    T: Farm + ?Sized,
591{
592    #[inline]
593    fn id(&self) -> &FarmId {
594        self.as_ref().id()
595    }
596
597    #[inline]
598    fn total_sectors_count(&self) -> u16 {
599        self.as_ref().total_sectors_count()
600    }
601
602    #[inline]
603    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
604        self.as_ref().plotted_sectors()
605    }
606
607    #[inline]
608    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
609        self.as_ref().piece_reader()
610    }
611
612    #[inline]
613    fn on_sector_update(
614        &self,
615        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
616    ) -> Box<dyn HandlerId> {
617        self.as_ref().on_sector_update(callback)
618    }
619
620    #[inline]
621    fn on_farming_notification(
622        &self,
623        callback: HandlerFn<FarmingNotification>,
624    ) -> Box<dyn HandlerId> {
625        self.as_ref().on_farming_notification(callback)
626    }
627
628    #[inline]
629    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
630        self.as_ref().on_solution(callback)
631    }
632
633    #[inline]
634    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
635        (*self).run()
636    }
637}