1use ab_core_primitives::pieces::{Piece, PieceIndex, PieceOffset};
11use ab_core_primitives::sectors::SectorIndex;
12use ab_core_primitives::segments::SegmentIndex;
13use ab_farmer_components::auditing::AuditingError;
14use ab_farmer_components::plotting::PlottedSector;
15use ab_farmer_components::proving::ProvingError;
16use ab_farmer_rpc_primitives::SolutionResponse;
17use ab_networking::libp2p::kad::RecordKey;
18use async_trait::async_trait;
19use derive_more::{Display, From};
20use futures::Stream;
21use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
22use serde::{Deserialize, Serialize};
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26use std::time::Duration;
27use std::{fmt, io};
28use thiserror::Error;
29use ulid::Ulid;
30
31pub mod plotted_pieces;
32
33pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
35pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
37
38#[async_trait]
40pub trait PlottedSectors: Send + Sync + fmt::Debug {
41 async fn get(
43 &self,
44 ) -> Result<
45 Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
46 FarmError,
47 >;
48}
49
50#[derive(
52 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
53)]
54#[serde(untagged)]
55pub enum PieceCacheId {
56 Ulid(Ulid),
58}
59
60impl Encode for PieceCacheId {
61 #[inline]
62 fn size_hint(&self) -> usize {
63 1_usize
64 + match self {
65 PieceCacheId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
66 }
67 }
68
69 #[inline]
70 fn encode_to<O>(&self, dest: &mut O)
71 where
72 O: Output + ?Sized,
73 {
74 match self {
75 PieceCacheId::Ulid(ulid) => {
76 dest.push_byte(0);
77 Encode::encode_to(&ulid.0, dest);
78 }
79 }
80 }
81}
82
83impl EncodeLike for PieceCacheId {}
84
85impl Decode for PieceCacheId {
86 #[inline]
87 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
88 where
89 I: Input,
90 {
91 match input
92 .read_byte()
93 .map_err(|e| e.chain("Could not decode `PieceCacheId`, failed to read variant byte"))?
94 {
95 0 => u128::decode(input)
96 .map(|ulid| PieceCacheId::Ulid(Ulid(ulid)))
97 .map_err(|e| e.chain("Could not decode `PieceCacheId::Ulid.0`")),
98 _ => Err("Could not decode `PieceCacheId`, variant doesn't exist".into()),
99 }
100 }
101}
102
103#[expect(
104 clippy::new_without_default,
105 reason = "Default has different semantics"
106)]
107impl PieceCacheId {
108 #[inline]
110 pub fn new() -> Self {
111 Self::Ulid(Ulid::new())
112 }
113}
114
115#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Encode, Decode)]
117#[repr(transparent)]
118pub struct PieceCacheOffset(pub(crate) u32);
119
120#[async_trait]
126pub trait PieceCache: Send + Sync + fmt::Debug {
127 fn id(&self) -> &PieceCacheId;
129
130 fn max_num_elements(&self) -> u32;
132
133 async fn contents(
138 &self,
139 ) -> Result<
140 Box<
141 dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
142 + Unpin
143 + Send
144 + '_,
145 >,
146 FarmError,
147 >;
148
149 async fn write_piece(
154 &self,
155 offset: PieceCacheOffset,
156 piece_index: PieceIndex,
157 piece: &Piece,
158 ) -> Result<(), FarmError>;
159
160 async fn read_piece_index(
167 &self,
168 offset: PieceCacheOffset,
169 ) -> Result<Option<PieceIndex>, FarmError>;
170
171 async fn read_piece(
178 &self,
179 offset: PieceCacheOffset,
180 ) -> Result<Option<(PieceIndex, Piece)>, FarmError>;
181
182 async fn read_pieces(
190 &self,
191 offsets: Box<dyn Iterator<Item = PieceCacheOffset> + Send>,
192 ) -> Result<
193 Box<
194 dyn Stream<Item = Result<(PieceCacheOffset, Option<(PieceIndex, Piece)>), FarmError>>
195 + Send
196 + Unpin
197 + '_,
198 >,
199 FarmError,
200 >;
201}
202
203#[derive(Debug, Copy, Clone, Encode, Decode)]
205pub enum MaybePieceStoredResult {
206 No,
208 Vacant,
212 Yes,
214}
215
216#[async_trait]
223pub trait PlotCache: Send + Sync + fmt::Debug {
224 async fn is_piece_maybe_stored(
228 &self,
229 key: &RecordKey,
230 ) -> Result<MaybePieceStoredResult, FarmError>;
231
232 async fn try_store_piece(
235 &self,
236 piece_index: PieceIndex,
237 piece: &Piece,
238 ) -> Result<bool, FarmError>;
239
240 async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
244}
245
246#[derive(Debug, Copy, Clone, Encode, Decode)]
248pub struct AuditingDetails {
249 pub sectors_count: u16,
251 pub time: Duration,
253}
254
255#[derive(Debug, Copy, Clone, Encode, Decode)]
257pub enum ProvingResult {
258 Success,
260 Timeout,
262 Rejected,
265 Failed,
267}
268
269impl fmt::Display for ProvingResult {
270 #[inline]
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 f.write_str(match self {
273 Self::Success => "Success",
274 Self::Timeout => "Timeout",
275 Self::Rejected => "Rejected",
276 Self::Failed => "Failed",
277 })
278 }
279}
280
281#[derive(Debug, Copy, Clone, Encode, Decode)]
283pub struct ProvingDetails {
284 pub result: ProvingResult,
286 pub time: Duration,
288}
289
290#[derive(Debug, Encode, Decode)]
292pub struct DecodedFarmingError {
293 error: String,
295 is_fatal: bool,
297}
298
299impl fmt::Display for DecodedFarmingError {
300 #[inline]
301 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302 fmt::Display::fmt(&self.error, f)
303 }
304}
305
306#[derive(Debug, Error)]
308pub enum FarmingError {
309 #[error("Failed to subscribe to slot info notifications: {error}")]
311 FailedToSubscribeSlotInfo {
312 error: anyhow::Error,
314 },
315 #[error("Failed to retrieve farmer info: {error}")]
317 FailedToGetFarmerInfo {
318 error: anyhow::Error,
320 },
321 #[error("Slot info notification stream ended")]
323 SlotNotificationStreamEnded,
324 #[error("Low-level auditing error: {0}")]
326 LowLevelAuditing(#[from] AuditingError),
327 #[error("Low-level proving error: {0}")]
329 LowLevelProving(#[from] ProvingError),
330 #[error("Farming I/O error: {0}")]
332 Io(#[from] io::Error),
333 #[error("Decoded farming error {0}")]
335 Decoded(DecodedFarmingError),
336}
337
338impl Encode for FarmingError {
339 #[inline]
340 fn encode_to<O>(&self, dest: &mut O)
341 where
342 O: Output + ?Sized,
343 {
344 let error = DecodedFarmingError {
345 error: self.to_string(),
346 is_fatal: self.is_fatal(),
347 };
348
349 error.encode_to(dest);
350 }
351}
352
353impl Decode for FarmingError {
354 #[inline]
355 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
356 where
357 I: Input,
358 {
359 DecodedFarmingError::decode(input).map(FarmingError::Decoded)
360 }
361}
362
363impl FarmingError {
364 #[inline]
366 pub fn str_variant(&self) -> &str {
367 match self {
368 FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
369 FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
370 FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
371 FarmingError::LowLevelProving(_) => "LowLevelProving",
372 FarmingError::Io(_) => "Io",
373 FarmingError::Decoded(_) => "Decoded",
374 FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
375 }
376 }
377
378 pub fn is_fatal(&self) -> bool {
380 match self {
381 FarmingError::FailedToSubscribeSlotInfo { .. } => true,
382 FarmingError::FailedToGetFarmerInfo { .. } => true,
383 FarmingError::LowLevelAuditing(_) => true,
384 FarmingError::LowLevelProving(error) => error.is_fatal(),
385 FarmingError::Io(_) => true,
386 FarmingError::Decoded(error) => error.is_fatal,
387 FarmingError::SlotNotificationStreamEnded => true,
388 }
389 }
390}
391
392#[derive(Debug, Clone, Encode, Decode)]
394pub enum FarmingNotification {
395 Auditing(AuditingDetails),
397 Proving(ProvingDetails),
399 NonFatalError(Arc<FarmingError>),
401}
402
403#[derive(Debug, Clone, Encode, Decode)]
405pub enum SectorPlottingDetails {
406 Starting {
408 progress: f32,
410 replotting: bool,
412 last_queued: bool,
414 },
415 Downloading,
417 Downloaded(Duration),
419 Encoding,
421 Encoded(Duration),
423 Writing,
425 Written(Duration),
427 Finished {
429 plotted_sector: PlottedSector,
431 old_plotted_sector: Option<PlottedSector>,
433 time: Duration,
435 },
436 Error(String),
438}
439
440#[derive(Debug, Clone, Encode, Decode)]
442pub enum SectorExpirationDetails {
443 Determined {
445 expires_at: SegmentIndex,
447 },
448 AboutToExpire,
450 Expired,
452}
453
454#[derive(Debug, Clone, Encode, Decode)]
456pub enum SectorUpdate {
457 Plotting(SectorPlottingDetails),
459 Expiration(SectorExpirationDetails),
461}
462
463#[async_trait]
465pub trait PieceReader: Send + Sync + fmt::Debug {
466 async fn read_piece(
469 &self,
470 sector_index: SectorIndex,
471 piece_offset: PieceOffset,
472 ) -> Result<Option<Piece>, FarmError>;
473}
474
475pub trait HandlerId: Send + Sync + fmt::Debug {
477 fn detach(&self);
479}
480
481impl HandlerId for event_listener_primitives::HandlerId {
482 #[inline]
483 fn detach(&self) {
484 self.detach();
485 }
486}
487
488#[derive(
490 Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize, Display, From,
491)]
492#[serde(untagged)]
493pub enum FarmId {
494 Ulid(Ulid),
496}
497
498impl Encode for FarmId {
499 #[inline]
500 fn size_hint(&self) -> usize {
501 1_usize
502 + match self {
503 FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)),
504 }
505 }
506
507 #[inline]
508 fn encode_to<O>(&self, dest: &mut O)
509 where
510 O: Output + ?Sized,
511 {
512 match self {
513 FarmId::Ulid(ulid) => {
514 dest.push_byte(0);
515 Encode::encode_to(&ulid.0, dest);
516 }
517 }
518 }
519}
520
521impl EncodeLike for FarmId {}
522
523impl Decode for FarmId {
524 #[inline]
525 fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
526 where
527 I: Input,
528 {
529 match input
530 .read_byte()
531 .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?
532 {
533 0 => u128::decode(input)
534 .map(|ulid| FarmId::Ulid(Ulid(ulid)))
535 .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`")),
536 _ => Err("Could not decode `FarmId`, variant doesn't exist".into()),
537 }
538 }
539}
540
541#[expect(
542 clippy::new_without_default,
543 reason = "Default has different semantics"
544)]
545impl FarmId {
546 #[inline]
548 pub fn new() -> Self {
549 Self::Ulid(Ulid::new())
550 }
551}
552
553#[async_trait(?Send)]
555pub trait Farm {
556 fn id(&self) -> &FarmId;
558
559 fn total_sectors_count(&self) -> u16;
561
562 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;
564
565 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;
567
568 fn on_sector_update(
570 &self,
571 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
572 ) -> Box<dyn HandlerId>;
573
574 fn on_farming_notification(
576 &self,
577 callback: HandlerFn<FarmingNotification>,
578 ) -> Box<dyn HandlerId>;
579
580 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId>;
582
583 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
585}
586
587#[async_trait]
588impl<T> Farm for Box<T>
589where
590 T: Farm + ?Sized,
591{
592 #[inline]
593 fn id(&self) -> &FarmId {
594 self.as_ref().id()
595 }
596
597 #[inline]
598 fn total_sectors_count(&self) -> u16 {
599 self.as_ref().total_sectors_count()
600 }
601
602 #[inline]
603 fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
604 self.as_ref().plotted_sectors()
605 }
606
607 #[inline]
608 fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
609 self.as_ref().piece_reader()
610 }
611
612 #[inline]
613 fn on_sector_update(
614 &self,
615 callback: HandlerFn<(SectorIndex, SectorUpdate)>,
616 ) -> Box<dyn HandlerId> {
617 self.as_ref().on_sector_update(callback)
618 }
619
620 #[inline]
621 fn on_farming_notification(
622 &self,
623 callback: HandlerFn<FarmingNotification>,
624 ) -> Box<dyn HandlerId> {
625 self.as_ref().on_farming_notification(callback)
626 }
627
628 #[inline]
629 fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
630 self.as_ref().on_solution(callback)
631 }
632
633 #[inline]
634 fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
635 (*self).run()
636 }
637}