ab_farmer/single_disk_farm/
farming.rs1pub mod rayon_files;
7
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_components::shard_commitment::ShardCommitmentsRootsCache;
27use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
28use ab_proof_of_space::{Table, TableGenerator};
29use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
30use futures::StreamExt;
31use futures::channel::mpsc;
32use rayon::ThreadPool;
33use std::collections::HashSet;
34use std::sync::Arc;
35use std::time::Instant;
36use tracing::{Span, debug, error, info, trace, warn};
37
/// Number of consecutive slots ending in a non-fatal error after which `farming` stops and
/// returns the last error (the counter is reset after every slot processed without error).
const NON_FATAL_ERROR_LIMIT: usize = 10;
40
41pub(super) async fn slot_notification_forwarder<NC>(
42 node_client: &NC,
43 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
44 metrics: Option<Arc<SingleDiskFarmMetrics>>,
45) -> Result<(), FarmingError>
46where
47 NC: NodeClient,
48{
49 info!("Subscribing to slot info notifications");
50
51 let mut slot_info_notifications = node_client
52 .subscribe_slot_info()
53 .await
54 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
55
56 while let Some(slot_info) = slot_info_notifications.next().await {
57 debug!(?slot_info, "New slot");
58
59 let slot = slot_info.slot;
60
61 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
64 if let Some(metrics) = &metrics {
65 metrics.skipped_slots.inc();
66 }
67 debug!(%slot, "Slow farming, skipping slot");
68 }
69 }
70
71 Err(FarmingError::SlotNotificationStreamEnded)
72}
73
/// Options for [`PlotAudit::audit`].
///
/// `'a` is the lifetime of data that the solutions returned by [`PlotAudit::audit`] may keep
/// borrowing. `'b` only covers `sectors_being_modified`, which is needed during the audit itself
/// (the return type of [`PlotAudit::audit`] does not capture `'b`), so that set can be borrowed
/// for a shorter time.
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Hash of the farmer's public key, passed to the audit
    pub public_key_hash: &'a Blake3Hash,
    /// Cache of shard commitments roots, passed to the audit
    pub shard_commitments_roots_cache: &'a ShardCommitmentsRootsCache,
    /// Slot to audit for; provides the global challenge, solution range and sharding parameters
    pub slot_info: SlotInfo,
    /// Metadata of the sectors in the plot
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// Erasure coding used to turn solution candidates into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Sectors that are currently being modified; passed to the audit (presumably so that they
    /// are skipped — confirm against `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Proof-of-space table generator used to create proofs for solution candidates
    pub table_generator: &'a PosTable::Generator,
}
96
97impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
98where
99 PosTable: Table,
100{
101 #[inline]
102 fn clone(&self) -> Self {
103 *self
104 }
105}
106
107impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
108
109#[derive(Debug)]
111pub struct PlotAudit<Plot>(Plot)
112where
113 Plot: ReadAtSync;
114
115impl<'a, Plot> PlotAudit<Plot>
116where
117 Plot: ReadAtSync + 'a,
118{
119 pub fn new(plot: Plot) -> Self {
121 Self(plot)
122 }
123
124 #[expect(clippy::type_complexity)]
126 pub fn audit<'b, PosTable>(
127 &'a self,
128 options: PlotAuditOptions<'a, 'b, PosTable>,
129 ) -> Result<
130 Vec<(
131 SectorIndex,
132 impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
133 )>,
134 AuditingError,
135 >
136 where
137 PosTable: Table,
138 {
139 let PlotAuditOptions {
140 public_key_hash,
141 shard_commitments_roots_cache,
142 slot_info,
143 sectors_metadata,
144 erasure_coding,
145 sectors_being_modified,
146 table_generator,
147 } = options;
148
149 let audit_results = audit_plot_sync(
150 public_key_hash,
151 shard_commitments_roots_cache,
152 slot_info.shard_membership_entropy,
153 slot_info.num_shards,
154 &slot_info.global_challenge,
155 slot_info.solution_range,
156 &self.0,
157 sectors_metadata,
158 sectors_being_modified,
159 )?;
160
161 Ok(audit_results
162 .into_iter()
163 .filter_map(|audit_results| {
164 let sector_index = audit_results.sector_index;
165
166 let sector_solutions = audit_results
167 .solution_candidates
168 .into_solutions(erasure_coding, |seed: &PosSeed| {
169 table_generator.create_proofs_parallel(seed)
170 });
171
172 let sector_solutions = match sector_solutions {
173 Ok(solutions) => solutions,
174 Err(error) => {
175 warn!(
176 %error,
177 %sector_index,
178 "Failed to turn solution candidates into solutions",
179 );
180
181 return None;
182 }
183 };
184
185 if sector_solutions.len() == 0 {
186 return None;
187 }
188
189 Some((sector_index, sector_solutions))
190 })
191 .collect())
192 }
193}
194
/// Everything `farming` needs to run the farming loop of a single disk farm.
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Hash of the farmer's public key, used during auditing
    pub(super) public_key_hash: Blake3Hash,
    /// Cache of shard commitments roots, used during auditing
    pub(super) shard_commitments_roots_cache: ShardCommitmentsRootsCache,
    /// Reward address; currently unused (see the `expect` reason below)
    #[expect(
        dead_code,
        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
    )]
    pub(super) reward_address: Address,
    /// Node client used to fetch farmer app info and submit solutions
    pub(super) node_client: NC,
    /// Plot to audit on every slot
    pub(super) plot_audit: PlotAudit,
    /// Metadata of all sectors; read during auditing and written when resetting sectors that
    /// failed proving
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Erasure coding used to turn solution candidates into solutions
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers notified about solutions and farming progress
    pub(super) handlers: Arc<Handlers>,
    /// Sectors that are currently being modified, passed to the audit
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Incoming slot notifications (presumably fed by `slot_notification_forwarder`)
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool on which auditing and proving run
    pub(super) thread_pool: ThreadPool,
    /// Mutex shared with other operations; farming waits on it before auditing and holds it
    /// while proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
215
216pub(super) async fn farming<'a, PosTable, NC, Plot>(
221 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
222) -> Result<(), FarmingError>
223where
224 PosTable: Table,
225 NC: NodeClient,
226 Plot: ReadAtSync + 'a,
227{
228 let FarmingOptions {
229 public_key_hash,
230 shard_commitments_roots_cache,
231 reward_address: _,
233 node_client,
234 plot_audit,
235 sectors_metadata,
236 erasure_coding,
237 handlers,
238 sectors_being_modified,
239 mut slot_info_notifications,
240 thread_pool,
241 global_mutex,
242 metrics,
243 } = farming_options;
244
245 let farmer_app_info = node_client
246 .farmer_app_info()
247 .await
248 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
249
250 let farming_timeout = farmer_app_info.farming_timeout;
252
253 let table_generator = PosTable::generator();
255 let span = Span::current();
256
257 let mut non_fatal_errors = 0;
258
259 while let Some(slot_info) = slot_info_notifications.next().await {
260 let slot = slot_info.slot;
261
262 global_mutex.lock().await;
264
265 let mut problematic_sectors = Vec::new();
266 let result = try {
267 let start = Instant::now();
268 let sectors_metadata = sectors_metadata.read().await;
269
270 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
271
272 let mut sectors_solutions = {
273 let sectors_being_modified = &*sectors_being_modified.read().await;
274
275 thread_pool
276 .install(|| {
277 let _span_guard = span.enter();
278
279 plot_audit.audit(PlotAuditOptions::<PosTable> {
280 public_key_hash: &public_key_hash,
281 shard_commitments_roots_cache: &shard_commitments_roots_cache,
282 slot_info,
283 sectors_metadata: §ors_metadata,
284 erasure_coding: &erasure_coding,
285 sectors_being_modified,
286 table_generator: &table_generator,
287 })
288 })
289 .map_err(FarmingError::LowLevelAuditing)?
290 };
291
292 sectors_solutions.sort_by(|a, b| {
293 let a_solution_distance =
294 a.1.best_solution_distance()
295 .unwrap_or(SolutionDistance::MAX);
296 let b_solution_distance =
297 b.1.best_solution_distance()
298 .unwrap_or(SolutionDistance::MAX);
299
300 a_solution_distance.cmp(&b_solution_distance)
301 });
302
303 {
304 let time = start.elapsed();
305 if let Some(metrics) = &metrics {
306 metrics.auditing_time.observe(time.as_secs_f64());
307 }
308 handlers
309 .farming_notification
310 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
311 sectors_count: sectors_metadata.len() as u16,
312 time,
313 }));
314 }
315
316 let _proving_guard = global_mutex.lock().await;
319
320 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
321 if sector_solutions.is_empty() {
322 continue;
323 }
324 let mut start = Instant::now();
325 while let Some(maybe_solution) = thread_pool.install(|| {
326 let _span_guard = span.enter();
327
328 sector_solutions.next()
329 }) {
330 let solution = match maybe_solution {
331 Ok(solution) => solution,
332 Err(error) => {
333 if let Some(metrics) = &metrics {
334 metrics
335 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
336 }
337 error!(
338 %slot,
339 %sector_index,
340 %error,
341 "Failed to prove, scheduling sector for replotting"
342 );
343 problematic_sectors.push(sector_index);
344 start = Instant::now();
347 continue;
348 }
349 };
350
351 debug!(%slot, %sector_index, "Solution found");
352 trace!(?solution, "Solution found");
353
354 {
355 let time = start.elapsed();
356 if time >= farming_timeout {
357 if let Some(metrics) = &metrics {
358 metrics.observe_proving_time(&time, ProvingResult::Timeout);
359 }
360 handlers.farming_notification.call_simple(
361 &FarmingNotification::Proving(ProvingDetails {
362 result: ProvingResult::Timeout,
363 time,
364 }),
365 );
366 warn!(
367 %slot,
368 %sector_index,
369 "Proving for solution skipped due to farming time limit",
370 );
371
372 break 'solutions_processing;
373 }
374 }
375
376 let response = SolutionResponse {
377 slot_number: slot,
378 solution,
379 };
380
381 handlers.solution.call_simple(&response);
382
383 if let Err(error) = node_client.submit_solution_response(response).await {
384 let time = start.elapsed();
385 if let Some(metrics) = &metrics {
386 metrics.observe_proving_time(&time, ProvingResult::Rejected);
387 }
388 handlers
389 .farming_notification
390 .call_simple(&FarmingNotification::Proving(ProvingDetails {
391 result: ProvingResult::Rejected,
392 time,
393 }));
394 warn!(
395 %slot,
396 %sector_index,
397 %error,
398 "Failed to send solution to node, skipping further proving for this slot",
399 );
400 break 'solutions_processing;
401 }
402
403 let time = start.elapsed();
404 if let Some(metrics) = &metrics {
405 metrics.observe_proving_time(&time, ProvingResult::Success);
406 }
407 handlers
408 .farming_notification
409 .call_simple(&FarmingNotification::Proving(ProvingDetails {
410 result: ProvingResult::Success,
411 time,
412 }));
413 start = Instant::now();
414 }
415 }
416 };
417
418 if let Err(error) = result {
419 if error.is_fatal() {
420 return Err(error);
421 }
422
423 non_fatal_errors += 1;
424
425 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
426 return Err(error);
427 }
428
429 warn!(
430 %error,
431 "Non-fatal farming error"
432 );
433
434 if let Some(metrics) = &metrics {
435 metrics.note_farming_error(&error);
436 }
437 handlers
438 .farming_notification
439 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
440
441 for sector_index in problematic_sectors.drain(..) {
442 sectors_being_modified.write().await.insert(sector_index);
444 if let Some(existing_sector_metadata) = sectors_metadata
446 .write()
447 .await
448 .get_mut(usize::from(sector_index))
449 {
450 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
451 sector_index,
452 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
453 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
454 history_size: HistorySize::from(SegmentIndex::ZERO),
455 });
456 }
457 sectors_being_modified.write().await.remove(§or_index);
459 }
460 } else {
461 non_fatal_errors = 0;
462 }
463 }
464
465 Ok(())
466}