ab_farmer/single_disk_farm/
farming.rs1pub mod rayon_files;
7
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
27use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
28use ab_proof_of_space::{Table, TableGenerator};
29use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
30use futures::StreamExt;
31use futures::channel::mpsc;
32use rayon::ThreadPool;
33use std::collections::HashSet;
34use std::sync::Arc;
35use std::time::Instant;
36use tracing::{Span, debug, error, info, trace, warn};
37
/// Number of consecutive slots that may end with a non-fatal farming error before `farming`
/// gives up and returns the error to its caller. The counter is reset by any successful slot.
const NON_FATAL_ERROR_LIMIT: usize = 10;
40
41pub(super) async fn slot_notification_forwarder<NC>(
42 node_client: &NC,
43 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
44 metrics: Option<Arc<SingleDiskFarmMetrics>>,
45) -> Result<(), FarmingError>
46where
47 NC: NodeClient,
48{
49 info!("Subscribing to slot info notifications");
50
51 let mut slot_info_notifications = node_client
52 .subscribe_slot_info()
53 .await
54 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
55
56 while let Some(slot_info) = slot_info_notifications.next().await {
57 debug!(?slot_info, "New slot");
58
59 let slot = slot_info.slot;
60
61 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
64 if let Some(metrics) = &metrics {
65 metrics.skipped_slots.inc();
66 }
67 debug!(%slot, "Slow farming, skipping slot");
68 }
69 }
70
71 Err(FarmingError::SlotNotificationStreamEnded)
72}
73
/// Inputs for a single [`PlotAudit::audit`] call.
///
/// Everything except `slot_info` is borrowed, which keeps the whole structure cheap to copy.
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Hash of the public key the audit is performed for
    pub public_key_hash: &'a Blake3Hash,
    /// Cache of shard commitment roots, forwarded to the low-level audit
    pub shard_commitments_roots_cache: &'a ShardCommitmentsRootsCache,
    /// Slot being audited; supplies the shard membership entropy, number of shards, global
    /// challenge and solution range used by the audit
    pub slot_info: SlotInfo,
    /// Metadata of the sectors to audit
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// Erasure coding instance used when turning solution candidates into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Sectors that are currently being modified; forwarded to the low-level audit
    /// (presumably so that they are skipped — confirm against `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Proof-of-space table generator used to create proofs for a given seed while proving
    pub table_generator: &'a PosTable::Generator,
}
96
// Implemented by hand rather than derived so that no `PosTable: Clone` bound is required;
// cloning is a plain bitwise copy backed by the `Copy` implementation of this type.
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        *self
    }
}
106
// Implemented by hand rather than derived so that no `PosTable: Copy` bound is required.
impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
108
/// Auditor for a plot: a thin wrapper around anything that supports synchronous positional
/// reads ([`ReadAtSync`]).
#[derive(Debug)]
pub struct PlotAudit<Plot>(Plot)
where
    Plot: ReadAtSync;
114
115impl<'a, Plot> PlotAudit<Plot>
116where
117 Plot: ReadAtSync + 'a,
118{
119 pub fn new(plot: Plot) -> Self {
121 Self(plot)
122 }
123
124 #[expect(
126 clippy::type_complexity,
127 reason = "Probably not worth creating type aliases"
128 )]
129 pub fn audit<'b, PosTable>(
130 &'a self,
131 options: PlotAuditOptions<'a, 'b, PosTable>,
132 ) -> Result<
133 Vec<(
134 SectorIndex,
135 impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
136 )>,
137 AuditingError,
138 >
139 where
140 PosTable: Table,
141 {
142 let PlotAuditOptions {
143 public_key_hash,
144 shard_commitments_roots_cache,
145 slot_info,
146 sectors_metadata,
147 erasure_coding,
148 sectors_being_modified,
149 table_generator,
150 } = options;
151
152 let audit_results = audit_plot_sync(
153 public_key_hash,
154 shard_commitments_roots_cache,
155 slot_info.shard_membership_entropy,
156 slot_info.num_shards,
157 &slot_info.global_challenge,
158 slot_info.solution_range,
159 &self.0,
160 sectors_metadata,
161 sectors_being_modified,
162 )?;
163
164 Ok(audit_results
165 .into_iter()
166 .filter_map(|audit_results| {
167 let sector_index = audit_results.sector_index;
168
169 let sector_solutions = audit_results
170 .solution_candidates
171 .into_solutions(erasure_coding, |seed: &PosSeed| {
172 table_generator.create_proofs_parallel(seed)
173 });
174
175 let sector_solutions = match sector_solutions {
176 Ok(solutions) => solutions,
177 Err(error) => {
178 warn!(
179 %error,
180 %sector_index,
181 "Failed to turn solution candidates into solutions",
182 );
183
184 return None;
185 }
186 };
187
188 if sector_solutions.len() == 0 {
189 return None;
190 }
191
192 Some((sector_index, sector_solutions))
193 })
194 .collect())
195 }
196}
197
/// Everything the `farming` loop needs; consumed by value when farming starts.
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Hash of the public key, forwarded to every audit
    pub(super) public_key_hash: Blake3Hash,
    /// Cache of shard commitment roots, forwarded to every audit
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    /// Reward address; currently not read by the farming loop
    #[expect(
        dead_code,
        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
    )]
    pub(super) reward_address: Address,
    /// Node client used to fetch farmer app info and to submit solution responses
    pub(super) node_client: NC,
    /// Auditor of the plot that is being farmed
    pub(super) plot_audit: PlotAudit,
    /// Shared metadata of the sectors; read-locked while a slot is being processed
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Erasure coding instance, forwarded to every audit
    pub(super) erasure_coding: ErasureCoding,
    /// Callbacks notified about found solutions and about auditing/proving/error events
    pub(super) handlers: Arc<Handlers>,
    /// Shared set of sectors that are currently being modified; read-locked during the audit
    /// and forwarded to it
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Stream of slots to farm; farming finishes once this stream ends
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool inside which auditing and proving are executed
    pub(super) thread_pool: ThreadPool,
    /// Mutex that is locked briefly at the start of every slot and then held again for the
    /// whole proving phase of that slot
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics to update while farming
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
218
219pub(super) async fn farming<'a, PosTable, NC, Plot>(
224 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
225) -> Result<(), FarmingError>
226where
227 PosTable: Table,
228 NC: NodeClient,
229 Plot: ReadAtSync + 'a,
230{
231 let FarmingOptions {
232 public_key_hash,
233 shard_commitments_roots_cache,
234 reward_address: _,
236 node_client,
237 plot_audit,
238 sectors_metadata,
239 erasure_coding,
240 handlers,
241 sectors_being_modified,
242 mut slot_info_notifications,
243 thread_pool,
244 global_mutex,
245 metrics,
246 } = farming_options;
247
248 let farmer_app_info = node_client
249 .farmer_app_info()
250 .await
251 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
252
253 let farming_timeout = farmer_app_info.farming_timeout;
255
256 let table_generator = PosTable::generator();
258 let span = Span::current();
259
260 let mut non_fatal_errors = 0;
261
262 while let Some(slot_info) = slot_info_notifications.next().await {
263 let slot = slot_info.slot;
264
265 global_mutex.lock().await;
267
268 let mut problematic_sectors = Vec::new();
269 let result = try {
270 let start = Instant::now();
271 let sectors_metadata = sectors_metadata.read().await;
272
273 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
274
275 let mut sectors_solutions = {
276 let sectors_being_modified = &*sectors_being_modified.read().await;
277
278 thread_pool
279 .install(|| {
280 let _span_guard = span.enter();
281
282 plot_audit.audit(PlotAuditOptions::<PosTable> {
283 public_key_hash: &public_key_hash,
284 shard_commitments_roots_cache: &shard_commitments_roots_cache,
285 slot_info,
286 sectors_metadata: §ors_metadata,
287 erasure_coding: &erasure_coding,
288 sectors_being_modified,
289 table_generator: &table_generator,
290 })
291 })
292 .map_err(FarmingError::LowLevelAuditing)?
293 };
294
295 sectors_solutions.sort_by(|a, b| {
296 let a_solution_distance =
297 a.1.best_solution_distance()
298 .unwrap_or(SolutionDistance::MAX);
299 let b_solution_distance =
300 b.1.best_solution_distance()
301 .unwrap_or(SolutionDistance::MAX);
302
303 a_solution_distance.cmp(&b_solution_distance)
304 });
305
306 {
307 let time = start.elapsed();
308 if let Some(metrics) = &metrics {
309 metrics.auditing_time.observe(time.as_secs_f64());
310 }
311 handlers
312 .farming_notification
313 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
314 sectors_count: sectors_metadata.len() as u16,
315 time,
316 }));
317 }
318
319 let _proving_guard = global_mutex.lock().await;
322
323 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
324 if sector_solutions.is_empty() {
325 continue;
326 }
327 let mut start = Instant::now();
328 while let Some(maybe_solution) = thread_pool.install(|| {
329 let _span_guard = span.enter();
330
331 sector_solutions.next()
332 }) {
333 let solution = match maybe_solution {
334 Ok(solution) => solution,
335 Err(error) => {
336 if let Some(metrics) = &metrics {
337 metrics
338 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
339 }
340 error!(
341 %slot,
342 %sector_index,
343 %error,
344 "Failed to prove, scheduling sector for replotting"
345 );
346 problematic_sectors.push(sector_index);
347 start = Instant::now();
350 continue;
351 }
352 };
353
354 debug!(%slot, %sector_index, "Solution found");
355 trace!(?solution, "Solution found");
356
357 {
358 let time = start.elapsed();
359 if time >= farming_timeout {
360 if let Some(metrics) = &metrics {
361 metrics.observe_proving_time(&time, ProvingResult::Timeout);
362 }
363 handlers.farming_notification.call_simple(
364 &FarmingNotification::Proving(ProvingDetails {
365 result: ProvingResult::Timeout,
366 time,
367 }),
368 );
369 warn!(
370 %slot,
371 %sector_index,
372 "Proving for solution skipped due to farming time limit",
373 );
374
375 break 'solutions_processing;
376 }
377 }
378
379 let response = SolutionResponse {
380 slot_number: slot,
381 solution,
382 };
383
384 handlers.solution.call_simple(&response);
385
386 if let Err(error) = node_client.submit_solution_response(response).await {
387 let time = start.elapsed();
388 if let Some(metrics) = &metrics {
389 metrics.observe_proving_time(&time, ProvingResult::Rejected);
390 }
391 handlers
392 .farming_notification
393 .call_simple(&FarmingNotification::Proving(ProvingDetails {
394 result: ProvingResult::Rejected,
395 time,
396 }));
397 warn!(
398 %slot,
399 %sector_index,
400 %error,
401 "Failed to send solution to node, skipping further proving for this slot",
402 );
403 break 'solutions_processing;
404 }
405
406 let time = start.elapsed();
407 if let Some(metrics) = &metrics {
408 metrics.observe_proving_time(&time, ProvingResult::Success);
409 }
410 handlers
411 .farming_notification
412 .call_simple(&FarmingNotification::Proving(ProvingDetails {
413 result: ProvingResult::Success,
414 time,
415 }));
416 start = Instant::now();
417 }
418 }
419 };
420
421 if let Err(error) = result {
422 if error.is_fatal() {
423 return Err(error);
424 }
425
426 non_fatal_errors += 1;
427
428 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
429 return Err(error);
430 }
431
432 warn!(
433 %error,
434 "Non-fatal farming error"
435 );
436
437 if let Some(metrics) = &metrics {
438 metrics.note_farming_error(&error);
439 }
440 handlers
441 .farming_notification
442 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
443
444 for sector_index in problematic_sectors.drain(..) {
445 sectors_being_modified.write().await.insert(sector_index);
447 if let Some(existing_sector_metadata) = sectors_metadata
449 .write()
450 .await
451 .get_mut(usize::from(sector_index))
452 {
453 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
454 sector_index,
455 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
456 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
457 history_size: HistorySize::from(SegmentIndex::ZERO),
458 });
459 }
460 sectors_being_modified.write().await.remove(§or_index);
462 }
463 } else {
464 non_fatal_errors = 0;
465 }
466 }
467
468 Ok(())
469}