1use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
11use ab_core_primitives::sectors::SectorIndex;
12use ab_core_primitives::segments::SegmentIndex;
13use ab_farmer_components::auditing::AuditingError;
14use ab_farmer_components::plotting::PlottedSector;
15use ab_farmer_components::proving::ProvingError;
16use ab_farmer_rpc_primitives::SolutionResponse;
17use ab_networking::libp2p::kad::RecordKey;
18use async_trait::async_trait;
19use derive_more::{Display, From};
20use futures::Stream;
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use serde::{Deserialize, Serialize};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::time::Duration;
27use std::{fmt, io};
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
33pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
35pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41 async fn get(
43 &self,
44 ) -> Result<
45 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46 FarmError,
47 >;
48}
49
50#[derive(
52 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56 Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61 #[inline]
62 fn size_hint(&self) -> usize {
63 1_usize
64 + match self {
65 PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66 }
67 }
68
69 #[inline]
70 fn encode_to<O>(&self, dest: &mut O)
71 where
72 O: Output + ?Sized,
73 {
74 match self {
75 PieceCacheId::Ulid(ulid) => {
76 dest.push_byte(0);
77 Encode::encode_to(&ulid.0, dest);
78 }
79 }
80 }
81}
82
83impl EncodeLike for PieceCacheId {}
84
85impl Decode for PieceCacheId {
86 #[inline]
87 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
88 where
89 I: Input,
90 {
91 match input
92 .read_byte()
93 .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
94 {
95 0 => u128::decode(input)
96 .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
97 .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
98 _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
99 }
100 }
101}
102
103#[expect(clippy::new_without_default)]
104impl PieceCacheId {
105 #[inline]
107 pub fn new() -> Self {
108 Self::Ulid(Ulid::new())
109 }
110}
111
112#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
114#[repr(transparent)]
115pub struct PieceCacheOffset(pub(crate) u32);
116
117#[async_trait]
123pub trait PieceCache: Send + Sync + fmt::Debug {
124 fn id(&self) -> &PieceCacheId;
126
127 fn max_num_elements(&self) -> u32;
129
130 async fn contents(
135 &self,
136 ) -> Result<
137 Box<
138 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
139 + Unpin
140 + Send
141 + '_,
142 >,
143 FarmError,
144 >;
145
146 async fn write_piece(
151 &self,
152 offset: PieceCacheOffset,
153 piece_index: PieceIndex,
154 piece: &Piece,
155 ) -> Result<(), FarmError>;
156
157 async fn read_piece_index(
164 &self,
165 offset: PieceCacheOffset,
166 ) -> Result<Option<PieceIndex>, FarmError>;
167
168 async fn read_piece(
175 &self,
176 offset: PieceCacheOffset,
177 ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
178
179 async fn read_pieces(
187 &self,
188 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
189 ) -> Result<
190 Box<
191 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
192 + Send
193 + Unpin
194 + '_,
195 >,
196 FarmError,
197 >;
198}
199
200#[derive(Debug, Copy, Clone, Encode, Decode)]
202pub enum MaybePieceStoredResult {
203 No,
205 Vacant,
209 Yes,
211}
212
213#[async_trait]
220pub trait PlotCache: Send + Sync + fmt::Debug {
221 async fn is_piece_maybe_stored(
225 &self,
226 key: &RecordKey,
227 ) -> Result<MaybePieceStoredResult, FarmError>;
228
229 async fn try_store_piece(
232 &self,
233 piece_index: PieceIndex,
234 piece: &Piece,
235 ) -> Result<bool, FarmError>;
236
237 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
241}
242
243#[derive(Debug, Copy, Clone, Encode, Decode)]
245pub struct AuditingDetails {
246 pub sectors_count: u16,
248 pub time: Duration,
250}
251
252#[derive(Debug, Copy, Clone, Encode, Decode)]
254pub enum ProvingResult {
255 Success,
257 Timeout,
259 Rejected,
262 Failed,
264}
265
266impl fmt::Display for ProvingResult {
267 #[inline]
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 f.write_str(match self {
270 Self::Success => "Success",
271 Self::Timeout => "Timeout",
272 Self::Rejected => "Rejected",
273 Self::Failed => "Failed",
274 })
275 }
276}
277
278#[derive(Debug, Copy, Clone, Encode, Decode)]
280pub struct ProvingDetails {
281 pub result: ProvingResult,
283 pub time: Duration,
285}
286
287#[derive(Debug, Encode, Decode)]
289pub struct DecodedFarmingError {
290 error: String,
292 is_fatal: bool,
294}
295
296impl fmt::Display for DecodedFarmingError {
297 #[inline]
298 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
299 fmt::Display::fmt(&self.error, f)
300 }
301}
302
303#[derive(Debug, Error)]
305pub enum FarmingError {
306 #[error("Failed to subscribe to slot info notifications: {error}")]
308 FailedToSubscribeSlotInfo {
309 error: anyhow::Error,
311 },
312 #[error("Failed to retrieve farmer info: {error}")]
314 FailedToGetFarmerInfo {
315 error: anyhow::Error,
317 },
318 #[error("Slot info notification stream ended")]
320 SlotNotificationStreamEnded,
321 #[error("Low-level auditing error: {0}")]
323 LowLevelAuditing(#[from] AuditingError),
324 #[error("Low-level proving error: {0}")]
326 LowLevelProving(#[from] ProvingError),
327 #[error("Farming I/O error: {0}")]
329 Io(#[from] io::Error),
330 #[error("Decoded farming error {0}")]
332 Decoded(DecodedFarmingError),
333}
334
335impl Encode for FarmingError {
336 #[inline]
337 fn encode_to<O>(&self, dest: &mut O)
338 where
339 O: Output + ?Sized,
340 {
341 let error = DecodedFarmingError {
342 error: self.to_string(),
343 is_fatal: self.is_fatal(),
344 };
345
346 error.encode_to(dest);
347 }
348}
349
350impl Decode for FarmingError {
351 #[inline]
352 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
353 where
354 I: Input,
355 {
356 DecodedFarmingError::decode(input).map(FarmingError::Decoded)
357 }
358}
359
360impl FarmingError {
361 #[inline]
363 pub fn str_variant(&self) -> &str {
364 match self {
365 FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
366 FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
367 FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
368 FarmingError::LowLevelProving(_) => "LowLevelProving",
369 FarmingError::Io(_) => "Io",
370 FarmingError::Decoded(_) => "Decoded",
371 FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
372 }
373 }
374
375 pub fn is_fatal(&self) -> bool {
377 match self {
378 FarmingError::FailedToSubscribeSlotInfo { .. } => true,
379 FarmingError::FailedToGetFarmerInfo { .. } => true,
380 FarmingError::LowLevelAuditing(_) => true,
381 FarmingError::LowLevelProving(error) => error.is_fatal(),
382 FarmingError::Io(_) => true,
383 FarmingError::Decoded(error) => error.is_fatal,
384 FarmingError::SlotNotificationStreamEnded => true,
385 }
386 }
387}
388
389#[derive(Debug, Clone, Encode, Decode)]
391pub enum FarmingNotification {
392 Auditing(AuditingDetails),
394 Proving(ProvingDetails),
396 NonFatalError(Arc<FarmingError>),
398}
399
400#[derive(Debug, Clone, Encode, Decode)]
402pub enum SectorPlottingDetails {
403 Starting {
405 progress: f32,
407 replotting: bool,
409 last_queued: bool,
411 },
412 Downloading,
414 Downloaded(Duration),
416 Encoding,
418 Encoded(Duration),
420 Writing,
422 Written(Duration),
424 Finished {
426 plotted_sector: PlottedSector,
428 old_plotted_sector: Option<PlottedSector>,
430 time: Duration,
432 },
433 Error(String),
435}
436
437#[derive(Debug, Clone, Encode, Decode)]
439pub enum SectorExpirationDetails {
440 Determined {
442 expires_at: SegmentIndex,
444 },
445 AboutToExpire,
447 Expired,
449}
450
451#[derive(Debug, Clone, Encode, Decode)]
453pub enum SectorUpdate {
454 Plotting(SectorPlottingDetails),
456 Expiration(SectorExpirationDetails),
458}
459
460#[async_trait]
462pub trait PieceReader: Send + Sync + fmt::Debug {
463 async fn read_piece(
466 &self,
467 sector_index: SectorIndex,
468 piece_offset: PieceOffset,
469 ) -> Result<Option<Piece>, FarmError>;
470}
471
472pub trait HandlerId: Send + Sync + fmt::Debug {
474 fn detach(&self);
476}
477
478impl HandlerId for event_listener_primitives::HandlerId {
479 #[inline]
480 fn detach(&self) {
481 self.detach();
482 }
483}
484
485#[derive(
487 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
488)]
489#[serde(untagged)]
490pub enum FarmId {
491 Ulid(Ulid),
493}
494
495impl Encode for FarmId {
496 #[inline]
497 fn size_hint(&self) -> usize {
498 1_usize
499 + match self {
500 FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
501 }
502 }
503
504 #[inline]
505 fn encode_to<O>(&self, dest: &mut O)
506 where
507 O: Output + ?Sized,
508 {
509 match self {
510 FarmId::Ulid(ulid) => {
511 dest.push_byte(0);
512 Encode::encode_to(&ulid.0, dest);
513 }
514 }
515 }
516}
517
518impl EncodeLike for FarmId {}
519
520impl Decode for FarmId {
521 #[inline]
522 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
523 where
524 I: Input,
525 {
526 match input
527 .read_byte()
528 .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
529 {
530 0 => u128::decode(input)
531 .map(|ulid| FarmId::Ulid(Ulid(ulid)))
532 .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
533 _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
534 }
535 }
536}
537
538#[expect(clippy::new_without_default)]
539impl FarmId {
540 #[inline]
542 pub fn new() -> Self {
543 Self::Ulid(Ulid::new())
544 }
545}
546
547#[async_trait(?Send)]
549pub trait Farm {
550 fn id(&self) -> &FarmId;
552
553 fn total_sectors_count(&self) -> u16;
555
556 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
558
559 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
561
562 fn on_sector_update(
564 &self,
565 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
566 ) -> Box<dyn HandlerId>;
567
568 fn on_farming_notification(
570 &self,
571 callback: HandlerFn<FarmingNotification>,
572 ) -> Box<dyn HandlerId>;
573
574 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
576
577 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
579}
580
581#[async_trait]
582impl<T> Farm for Box<T>
583where
584 T: Farm + ?Sized,
585{
586 #[inline]
587 fn id(&self) -> &FarmId {
588 self.as_ref().id()
589 }
590
591 #[inline]
592 fn total_sectors_count(&self) -> u16 {
593 self.as_ref().total_sectors_count()
594 }
595
596 #[inline]
597 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
598 self.as_ref().plotted_sectors()
599 }
600
601 #[inline]
602 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
603 self.as_ref().piece_reader()
604 }
605
606 #[inline]
607 fn on_sector_update(
608 &self,
609 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
610 ) -> Box<dyn HandlerId> {
611 self.as_ref().on_sector_update(callback)
612 }
613
614 #[inline]
615 fn on_farming_notification(
616 &self,
617 callback: HandlerFn<FarmingNotification>,
618 ) -> Box<dyn HandlerId> {
619 self.as_ref().on_farming_notification(callback)
620 }
621
622 #[inline]
623 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
624 self.as_ref().on_solution(callback)
625 }
626
627 #[inline]
628 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
629 (*self).run()
630 }
631}