ab_farmer/single_disk_farm/
farming.rs

1//! Farming-related utilities
2//!
3//! These utilities do not expose the whole farming workflow, but rather small bits of it that can
4//! be useful externally (for example for benchmarking purposes in CLI).
5
6pub mod rayon_files;
7
8use crate::farm::{
9    AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
27use ab_proof_of_space::{Table, TableGenerator};
28use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
29use futures::StreamExt;
30use futures::channel::mpsc;
31use rayon::ThreadPool;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::time::Instant;
35use tracing::{Span, debug, error, info, trace, warn};
36
37/// How many consecutive non-fatal errors (reset by any successful slot) make farming return an error
38const NON_FATAL_ERROR_LIMIT: usize = 10;
39
40pub(super) async fn slot_notification_forwarder<NC>(
41    node_client: &NC,
42    mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
43    metrics: Option<Arc<SingleDiskFarmMetrics>>,
44) -> Result<(), FarmingError>
45where
46    NC: NodeClient,
47{
48    info!("Subscribing to slot info notifications");
49
50    let mut slot_info_notifications = node_client
51        .subscribe_slot_info()
52        .await
53        .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
54
55    while let Some(slot_info) = slot_info_notifications.next().await {
56        debug!(?slot_info, "New slot");
57
58        let slot = slot_info.slot_number;
59
60        // Error means farmer is still solving for previous slot, which is too late, and we need to
61        // skip this slot
62        if slot_info_forwarder_sender.try_send(slot_info).is_err() {
63            if let Some(metrics) = &metrics {
64                metrics.skipped_slots.inc();
65            }
66            debug!(%slot, "Slow farming, skipping slot");
67        }
68    }
69
70    Err(FarmingError::SlotNotificationStreamEnded)
71}
72
73/// Plot audit options
74#[derive(Debug)]
75pub struct PlotAuditOptions<'a, 'b, PosTable>
76where
77    PosTable: Table,
78{
79    /// Public key of the farm
80    pub public_key_hash: &'a Blake3Hash,
81    /// Slot info for the audit
82    pub slot_info: SlotInfo,
83    /// Metadata of all sectors plotted so far
84    pub sectors_metadata: &'a [SectorMetadataChecksummed],
85    /// Erasure coding instance
86    pub erasure_coding: &'a ErasureCoding,
87    /// Optional sector that is currently being modified (for example replotted) and should not be
88    /// audited
89    pub sectors_being_modified: &'b HashSet<SectorIndex>,
90    /// Proof of space table generator
91    pub table_generator: &'a PosTable::Generator,
92}
93
94impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
95where
96    PosTable: Table,
97{
98    #[inline]
99    fn clone(&self) -> Self {
100        *self
101    }
102}
103
104impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
105
106/// Plot auditing implementation
107#[derive(Debug)]
108pub struct PlotAudit<Plot>(Plot)
109where
110    Plot: ReadAtSync;
111
112impl<'a, Plot> PlotAudit<Plot>
113where
114    Plot: ReadAtSync + 'a,
115{
116    /// Create a new instance
117    pub fn new(plot: Plot) -> Self {
118        Self(plot)
119    }
120
121    /// Audit this plot
122    #[allow(clippy::type_complexity)]
123    pub fn audit<'b, PosTable>(
124        &'a self,
125        options: PlotAuditOptions<'a, 'b, PosTable>,
126    ) -> Result<
127        Vec<(
128            SectorIndex,
129            impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
130        )>,
131        AuditingError,
132    >
133    where
134        PosTable: Table,
135    {
136        let PlotAuditOptions {
137            public_key_hash,
138            slot_info,
139            sectors_metadata,
140            erasure_coding,
141            sectors_being_modified,
142            table_generator,
143        } = options;
144
145        let audit_results = audit_plot_sync(
146            public_key_hash,
147            &slot_info.global_challenge,
148            slot_info.solution_range,
149            &self.0,
150            sectors_metadata,
151            sectors_being_modified,
152        )?;
153
154        Ok(audit_results
155            .into_iter()
156            .filter_map(|audit_results| {
157                let sector_index = audit_results.sector_index;
158
159                let sector_solutions = audit_results
160                    .solution_candidates
161                    .into_solutions(erasure_coding, |seed: &PosSeed| {
162                        table_generator.create_proofs_parallel(seed)
163                    });
164
165                let sector_solutions = match sector_solutions {
166                    Ok(solutions) => solutions,
167                    Err(error) => {
168                        warn!(
169                            %error,
170                            %sector_index,
171                            "Failed to turn solution candidates into solutions",
172                        );
173
174                        return None;
175                    }
176                };
177
178                if sector_solutions.len() == 0 {
179                    return None;
180                }
181
182                Some((sector_index, sector_solutions))
183            })
184            .collect())
185    }
186}
187
188pub(super) struct FarmingOptions<NC, PlotAudit> {
189    pub(super) public_key_hash: Blake3Hash,
190    // TODO: Use `reward_address` in the future
191    #[expect(
192        dead_code,
193        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
194    )]
195    pub(super) reward_address: Address,
196    pub(super) node_client: NC,
197    pub(super) plot_audit: PlotAudit,
198    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
199    pub(super) erasure_coding: ErasureCoding,
200    pub(super) handlers: Arc<Handlers>,
201    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
202    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
203    pub(super) thread_pool: ThreadPool,
204    pub(super) global_mutex: Arc<AsyncMutex<()>>,
205    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
206}
207
208/// Starts farming process.
209///
210/// NOTE: Returned future is async, but does blocking operations and should be running in dedicated
211/// thread.
212pub(super) async fn farming<'a, PosTable, NC, Plot>(
213    farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
214) -> Result<(), FarmingError>
215where
216    PosTable: Table,
217    NC: NodeClient,
218    Plot: ReadAtSync + 'a,
219{
220    let FarmingOptions {
221        public_key_hash,
222        // TODO: Use `reward_address` in the future
223        reward_address: _,
224        node_client,
225        plot_audit,
226        sectors_metadata,
227        erasure_coding,
228        handlers,
229        sectors_being_modified,
230        mut slot_info_notifications,
231        thread_pool,
232        global_mutex,
233        metrics,
234    } = farming_options;
235
236    let farmer_app_info = node_client
237        .farmer_app_info()
238        .await
239        .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
240
241    // We assume that each slot is one second
242    let farming_timeout = farmer_app_info.farming_timeout;
243
244    // TODO: Reuse global table generator (this comment is in many files)
245    let table_generator = PosTable::generator();
246    let span = Span::current();
247
248    let mut non_fatal_errors = 0;
249
250    while let Some(slot_info) = slot_info_notifications.next().await {
251        let slot = slot_info.slot_number;
252
253        // Take mutex briefly to make sure farming is allowed right now
254        global_mutex.lock().await;
255
256        let mut problematic_sectors = Vec::new();
257        let result: Result<(), FarmingError> = try {
258            let start = Instant::now();
259            let sectors_metadata = sectors_metadata.read().await;
260
261            debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
262
263            let mut sectors_solutions = {
264                let sectors_being_modified = &*sectors_being_modified.read().await;
265
266                thread_pool.install(|| {
267                    let _span_guard = span.enter();
268
269                    plot_audit.audit(PlotAuditOptions::<PosTable> {
270                        public_key_hash: &public_key_hash,
271                        slot_info,
272                        sectors_metadata: &sectors_metadata,
273                        erasure_coding: &erasure_coding,
274                        sectors_being_modified,
275                        table_generator: &table_generator,
276                    })
277                })?
278            };
279
280            sectors_solutions.sort_by(|a, b| {
281                let a_solution_distance =
282                    a.1.best_solution_distance()
283                        .unwrap_or(SolutionDistance::MAX);
284                let b_solution_distance =
285                    b.1.best_solution_distance()
286                        .unwrap_or(SolutionDistance::MAX);
287
288                a_solution_distance.cmp(&b_solution_distance)
289            });
290
291            {
292                let time = start.elapsed();
293                if let Some(metrics) = &metrics {
294                    metrics.auditing_time.observe(time.as_secs_f64());
295                }
296                handlers
297                    .farming_notification
298                    .call_simple(&FarmingNotification::Auditing(AuditingDetails {
299                        sectors_count: sectors_metadata.len() as u16,
300                        time,
301                    }));
302            }
303
304            // Take mutex and hold until proving end to make sure nothing else major happens at the
305            // same time
306            let _proving_guard = global_mutex.lock().await;
307
308            'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
309                if sector_solutions.is_empty() {
310                    continue;
311                }
312                let mut start = Instant::now();
313                while let Some(maybe_solution) = thread_pool.install(|| {
314                    let _span_guard = span.enter();
315
316                    sector_solutions.next()
317                }) {
318                    let solution = match maybe_solution {
319                        Ok(solution) => solution,
320                        Err(error) => {
321                            if let Some(metrics) = &metrics {
322                                metrics
323                                    .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
324                            }
325                            error!(
326                                %slot,
327                                %sector_index,
328                                %error,
329                                "Failed to prove, scheduling sector for replotting"
330                            );
331                            problematic_sectors.push(sector_index);
332                            // Do not error completely as disk corruption or other reasons why
333                            // proving might fail
334                            start = Instant::now();
335                            continue;
336                        }
337                    };
338
339                    debug!(%slot, %sector_index, "Solution found");
340                    trace!(?solution, "Solution found");
341
342                    {
343                        let time = start.elapsed();
344                        if time >= farming_timeout {
345                            if let Some(metrics) = &metrics {
346                                metrics.observe_proving_time(&time, ProvingResult::Timeout);
347                            }
348                            handlers.farming_notification.call_simple(
349                                &FarmingNotification::Proving(ProvingDetails {
350                                    result: ProvingResult::Timeout,
351                                    time,
352                                }),
353                            );
354                            warn!(
355                                %slot,
356                                %sector_index,
357                                "Proving for solution skipped due to farming time limit",
358                            );
359
360                            break 'solutions_processing;
361                        }
362                    }
363
364                    let response = SolutionResponse {
365                        slot_number: slot,
366                        solution,
367                    };
368
369                    handlers.solution.call_simple(&response);
370
371                    if let Err(error) = node_client.submit_solution_response(response).await {
372                        let time = start.elapsed();
373                        if let Some(metrics) = &metrics {
374                            metrics.observe_proving_time(&time, ProvingResult::Rejected);
375                        }
376                        handlers
377                            .farming_notification
378                            .call_simple(&FarmingNotification::Proving(ProvingDetails {
379                                result: ProvingResult::Rejected,
380                                time,
381                            }));
382                        warn!(
383                            %slot,
384                            %sector_index,
385                            %error,
386                            "Failed to send solution to node, skipping further proving for this slot",
387                        );
388                        break 'solutions_processing;
389                    }
390
391                    let time = start.elapsed();
392                    if let Some(metrics) = &metrics {
393                        metrics.observe_proving_time(&time, ProvingResult::Success);
394                    }
395                    handlers
396                        .farming_notification
397                        .call_simple(&FarmingNotification::Proving(ProvingDetails {
398                            result: ProvingResult::Success,
399                            time,
400                        }));
401                    start = Instant::now();
402                }
403            }
404        };
405
406        if let Err(error) = result {
407            if error.is_fatal() {
408                return Err(error);
409            }
410
411            non_fatal_errors += 1;
412
413            if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
414                return Err(error);
415            }
416
417            warn!(
418                %error,
419                "Non-fatal farming error"
420            );
421
422            if let Some(metrics) = &metrics {
423                metrics.note_farming_error(&error);
424            }
425            handlers
426                .farming_notification
427                .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
428
429            for sector_index in problematic_sectors.drain(..) {
430                // Inform others that this sector is being modified
431                sectors_being_modified.write().await.insert(sector_index);
432                // Replace metadata with a dummy one, so it will be picked up for replotting next
433                if let Some(existing_sector_metadata) = sectors_metadata
434                    .write()
435                    .await
436                    .get_mut(usize::from(sector_index))
437                {
438                    *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
439                        sector_index,
440                        pieces_in_sector: existing_sector_metadata.pieces_in_sector,
441                        s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
442                        history_size: HistorySize::from(SegmentIndex::ZERO),
443                    });
444                }
445                // Inform others that this sector is no longer being modified
446                sectors_being_modified.write().await.remove(&sector_index);
447            }
448        } else {
449            non_fatal_errors = 0;
450        }
451    }
452
453    Ok(())
454}