Skip to main content

ab_farmer/
farm.rs

1//! Abstract farm API
2//!
3//! This module provides a bunch of traits and simple data structures that serve as a layer of
4//! abstraction that improves composition without having assumptions about implementation details.
5//!
6//! Implementations can be local (backed by local disk) and remote (connected via network in some
7//! way). This crate provides a few of such implementations, but more can be created externally as
8//! well if needed without modifying the library itself.
9
10use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
11use ab_core_primitives::sectors::SectorIndex;
12use ab_core_primitives::segments::SegmentIndex;
13use ab_farmer_components::auditing::AuditingError;
14use ab_farmer_components::plotting::PlottedSector;
15use ab_farmer_components::proving::ProvingError;
16use ab_farmer_rpc_primitives::SolutionResponse;
17use ab_networking::libp2p::kad::RecordKey;
18use async_trait::async_trait;
19use derive_more::{Display, From};
20use futures::Stream;
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use serde::{Deserialize, Serialize};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::time::Duration;
27use std::{fmt, io};
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
33/// Erased error type
34pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
35/// Type alias used for event handlers
36pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38/// Getter for plotted sectors
39#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41    /// Get already plotted sectors
42    async fn get(
43        &self,
44    ) -> Result<
45        Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46        FarmError,
47    >;
48}
49
50/// An identifier for a cache, can be used for in logs, thread names, etc.
51#[derive(
52    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56    /// Cache ID
57    Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61    #[inline]
62    fn size_hint(&self) -> usize {
63        1_usize
64            + match self {
65                PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66            }
67    }
68
69    #[inline]
70    fn encode_to<O>(&self, dest: &mut O)
71    where
72        O: Output + ?Sized,
73    {
74        match self {
75            PieceCacheId::Ulid(ulid) => {
76                dest.push_byte(0);
77                Encode::encode_to(&ulid.0, dest);
78            }
79        }
80    }
81}
82
83impl EncodeLike for PieceCacheId {}
84
85impl Decode for PieceCacheId {
86    #[inline]
87    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
88    where
89        I: Input,
90    {
91        match input
92            .read_byte()
93            .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
94        {
95            0 => u128::decode(input)
96                .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
97                .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
98            _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
99        }
100    }
101}
102
103#[expect(clippy::new_without_default)]
104impl PieceCacheId {
105    /// Creates new ID
106    #[inline]
107    pub fn new() -> Self {
108        Self::Ulid(Ulid::new())
109    }
110}
111
112/// Offset wrapper for pieces in [`PieceCache`]
113#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
114#[repr(transparent)]
115pub struct PieceCacheOffset(pub(crate) u32);
116
117/// Abstract piece cache implementation.
118///
119/// Piece cache is a simple container that stores concatenated pieces in a flat file at specific
120/// offsets. Implementation doesn't have to be local though, cache can be remote somewhere on the
121/// network, APIs are intentionally async to account for that.
122#[async_trait]
123pub trait PieceCache: Send + Sync + fmt::Debug {
124    /// ID of this cache
125    fn id(&self) -> &PieceCacheId;
126
127    /// Max number of elements in this cache
128    fn max_num_elements(&self) -> u32;
129
130    /// Contents of this piece cache.
131    ///
132    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
133    /// doesn't happen for the same piece being accessed!
134    async fn contents(
135        &self,
136    ) -> Result<
137        Box<
138            dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
139                + Unpin
140                + Send
141                + '_,
142        >,
143        FarmError,
144    >;
145
146    /// Store piece in cache at specified offset, replacing existing piece if there is one.
147    ///
148    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
149    /// doesn't happen for the same piece being accessed!
150    async fn write_piece(
151        &self,
152        offset: PieceCacheOffset,
153        piece_index: PieceIndex,
154        piece: &Piece,
155    ) -> Result<(), FarmError>;
156
157    /// Read piece index from cache at specified offset.
158    ///
159    /// Returns `None` if offset is out of range.
160    ///
161    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
162    /// doesn't happen for the same piece being accessed!
163    async fn read_piece_index(
164        &self,
165        offset: PieceCacheOffset,
166    ) -> Result<Option<PieceIndex>, FarmError>;
167
168    /// Read piece from cache at specified offset.
169    ///
170    /// Returns `None` if offset is out of range.
171    ///
172    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
173    /// doesn't happen for the same piece being accessed!
174    async fn read_piece(
175        &self,
176        offset: PieceCacheOffset,
177    ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
178
179    /// Read pieces from cache at specified offsets.
180    ///
181    /// Number of elements in returned stream is the same as number of unique `offsets`.
182    /// Returns `None` for offsets that are out of range.
183    ///
184    /// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
185    /// doesn't happen for the same pieces being accessed!
186    async fn read_pieces(
187        &self,
188        offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
189    ) -> Result<
190        Box<
191            dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
192                + Send
193                + Unpin
194                + '_,
195        >,
196        FarmError,
197    >;
198}
199
200/// Result of piece storing check
201#[derive(Debug, Copy, Clone, Encode, Decode)]
202pub enum MaybePieceStoredResult {
203    /// Piece is not stored already, and can't be added because the cache/plot is full.
204    No,
205    /// Cache might have a vacant slot to store this piece.
206    /// Vacant slots are not guaranteed, they can be overwritten by another piece or newly plotted
207    /// sector at any time.
208    Vacant,
209    /// Piece is already stored in the cache.
210    Yes,
211}
212
213/// Abstract plot cache implementation.
214///
215/// Plot cache is a cache that exploits space towards the end of the plot that is not yet occupied
216/// by sectors in order to increase effective caching space, which helps with plotting speed for
217/// small farmers since they don't need to retrieve the same pieces from the network over and over
218/// again, which is slower and uses a lot of Internet bandwidth.
219#[async_trait]
220pub trait PlotCache: Send + Sync + fmt::Debug {
221    /// Check if a piece is already stored in this cache, or it can be added to this cache.
222    /// The piece is not guaranteed to be stored, because it might be overwritten with a new
223    /// sector any time.
224    async fn is_piece_maybe_stored(
225        &self,
226        key: &RecordKey,
227    ) -> Result<MaybePieceStoredResult, FarmError>;
228
229    /// Store piece in cache if there is free space, and return `Ok(true)`.
230    /// Returns `Ok(false)` if there is no free space, or the farm or process is shutting down.
231    async fn try_store_piece(
232        &self,
233        piece_index: PieceIndex,
234        piece: &Piece,
235    ) -> Result<bool, FarmError>;
236
237    /// Read piece from cache.
238    ///
239    /// Returns `None` if not cached.
240    async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
241}
242
243/// Auditing details
244#[derive(Debug, Copy, Clone, Encode, Decode)]
245pub struct AuditingDetails {
246    /// Number of sectors that were audited
247    pub sectors_count: u16,
248    /// Audit duration
249    pub time: Duration,
250}
251
252/// Result of the proving
253#[derive(Debug, Copy, Clone, Encode, Decode)]
254pub enum ProvingResult {
255    /// Proved successfully and accepted by the node
256    Success,
257    /// Proving took too long
258    Timeout,
259    /// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
260    /// end
261    Rejected,
262    /// Proving failed altogether
263    Failed,
264}
265
266impl fmt::Display for ProvingResult {
267    #[inline]
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        f.write_str(match self {
270            Self::Success => "Success",
271            Self::Timeout => "Timeout",
272            Self::Rejected => "Rejected",
273            Self::Failed => "Failed",
274        })
275    }
276}
277
278/// Proving details
279#[derive(Debug, Copy, Clone, Encode, Decode)]
280pub struct ProvingDetails {
281    /// Whether proving ended up being successful
282    pub result: ProvingResult,
283    /// Audit duration
284    pub time: Duration,
285}
286
287/// Special decoded farming error
288#[derive(Debug, Encode, Decode)]
289pub struct DecodedFarmingError {
290    /// String representation of an error
291    error: String,
292    /// Whether error is fatal
293    is_fatal: bool,
294}
295
296impl fmt::Display for DecodedFarmingError {
297    #[inline]
298    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299        fmt::Display::fmt(&self.error, f)
300    }
301}
302
303/// Errors that happen during farming
304#[derive(Debug, Error)]
305pub enum FarmingError {
306    /// Failed to subscribe to slot info notifications
307    #[error("Failed to subscribe to slot info notifications: {error}")]
308    FailedToSubscribeSlotInfo {
309        /// Lower-level error
310        error: anyhow::Error,
311    },
312    /// Failed to retrieve farmer info
313    #[error("Failed to retrieve farmer info: {error}")]
314    FailedToGetFarmerInfo {
315        /// Lower-level error
316        error: anyhow::Error,
317    },
318    /// Slot info notification stream ended
319    #[error("Slot info notification stream ended")]
320    SlotNotificationStreamEnded,
321    /// Low-level auditing error
322    #[error("Low-level auditing error: {0}")]
323    LowLevelAuditing(#[from] AuditingError),
324    /// Low-level proving error
325    #[error("Low-level proving error: {0}")]
326    LowLevelProving(#[from] ProvingError),
327    /// I/O error occurred
328    #[error("Farming I/O error: {0}")]
329    Io(#[from] io::Error),
330    /// Decoded farming error
331    #[error("Decoded farming error {0}")]
332    Decoded(DecodedFarmingError),
333}
334
335impl Encode for FarmingError {
336    #[inline]
337    fn encode_to<O>(&self, dest: &mut O)
338    where
339        O: Output + ?Sized,
340    {
341        let error = DecodedFarmingError {
342            error: self.to_string(),
343            is_fatal: self.is_fatal(),
344        };
345
346        error.encode_to(dest);
347    }
348}
349
350impl Decode for FarmingError {
351    #[inline]
352    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
353    where
354        I: Input,
355    {
356        DecodedFarmingError::decode(input).map(FarmingError::Decoded)
357    }
358}
359
360impl FarmingError {
361    /// String variant of the error, primarily for monitoring purposes
362    #[inline]
363    pub fn str_variant(&self) -> &str {
364        match self {
365            FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
366            FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
367            FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
368            FarmingError::LowLevelProving(_) => "LowLevelProving",
369            FarmingError::Io(_) => "Io",
370            FarmingError::Decoded(_) => "Decoded",
371            FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
372        }
373    }
374
375    /// Whether this error is fatal and makes farm unusable
376    pub fn is_fatal(&self) -> bool {
377        match self {
378            FarmingError::FailedToSubscribeSlotInfo { .. } => true,
379            FarmingError::FailedToGetFarmerInfo { .. } => true,
380            FarmingError::LowLevelAuditing(_) => true,
381            FarmingError::LowLevelProving(error) => error.is_fatal(),
382            FarmingError::Io(_) => true,
383            FarmingError::Decoded(error) => error.is_fatal,
384            FarmingError::SlotNotificationStreamEnded => true,
385        }
386    }
387}
388
389/// Various farming notifications
390#[derive(Debug, Clone, Encode, Decode)]
391pub enum FarmingNotification {
392    /// Auditing
393    Auditing(AuditingDetails),
394    /// Proving
395    Proving(ProvingDetails),
396    /// Non-fatal farming error
397    NonFatalError(Arc<FarmingError>),
398}
399
400/// Details about sector currently being plotted
401#[derive(Debug, Clone, Encode, Decode)]
402pub enum SectorPlottingDetails {
403    /// Starting plotting of a sector
404    Starting {
405        /// Progress so far in % (not including this sector)
406        progress: f32,
407        /// Whether sector is being replotted
408        replotting: bool,
409        /// Whether this is the last sector queued so far
410        last_queued: bool,
411    },
412    /// Downloading sector pieces
413    Downloading,
414    /// Downloaded sector pieces
415    Downloaded(Duration),
416    /// Encoding sector pieces
417    Encoding,
418    /// Encoded sector pieces
419    Encoded(Duration),
420    /// Writing sector
421    Writing,
422    /// Written sector
423    Written(Duration),
424    /// Finished plotting
425    Finished {
426        /// Information about plotted sector
427        plotted_sector: PlottedSector,
428        /// Information about old plotted sector that was replaced
429        old_plotted_sector: Option<PlottedSector>,
430        /// How much time it took to plot a sector
431        time: Duration,
432    },
433    /// Plotting failed
434    Error(String),
435}
436
437/// Details about sector expiration
438#[derive(Debug, Clone, Encode, Decode)]
439pub enum SectorExpirationDetails {
440    /// Sector expiration became known
441    Determined {
442        /// Segment index at which sector expires
443        expires_at: SegmentIndex,
444    },
445    /// Sector will expire at the next segment index and should be replotted
446    AboutToExpire,
447    /// Sector already expired
448    Expired,
449}
450
451/// Various sector updates
452#[derive(Debug, Clone, Encode, Decode)]
453pub enum SectorUpdate {
454    /// Sector is being plotted
455    Plotting(SectorPlottingDetails),
456    /// Sector expiration information updated
457    Expiration(SectorExpirationDetails),
458}
459
460/// Abstract piece reader implementation
461#[async_trait]
462pub trait PieceReader: Send + Sync + fmt::Debug {
463    /// Read piece from sector by offset, `None` means input parameters are incorrect or piece
464    /// reader was shut down
465    async fn read_piece(
466        &self,
467        sector_index: SectorIndex,
468        piece_offset: PieceOffset,
469    ) -> Result<Option<Piece>, FarmError>;
470}
471
472/// Opaque handler ID for event handlers, once dropped handler will be removed automatically
473pub trait HandlerId: Send + Sync + fmt::Debug {
474    /// Consumes [`HandlerId`] and prevents handler from being removed automatically.
475    fn detach(&self);
476}
477
478impl HandlerId for event_listener_primitives::HandlerId {
479    #[inline]
480    fn detach(&self) {
481        self.detach();
482    }
483}
484
485/// An identifier for a farm, can be used for in logs, thread names, etc.
486#[derive(
487    Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
488)]
489#[serde(untagged)]
490pub enum FarmId {
491    /// Farm ID
492    Ulid(Ulid),
493}
494
495impl Encode for FarmId {
496    #[inline]
497    fn size_hint(&self) -> usize {
498        1_usize
499            + match self {
500                FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
501            }
502    }
503
504    #[inline]
505    fn encode_to<O>(&self, dest: &mut O)
506    where
507        O: Output + ?Sized,
508    {
509        match self {
510            FarmId::Ulid(ulid) => {
511                dest.push_byte(0);
512                Encode::encode_to(&ulid.0, dest);
513            }
514        }
515    }
516}
517
518impl EncodeLike for FarmId {}
519
520impl Decode for FarmId {
521    #[inline]
522    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
523    where
524        I: Input,
525    {
526        match input
527            .read_byte()
528            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
529        {
530            0 => u128::decode(input)
531                .map(|ulid| FarmId::Ulid(Ulid(ulid)))
532                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
533            _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
534        }
535    }
536}
537
538#[expect(clippy::new_without_default)]
539impl FarmId {
540    /// Creates new ID
541    #[inline]
542    pub fn new() -> Self {
543        Self::Ulid(Ulid::new())
544    }
545}
546
547/// Abstract farm implementation
548#[async_trait(?Send)]
549pub trait Farm {
550    /// ID of this farm
551    fn id(&self) -> &FarmId;
552
553    /// Number of sectors in this farm
554    fn total_sectors_count(&self) -> u16;
555
556    /// Get plotted sectors instance
557    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
558
559    /// Get piece reader to read plotted pieces later
560    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
561
562    /// Subscribe to sector updates
563    fn on_sector_update(
564        &self,
565        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
566    ) -> Box<dyn HandlerId>;
567
568    /// Subscribe to farming notifications
569    fn on_farming_notification(
570        &self,
571        callback: HandlerFn<FarmingNotification>,
572    ) -> Box<dyn HandlerId>;
573
574    /// Subscribe to new solution notification
575    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
576
577    /// Run and wait for background threads to exit or return an error
578    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
579}
580
581#[async_trait]
582impl<T> Farm for Box<T>
583where
584    T: Farm + ?Sized,
585{
586    #[inline]
587    fn id(&self) -> &FarmId {
588        self.as_ref().id()
589    }
590
591    #[inline]
592    fn total_sectors_count(&self) -> u16 {
593        self.as_ref().total_sectors_count()
594    }
595
596    #[inline]
597    fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
598        self.as_ref().plotted_sectors()
599    }
600
601    #[inline]
602    fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
603        self.as_ref().piece_reader()
604    }
605
606    #[inline]
607    fn on_sector_update(
608        &self,
609        callback: HandlerFn<(SectorIndex, SectorUpdate)>,
610    ) -> Box<dyn HandlerId> {
611        self.as_ref().on_sector_update(callback)
612    }
613
614    #[inline]
615    fn on_farming_notification(
616        &self,
617        callback: HandlerFn<FarmingNotification>,
618    ) -> Box<dyn HandlerId> {
619        self.as_ref().on_farming_notification(callback)
620    }
621
622    #[inline]
623    fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
624        self.as_ref().on_solution(callback)
625    }
626
627    #[inline]
628    fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
629        (*self).run()
630    }
631}