ab_farmer/single_disk_farm/
farming.rs

1//! Farming-related utilities
2//!
3//! These utilities do not expose the whole farming workflow, but rather small bits of it that can
4//! be useful externally (for example for benchmarking purposes in CLI).
5
6pub mod rayon_files;
7
8use crate::farm::{
9    AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
27use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
28use ab_proof_of_space::{Table, TableGenerator};
29use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
30use futures::StreamExt;
31use futures::channel::mpsc;
32use rayon::ThreadPool;
33use std::collections::HashSet;
34use std::sync::Arc;
35use std::time::Instant;
36use tracing::{Span, debug, error, info, trace, warn};
37
/// Number of non-fatal errors in a row after which the farm is considered non-operational.
///
/// The counter of consecutive non-fatal errors is reset after every slot that was farmed without
/// an error.
const NON_FATAL_ERROR_LIMIT: usize = 10;
40
41pub(super) async fn slot_notification_forwarder<NC>(
42    node_client: &NC,
43    mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
44    metrics: Option<Arc<SingleDiskFarmMetrics>>,
45) -> Result<(), FarmingError>
46where
47    NC: NodeClient,
48{
49    info!("Subscribing to slot info notifications");
50
51    let mut slot_info_notifications = node_client
52        .subscribe_slot_info()
53        .await
54        .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
55
56    while let Some(slot_info) = slot_info_notifications.next().await {
57        debug!(?slot_info, "New slot");
58
59        let slot = slot_info.slot;
60
61        // Error means farmer is still solving for previous slot, which is too late, and we need to
62        // skip this slot
63        if slot_info_forwarder_sender.try_send(slot_info).is_err() {
64            if let Some(metrics) = &metrics {
65                metrics.skipped_slots.inc();
66            }
67            debug!(%slot, "Slow farming, skipping slot");
68        }
69    }
70
71    Err(FarmingError::SlotNotificationStreamEnded)
72}
73
74/// Plot audit options
75#[derive(Debug)]
76pub struct PlotAuditOptions<'a, 'b, PosTable>
77where
78    PosTable: Table,
79{
80    /// Public key of the farm
81    pub public_key_hash: &'a Blake3Hash,
82    /// Cache for shard commitments roots
83    pub shard_commitments_roots_cache: &'a ShardCommitmentsRootsCache,
84    /// Slot info for the audit
85    pub slot_info: SlotInfo,
86    /// Metadata of all sectors plotted so far
87    pub sectors_metadata: &'a [SectorMetadataChecksummed],
88    /// Erasure coding instance
89    pub erasure_coding: &'a ErasureCoding,
90    /// Optional sector that is currently being modified (for example replotted) and should not be
91    /// audited
92    pub sectors_being_modified: &'b HashSet<SectorIndex>,
93    /// Proof of space table generator
94    pub table_generator: &'a PosTable::Generator,
95}
96
// `Clone` and `Copy` are implemented by hand rather than derived: `#[derive(Clone, Copy)]` would
// add `PosTable: Clone` / `PosTable: Copy` bounds, while every field of `PlotAuditOptions` is a
// reference or `SlotInfo`, none of which needs `PosTable` itself to be copyable.
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        // Canonical `Clone` implementation for a `Copy` type
        *self
    }
}

impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
108
/// Plot auditing implementation.
///
/// Wraps a plot that supports synchronous reads (`ReadAtSync`); see [`PlotAudit::audit`].
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
114
115impl<'a, Plot> PlotAudit<Plot>
116where
117    Plot: ReadAtSync + 'a,
118{
119    /// Create a new instance
120    pub fn new(plot: Plot) -> Self {
121        Self(plot)
122    }
123
124    /// Audit this plot
125    #[expect(clippy::type_complexity)]
126    pub fn audit<'b, PosTable>(
127        &'a self,
128        options: PlotAuditOptions<'a, 'b, PosTable>,
129    ) -> Result<
130        Vec<(
131            SectorIndex,
132            impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
133        )>,
134        AuditingError,
135    >
136    where
137        PosTable: Table,
138    {
139        let PlotAuditOptions {
140            public_key_hash,
141            shard_commitments_roots_cache,
142            slot_info,
143            sectors_metadata,
144            erasure_coding,
145            sectors_being_modified,
146            table_generator,
147        } = options;
148
149        let audit_results = audit_plot_sync(
150            public_key_hash,
151            shard_commitments_roots_cache,
152            slot_info.shard_membership_entropy,
153            slot_info.num_shards,
154            &slot_info.global_challenge,
155            slot_info.solution_range,
156            &self.0,
157            sectors_metadata,
158            sectors_being_modified,
159        )?;
160
161        Ok(audit_results
162            .into_iter()
163            .filter_map(|audit_results| {
164                let sector_index = audit_results.sector_index;
165
166                let sector_solutions = audit_results
167                    .solution_candidates
168                    .into_solutions(erasure_coding, |seed: &PosSeed| {
169                        table_generator.create_proofs_parallel(seed)
170                    });
171
172                let sector_solutions = match sector_solutions {
173                    Ok(solutions) => solutions,
174                    Err(error) => {
175                        warn!(
176                            %error,
177                            %sector_index,
178                            "Failed to turn solution candidates into solutions",
179                        );
180
181                        return None;
182                    }
183                };
184
185                if sector_solutions.len() == 0 {
186                    return None;
187                }
188
189                Some((sector_index, sector_solutions))
190            })
191            .collect())
192    }
193}
194
/// Inputs for the `farming` function of this module.
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Hash of the farm's public key, used for auditing
    pub(super) public_key_hash: Blake3Hash,
    /// Cache for shard commitments roots, used for auditing
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    /// Reward address (currently unused)
    // TODO: Use `reward_address` in the future
    #[expect(
        dead_code,
        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
    )]
    pub(super) reward_address: Address,
    /// Node client used to obtain farmer app info and to submit solutions
    pub(super) node_client: NC,
    /// Plot auditing implementation
    pub(super) plot_audit: PlotAudit,
    /// Metadata of all sectors plotted so far
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Erasure coding instance
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers notified about solutions and farming progress
    pub(super) handlers: Arc<Handlers>,
    /// Sectors that are currently being modified and should not be audited
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Slot info notifications to farm on
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool in which auditing and proving run
    pub(super) thread_pool: ThreadPool,
    /// Mutex taken to check that farming is allowed and held during proving so that nothing else
    /// major happens at the same time
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
215
216/// Starts farming process.
217///
218/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
219/// thread.
220pub(super) async fn farming<'a, PosTable, NC, Plot>(
221    farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
222) -> Result<(), FarmingError>
223where
224    PosTable: Table,
225    NC: NodeClient,
226    Plot: ReadAtSync + 'a,
227{
228    let FarmingOptions {
229        public_key_hash,
230        shard_commitments_roots_cache,
231        // TODO: Use `reward_address` in the future
232        reward_address: _,
233        node_client,
234        plot_audit,
235        sectors_metadata,
236        erasure_coding,
237        handlers,
238        sectors_being_modified,
239        mut slot_info_notifications,
240        thread_pool,
241        global_mutex,
242        metrics,
243    } = farming_options;
244
245    let farmer_app_info = node_client
246        .farmer_app_info()
247        .await
248        .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
249
250    // We assume that each slot is one second
251    let farming_timeout = farmer_app_info.farming_timeout;
252
253    // TODO: Reuse global table generator (this comment is in many files)
254    let table_generator = PosTable::generator();
255    let span = Span::current();
256
257    let mut non_fatal_errors = 0;
258
259    while let Some(slot_info) = slot_info_notifications.next().await {
260        let slot = slot_info.slot;
261
262        // Take mutex briefly to make sure farming is allowed right now
263        global_mutex.lock().await;
264
265        let mut problematic_sectors = Vec::new();
266        let result = try {
267            let start = Instant::now();
268            let sectors_metadata = sectors_metadata.read().await;
269
270            debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
271
272            let mut sectors_solutions = {
273                let sectors_being_modified = &*sectors_being_modified.read().await;
274
275                thread_pool
276                    .install(|| {
277                        let _span_guard = span.enter();
278
279                        plot_audit.audit(PlotAuditOptions::<PosTable> {
280                            public_key_hash: &public_key_hash,
281                            shard_commitments_roots_cache: &shard_commitments_roots_cache,
282                            slot_info,
283                            sectors_metadata: &sectors_metadata,
284                            erasure_coding: &erasure_coding,
285                            sectors_being_modified,
286                            table_generator: &table_generator,
287                        })
288                    })
289                    .map_err(FarmingError::LowLevelAuditing)?
290            };
291
292            sectors_solutions.sort_by(|a, b| {
293                let a_solution_distance =
294                    a.1.best_solution_distance()
295                        .unwrap_or(SolutionDistance::MAX);
296                let b_solution_distance =
297                    b.1.best_solution_distance()
298                        .unwrap_or(SolutionDistance::MAX);
299
300                a_solution_distance.cmp(&b_solution_distance)
301            });
302
303            {
304                let time = start.elapsed();
305                if let Some(metrics) = &metrics {
306                    metrics.auditing_time.observe(time.as_secs_f64());
307                }
308                handlers
309                    .farming_notification
310                    .call_simple(&FarmingNotification::Auditing(AuditingDetails {
311                        sectors_count: sectors_metadata.len() as u16,
312                        time,
313                    }));
314            }
315
316            // Take mutex and hold until proving end to make sure nothing else major happens at the
317            // same time
318            let _proving_guard = global_mutex.lock().await;
319
320            'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
321                if sector_solutions.is_empty() {
322                    continue;
323                }
324                let mut start = Instant::now();
325                while let Some(maybe_solution) = thread_pool.install(|| {
326                    let _span_guard = span.enter();
327
328                    sector_solutions.next()
329                }) {
330                    let solution = match maybe_solution {
331                        Ok(solution) => solution,
332                        Err(error) => {
333                            if let Some(metrics) = &metrics {
334                                metrics
335                                    .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
336                            }
337                            error!(
338                                %slot,
339                                %sector_index,
340                                %error,
341                                "Failed to prove, scheduling sector for replotting"
342                            );
343                            problematic_sectors.push(sector_index);
344                            // Do not error completely as disk corruption or other reasons why
345                            // proving might fail
346                            start = Instant::now();
347                            continue;
348                        }
349                    };
350
351                    debug!(%slot, %sector_index, "Solution found");
352                    trace!(?solution, "Solution found");
353
354                    {
355                        let time = start.elapsed();
356                        if time >= farming_timeout {
357                            if let Some(metrics) = &metrics {
358                                metrics.observe_proving_time(&time, ProvingResult::Timeout);
359                            }
360                            handlers.farming_notification.call_simple(
361                                &FarmingNotification::Proving(ProvingDetails {
362                                    result: ProvingResult::Timeout,
363                                    time,
364                                }),
365                            );
366                            warn!(
367                                %slot,
368                                %sector_index,
369                                "Proving for solution skipped due to farming time limit",
370                            );
371
372                            break 'solutions_processing;
373                        }
374                    }
375
376                    let response = SolutionResponse {
377                        slot_number: slot,
378                        solution,
379                    };
380
381                    handlers.solution.call_simple(&response);
382
383                    if let Err(error) = node_client.submit_solution_response(response).await {
384                        let time = start.elapsed();
385                        if let Some(metrics) = &metrics {
386                            metrics.observe_proving_time(&time, ProvingResult::Rejected);
387                        }
388                        handlers
389                            .farming_notification
390                            .call_simple(&FarmingNotification::Proving(ProvingDetails {
391                                result: ProvingResult::Rejected,
392                                time,
393                            }));
394                        warn!(
395                            %slot,
396                            %sector_index,
397                            %error,
398                            "Failed to send solution to node, skipping further proving for this slot",
399                        );
400                        break 'solutions_processing;
401                    }
402
403                    let time = start.elapsed();
404                    if let Some(metrics) = &metrics {
405                        metrics.observe_proving_time(&time, ProvingResult::Success);
406                    }
407                    handlers
408                        .farming_notification
409                        .call_simple(&FarmingNotification::Proving(ProvingDetails {
410                            result: ProvingResult::Success,
411                            time,
412                        }));
413                    start = Instant::now();
414                }
415            }
416        };
417
418        if let Err(error) = result {
419            if error.is_fatal() {
420                return Err(error);
421            }
422
423            non_fatal_errors += 1;
424
425            if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
426                return Err(error);
427            }
428
429            warn!(
430                %error,
431                "Non-fatal farming error"
432            );
433
434            if let Some(metrics) = &metrics {
435                metrics.note_farming_error(&error);
436            }
437            handlers
438                .farming_notification
439                .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
440
441            for sector_index in problematic_sectors.drain(..) {
442                // Inform others that this sector is being modified
443                sectors_being_modified.write().await.insert(sector_index);
444                // Replace metadata with a dummy one, so it will be picked up for replotting next
445                if let Some(existing_sector_metadata) = sectors_metadata
446                    .write()
447                    .await
448                    .get_mut(usize::from(sector_index))
449                {
450                    *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
451                        sector_index,
452                        pieces_in_sector: existing_sector_metadata.pieces_in_sector,
453                        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
454                        history_size: HistorySize::from(SegmentIndex::ZERO),
455                    });
456                }
457                // Inform others that this sector is no longer being modified
458                sectors_being_modified.write().await.remove(&sector_index);
459            }
460        } else {
461            non_fatal_errors = 0;
462        }
463    }
464
465    Ok(())
466}