Skip to main content

ab_farmer/single_disk_farm/
farming.rs

1//! Farming-related utilities
2//!
3//! These utilities do not expose the whole farming workflow, but rather small bits of it that can
4//! be useful externally (for example for benchmarking purposes in CLI).
5
6pub mod rayon_files;
7
8use crate::farm::{
9    AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
27use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
28use ab_proof_of_space::{Table, TableGenerator};
29use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
30use futures::StreamExt;
31use futures::channel::mpsc;
32use rayon::ThreadPool;
33use std::collections::HashSet;
34use std::sync::Arc;
35use std::time::Instant;
36use tracing::{Span, debug, error, info, trace, warn};
37
/// How many non-fatal errors should happen in a row before farm is considered non-operational.
///
/// Only consecutive errors count towards the limit: the farming loop resets its counter after a
/// slot that was processed without an error.
const NON_FATAL_ERROR_LIMIT: usize = 10;
40
41pub(super) async fn slot_notification_forwarder<NC>(
42    node_client: &NC,
43    mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
44    metrics: Option<Arc<SingleDiskFarmMetrics>>,
45) -> Result<(), FarmingError>
46where
47    NC: NodeClient,
48{
49    info!("Subscribing to slot info notifications");
50
51    let mut slot_info_notifications = node_client
52        .subscribe_slot_info()
53        .await
54        .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
55
56    while let Some(slot_info) = slot_info_notifications.next().await {
57        debug!(?slot_info, "New slot");
58
59        let slot = slot_info.slot;
60
61        // Error means farmer is still solving for previous slot, which is too late, and we need to
62        // skip this slot
63        if slot_info_forwarder_sender.try_send(slot_info).is_err() {
64            if let Some(metrics) = &metrics {
65                metrics.skipped_slots.inc();
66            }
67            debug!(%slot, "Slow farming, skipping slot");
68        }
69    }
70
71    Err(FarmingError::SlotNotificationStreamEnded)
72}
73
/// Plot audit options.
///
/// Bundle of inputs consumed by [`PlotAudit::audit`]. Lifetime `'a` covers the data that may be
/// referenced by the returned solutions, while `'b` is only used for the set of sectors that are
/// being modified.
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Hash of the public key of the farm
    pub public_key_hash: &'a Blake3Hash,
    /// Cache for shard commitments roots
    pub shard_commitments_roots_cache: &'a ShardCommitmentsRootsCache,
    /// Slot info for the audit
    pub slot_info: SlotInfo,
    /// Metadata of all sectors plotted so far
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// Erasure coding instance
    pub erasure_coding: &'a ErasureCoding,
    /// Set of sectors that are currently being modified (for example replotted) and should not be
    /// audited
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Proof of space table generator
    pub table_generator: &'a PosTable::Generator,
}
96
// `Clone` is implemented by hand: `#[derive(Clone)]` would additionally require `PosTable: Clone`,
// which presumably is the reason it is not derived here
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        // The type is `Copy`, so a clone is just a copy of `self`
        *self
    }
}

// `Copy` is implemented by hand for the same reason as `Clone`: a derive would additionally
// require `PosTable: Copy`
impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
108
/// Plot auditing implementation.
///
/// Wrapper around a plot that implements [`ReadAtSync`]; the plot is read during
/// [`PlotAudit::audit`].
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
114
115impl<'a, Plot> PlotAudit<Plot>
116where
117    Plot: ReadAtSync + 'a,
118{
119    /// Create a new instance
120    pub fn new(plot: Plot) -> Self {
121        Self(plot)
122    }
123
124    /// Audit this plot
125    #[expect(
126        clippy::type_complexity,
127        reason = "Probably not worth creating type aliases"
128    )]
129    pub fn audit<'b, PosTable>(
130        &'a self,
131        options: PlotAuditOptions<'a, 'b, PosTable>,
132    ) -> Result<
133        Vec<(
134            SectorIndex,
135            impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
136        )>,
137        AuditingError,
138    >
139    where
140        PosTable: Table,
141    {
142        let PlotAuditOptions {
143            public_key_hash,
144            shard_commitments_roots_cache,
145            slot_info,
146            sectors_metadata,
147            erasure_coding,
148            sectors_being_modified,
149            table_generator,
150        } = options;
151
152        let audit_results = audit_plot_sync(
153            public_key_hash,
154            shard_commitments_roots_cache,
155            slot_info.shard_membership_entropy,
156            slot_info.num_shards,
157            &slot_info.global_challenge,
158            slot_info.solution_range,
159            &self.0,
160            sectors_metadata,
161            sectors_being_modified,
162        )?;
163
164        Ok(audit_results
165            .into_iter()
166            .filter_map(|audit_results| {
167                let sector_index = audit_results.sector_index;
168
169                let sector_solutions = audit_results
170                    .solution_candidates
171                    .into_solutions(erasure_coding, |seed: &PosSeed| {
172                        table_generator.create_proofs_parallel(seed)
173                    });
174
175                let sector_solutions = match sector_solutions {
176                    Ok(solutions) => solutions,
177                    Err(error) => {
178                        warn!(
179                            %error,
180                            %sector_index,
181                            "Failed to turn solution candidates into solutions",
182                        );
183
184                        return None;
185                    }
186                };
187
188                if sector_solutions.len() == 0 {
189                    return None;
190                }
191
192                Some((sector_index, sector_solutions))
193            })
194            .collect())
195    }
196}
197
/// Options consumed by [`farming`]
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Hash of the public key of the farm, passed to plot auditing
    pub(super) public_key_hash: Blake3Hash,
    /// Cache for shard commitments roots, passed to plot auditing
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    // TODO: Use `reward_address` in the future
    #[expect(
        dead_code,
        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
    )]
    pub(super) reward_address: Address,
    /// Node client used to retrieve farmer app info and to submit solution responses
    pub(super) node_client: NC,
    /// Plot auditing implementation that is invoked for every slot
    pub(super) plot_audit: PlotAudit,
    /// Shared metadata of plotted sectors; read for auditing on every slot
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Erasure coding instance, passed to plot auditing
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers that are notified about found solutions and farming events
    pub(super) handlers: Arc<Handlers>,
    /// Shared set of sectors that are currently being modified and must not be audited
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Receiver of slot info notifications that drives the farming loop
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool in which auditing and proving are executed
    pub(super) thread_pool: ThreadPool,
    /// Mutex that is taken briefly before processing a slot and held for the duration of proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics that are updated with auditing/proving timings and farming errors
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
218
219/// Starts farming process.
220///
221/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
222/// thread.
223pub(super) async fn farming<'a, PosTable, NC, Plot>(
224    farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
225) -> Result<(), FarmingError>
226where
227    PosTable: Table,
228    NC: NodeClient,
229    Plot: ReadAtSync + 'a,
230{
231    let FarmingOptions {
232        public_key_hash,
233        shard_commitments_roots_cache,
234        // TODO: Use `reward_address` in the future
235        reward_address: _,
236        node_client,
237        plot_audit,
238        sectors_metadata,
239        erasure_coding,
240        handlers,
241        sectors_being_modified,
242        mut slot_info_notifications,
243        thread_pool,
244        global_mutex,
245        metrics,
246    } = farming_options;
247
248    let farmer_app_info = node_client
249        .farmer_app_info()
250        .await
251        .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
252
253    // We assume that each slot is one second
254    let farming_timeout = farmer_app_info.farming_timeout;
255
256    // TODO: Reuse global table generator (this comment is in many files)
257    let table_generator = PosTable::generator();
258    let span = Span::current();
259
260    let mut non_fatal_errors = 0;
261
262    while let Some(slot_info) = slot_info_notifications.next().await {
263        let slot = slot_info.slot;
264
265        // Take mutex briefly to make sure farming is allowed right now
266        global_mutex.lock().await;
267
268        let mut problematic_sectors = Vec::new();
269        let result = try {
270            let start = Instant::now();
271            let sectors_metadata = sectors_metadata.read().await;
272
273            debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
274
275            let mut sectors_solutions = {
276                let sectors_being_modified = &*sectors_being_modified.read().await;
277
278                thread_pool
279                    .install(|| {
280                        let _span_guard = span.enter();
281
282                        plot_audit.audit(PlotAuditOptions::<PosTable> {
283                            public_key_hash: &public_key_hash,
284                            shard_commitments_roots_cache: &shard_commitments_roots_cache,
285                            slot_info,
286                            sectors_metadata: &sectors_metadata,
287                            erasure_coding: &erasure_coding,
288                            sectors_being_modified,
289                            table_generator: &table_generator,
290                        })
291                    })
292                    .map_err(FarmingError::LowLevelAuditing)?
293            };
294
295            sectors_solutions.sort_by(|a, b| {
296                let a_solution_distance =
297                    a.1.best_solution_distance()
298                        .unwrap_or(SolutionDistance::MAX);
299                let b_solution_distance =
300                    b.1.best_solution_distance()
301                        .unwrap_or(SolutionDistance::MAX);
302
303                a_solution_distance.cmp(&b_solution_distance)
304            });
305
306            {
307                let time = start.elapsed();
308                if let Some(metrics) = &metrics {
309                    metrics.auditing_time.observe(time.as_secs_f64());
310                }
311                handlers
312                    .farming_notification
313                    .call_simple(&FarmingNotification::Auditing(AuditingDetails {
314                        sectors_count: sectors_metadata.len() as u16,
315                        time,
316                    }));
317            }
318
319            // Take mutex and hold until proving end to make sure nothing else major happens at the
320            // same time
321            let _proving_guard = global_mutex.lock().await;
322
323            'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
324                if sector_solutions.is_empty() {
325                    continue;
326                }
327                let mut start = Instant::now();
328                while let Some(maybe_solution) = thread_pool.install(|| {
329                    let _span_guard = span.enter();
330
331                    sector_solutions.next()
332                }) {
333                    let solution = match maybe_solution {
334                        Ok(solution) => solution,
335                        Err(error) => {
336                            if let Some(metrics) = &metrics {
337                                metrics
338                                    .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
339                            }
340                            error!(
341                                %slot,
342                                %sector_index,
343                                %error,
344                                "Failed to prove, scheduling sector for replotting"
345                            );
346                            problematic_sectors.push(sector_index);
347                            // Do not error completely as disk corruption or other reasons why
348                            // proving might fail
349                            start = Instant::now();
350                            continue;
351                        }
352                    };
353
354                    debug!(%slot, %sector_index, "Solution found");
355                    trace!(?solution, "Solution found");
356
357                    {
358                        let time = start.elapsed();
359                        if time >= farming_timeout {
360                            if let Some(metrics) = &metrics {
361                                metrics.observe_proving_time(&time, ProvingResult::Timeout);
362                            }
363                            handlers.farming_notification.call_simple(
364                                &FarmingNotification::Proving(ProvingDetails {
365                                    result: ProvingResult::Timeout,
366                                    time,
367                                }),
368                            );
369                            warn!(
370                                %slot,
371                                %sector_index,
372                                "Proving for solution skipped due to farming time limit",
373                            );
374
375                            break 'solutions_processing;
376                        }
377                    }
378
379                    let response = SolutionResponse {
380                        slot_number: slot,
381                        solution,
382                    };
383
384                    handlers.solution.call_simple(&response);
385
386                    if let Err(error) = node_client.submit_solution_response(response).await {
387                        let time = start.elapsed();
388                        if let Some(metrics) = &metrics {
389                            metrics.observe_proving_time(&time, ProvingResult::Rejected);
390                        }
391                        handlers
392                            .farming_notification
393                            .call_simple(&FarmingNotification::Proving(ProvingDetails {
394                                result: ProvingResult::Rejected,
395                                time,
396                            }));
397                        warn!(
398                            %slot,
399                            %sector_index,
400                            %error,
401                            "Failed to send solution to node, skipping further proving for this slot",
402                        );
403                        break 'solutions_processing;
404                    }
405
406                    let time = start.elapsed();
407                    if let Some(metrics) = &metrics {
408                        metrics.observe_proving_time(&time, ProvingResult::Success);
409                    }
410                    handlers
411                        .farming_notification
412                        .call_simple(&FarmingNotification::Proving(ProvingDetails {
413                            result: ProvingResult::Success,
414                            time,
415                        }));
416                    start = Instant::now();
417                }
418            }
419        };
420
421        if let Err(error) = result {
422            if error.is_fatal() {
423                return Err(error);
424            }
425
426            non_fatal_errors += 1;
427
428            if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
429                return Err(error);
430            }
431
432            warn!(
433                %error,
434                "Non-fatal farming error"
435            );
436
437            if let Some(metrics) = &metrics {
438                metrics.note_farming_error(&error);
439            }
440            handlers
441                .farming_notification
442                .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
443
444            for sector_index in problematic_sectors.drain(..) {
445                // Inform others that this sector is being modified
446                sectors_being_modified.write().await.insert(sector_index);
447                // Replace metadata with a dummy one, so it will be picked up for replotting next
448                if let Some(existing_sector_metadata) = sectors_metadata
449                    .write()
450                    .await
451                    .get_mut(usize::from(sector_index))
452                {
453                    *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
454                        sector_index,
455                        pieces_in_sector: existing_sector_metadata.pieces_in_sector,
456                        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
457                        history_size: HistorySize::from(SegmentIndex::ZERO),
458                    });
459                }
460                // Inform others that this sector is no longer being modified
461                sectors_being_modified.write().await.remove(&sector_index);
462            }
463        } else {
464            non_fatal_errors = 0;
465        }
466    }
467
468    Ok(())
469}