//! Farming for a single disk farm: forwarding slot info notifications, auditing the plot and
//! proving/submitting solutions to the node.

pub mod rayon_files;
8use crate::farm::{
9 AuditingDetails, FarmingError, FarmingNotification, ProvingDetails, ProvingResult,
10};
11use crate::node_client::NodeClient;
12use crate::single_disk_farm::Handlers;
13use crate::single_disk_farm::metrics::SingleDiskFarmMetrics;
14use ab_core_primitives::address::Address;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::Record;
17use ab_core_primitives::pos::PosSeed;
18use ab_core_primitives::sectors::SectorIndex;
19use ab_core_primitives::segments::{HistorySize, SegmentIndex};
20use ab_core_primitives::solutions::{Solution, SolutionDistance};
21use ab_erasure_coding::ErasureCoding;
22use ab_farmer_components::ReadAtSync;
23use ab_farmer_components::auditing::{AuditingError, audit_plot_sync};
24use ab_farmer_components::proving::{ProvableSolutions, ProvingError};
25use ab_farmer_components::sector::{SectorMetadata, SectorMetadataChecksummed};
26use ab_farmer_rpc_primitives::{SlotInfo, SolutionResponse};
27use ab_proof_of_space::{Table, TableGenerator};
28use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
29use futures::StreamExt;
30use futures::channel::mpsc;
31use rayon::ThreadPool;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::time::Instant;
35use tracing::{Span, debug, error, info, trace, warn};
36
/// Maximum number of consecutive slots ending with a non-fatal farming error before farming gives
/// up and returns the last error (the counter is reset after every successfully processed slot)
const NON_FATAL_ERROR_LIMIT: usize = 10;
40pub(super) async fn slot_notification_forwarder<NC>(
41 node_client: &NC,
42 mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
43 metrics: Option<Arc<SingleDiskFarmMetrics>>,
44) -> Result<(), FarmingError>
45where
46 NC: NodeClient,
47{
48 info!("Subscribing to slot info notifications");
49
50 let mut slot_info_notifications = node_client
51 .subscribe_slot_info()
52 .await
53 .map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;
54
55 while let Some(slot_info) = slot_info_notifications.next().await {
56 debug!(?slot_info, "New slot");
57
58 let slot = slot_info.slot_number;
59
60 if slot_info_forwarder_sender.try_send(slot_info).is_err() {
63 if let Some(metrics) = &metrics {
64 metrics.skipped_slots.inc();
65 }
66 debug!(%slot, "Slow farming, skipping slot");
67 }
68 }
69
70 Err(FarmingError::SlotNotificationStreamEnded)
71}
72
/// Options for [`PlotAudit::audit`]: everything needed to audit the plot for a single slot
#[derive(Debug)]
pub struct PlotAuditOptions<'a, 'b, PosTable>
where
    PosTable: Table,
{
    /// Hash of the public key the plot belongs to
    pub public_key_hash: &'a Blake3Hash,
    /// Slot to audit for, its `global_challenge` and `solution_range` are used during auditing
    pub slot_info: SlotInfo,
    /// Metadata of the sectors to audit
    pub sectors_metadata: &'a [SectorMetadataChecksummed],
    /// Erasure coding instance used to turn solution candidates into solutions
    pub erasure_coding: &'a ErasureCoding,
    /// Sectors that are currently being modified, passed through to plot auditing (presumably so
    /// they are excluded from it, TODO confirm against `audit_plot_sync`)
    pub sectors_being_modified: &'b HashSet<SectorIndex>,
    /// Proof-of-space table generator used to create proofs for solution candidates
    pub table_generator: &'a PosTable::Generator,
}

// `Clone` and `Copy` are implemented manually because `#[derive]` would additionally require
// `PosTable: Clone`/`PosTable: Copy`, even though no field stores a `PosTable` by value
impl<PosTable> Clone for PlotAuditOptions<'_, '_, PosTable>
where
    PosTable: Table,
{
    #[inline]
    fn clone(&self) -> Self {
        *self
    }
}

impl<PosTable> Copy for PlotAuditOptions<'_, '_, PosTable> where PosTable: Table {}
105
106#[derive(Debug)]
108pub struct PlotAudit<Plot>(Plot)
109where
110 Plot: ReadAtSync;
111
112impl<'a, Plot> PlotAudit<Plot>
113where
114 Plot: ReadAtSync + 'a,
115{
116 pub fn new(plot: Plot) -> Self {
118 Self(plot)
119 }
120
121 #[allow(clippy::type_complexity)]
123 pub fn audit<'b, PosTable>(
124 &'a self,
125 options: PlotAuditOptions<'a, 'b, PosTable>,
126 ) -> Result<
127 Vec<(
128 SectorIndex,
129 impl ProvableSolutions<Item = Result<Solution, ProvingError>> + use<'a, Plot, PosTable> + 'a,
130 )>,
131 AuditingError,
132 >
133 where
134 PosTable: Table,
135 {
136 let PlotAuditOptions {
137 public_key_hash,
138 slot_info,
139 sectors_metadata,
140 erasure_coding,
141 sectors_being_modified,
142 table_generator,
143 } = options;
144
145 let audit_results = audit_plot_sync(
146 public_key_hash,
147 &slot_info.global_challenge,
148 slot_info.solution_range,
149 &self.0,
150 sectors_metadata,
151 sectors_being_modified,
152 )?;
153
154 Ok(audit_results
155 .into_iter()
156 .filter_map(|audit_results| {
157 let sector_index = audit_results.sector_index;
158
159 let sector_solutions = audit_results
160 .solution_candidates
161 .into_solutions(erasure_coding, |seed: &PosSeed| {
162 table_generator.create_proofs_parallel(seed)
163 });
164
165 let sector_solutions = match sector_solutions {
166 Ok(solutions) => solutions,
167 Err(error) => {
168 warn!(
169 %error,
170 %sector_index,
171 "Failed to turn solution candidates into solutions",
172 );
173
174 return None;
175 }
176 };
177
178 if sector_solutions.len() == 0 {
179 return None;
180 }
181
182 Some((sector_index, sector_solutions))
183 })
184 .collect())
185 }
186}
187
/// Inputs of the [`farming`] loop
pub(super) struct FarmingOptions<NC, PlotAudit> {
    /// Hash of the public key, used for auditing
    pub(super) public_key_hash: Blake3Hash,
    /// Reward address (currently not used, see `expect` reason below)
    #[expect(
        dead_code,
        reason = "Reward address was removed from `Solution` and will need to be re-introduced later"
    )]
    pub(super) reward_address: Address,
    /// Client used to query farmer app info and submit solution responses to the node
    pub(super) node_client: NC,
    /// Plot auditor
    pub(super) plot_audit: PlotAudit,
    /// Metadata of the farm's sectors, read on every slot for auditing
    pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
    /// Erasure coding instance used to turn solution candidates into solutions
    pub(super) erasure_coding: ErasureCoding,
    /// Handlers notified about solutions and farming events
    pub(super) handlers: Arc<Handlers>,
    /// Indices of sectors that are currently being modified
    pub(super) sectors_being_modified: Arc<AsyncRwLock<HashSet<SectorIndex>>>,
    /// Incoming slot info notifications (presumably fed by `slot_notification_forwarder`)
    pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
    /// Thread pool on which auditing and proving run
    pub(super) thread_pool: ThreadPool,
    /// Mutex briefly taken before each slot and held while proving
    pub(super) global_mutex: Arc<AsyncMutex<()>>,
    /// Optional metrics
    pub(super) metrics: Option<Arc<SingleDiskFarmMetrics>>,
}
207
208pub(super) async fn farming<'a, PosTable, NC, Plot>(
213 farming_options: FarmingOptions<NC, PlotAudit<Plot>>,
214) -> Result<(), FarmingError>
215where
216 PosTable: Table,
217 NC: NodeClient,
218 Plot: ReadAtSync + 'a,
219{
220 let FarmingOptions {
221 public_key_hash,
222 reward_address: _,
224 node_client,
225 plot_audit,
226 sectors_metadata,
227 erasure_coding,
228 handlers,
229 sectors_being_modified,
230 mut slot_info_notifications,
231 thread_pool,
232 global_mutex,
233 metrics,
234 } = farming_options;
235
236 let farmer_app_info = node_client
237 .farmer_app_info()
238 .await
239 .map_err(|error| FarmingError::FailedToGetFarmerInfo { error })?;
240
241 let farming_timeout = farmer_app_info.farming_timeout;
243
244 let table_generator = PosTable::generator();
246 let span = Span::current();
247
248 let mut non_fatal_errors = 0;
249
250 while let Some(slot_info) = slot_info_notifications.next().await {
251 let slot = slot_info.slot_number;
252
253 global_mutex.lock().await;
255
256 let mut problematic_sectors = Vec::new();
257 let result: Result<(), FarmingError> = try {
258 let start = Instant::now();
259 let sectors_metadata = sectors_metadata.read().await;
260
261 debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
262
263 let mut sectors_solutions = {
264 let sectors_being_modified = &*sectors_being_modified.read().await;
265
266 thread_pool.install(|| {
267 let _span_guard = span.enter();
268
269 plot_audit.audit(PlotAuditOptions::<PosTable> {
270 public_key_hash: &public_key_hash,
271 slot_info,
272 sectors_metadata: §ors_metadata,
273 erasure_coding: &erasure_coding,
274 sectors_being_modified,
275 table_generator: &table_generator,
276 })
277 })?
278 };
279
280 sectors_solutions.sort_by(|a, b| {
281 let a_solution_distance =
282 a.1.best_solution_distance()
283 .unwrap_or(SolutionDistance::MAX);
284 let b_solution_distance =
285 b.1.best_solution_distance()
286 .unwrap_or(SolutionDistance::MAX);
287
288 a_solution_distance.cmp(&b_solution_distance)
289 });
290
291 {
292 let time = start.elapsed();
293 if let Some(metrics) = &metrics {
294 metrics.auditing_time.observe(time.as_secs_f64());
295 }
296 handlers
297 .farming_notification
298 .call_simple(&FarmingNotification::Auditing(AuditingDetails {
299 sectors_count: sectors_metadata.len() as u16,
300 time,
301 }));
302 }
303
304 let _proving_guard = global_mutex.lock().await;
307
308 'solutions_processing: for (sector_index, mut sector_solutions) in sectors_solutions {
309 if sector_solutions.is_empty() {
310 continue;
311 }
312 let mut start = Instant::now();
313 while let Some(maybe_solution) = thread_pool.install(|| {
314 let _span_guard = span.enter();
315
316 sector_solutions.next()
317 }) {
318 let solution = match maybe_solution {
319 Ok(solution) => solution,
320 Err(error) => {
321 if let Some(metrics) = &metrics {
322 metrics
323 .observe_proving_time(&start.elapsed(), ProvingResult::Failed);
324 }
325 error!(
326 %slot,
327 %sector_index,
328 %error,
329 "Failed to prove, scheduling sector for replotting"
330 );
331 problematic_sectors.push(sector_index);
332 start = Instant::now();
335 continue;
336 }
337 };
338
339 debug!(%slot, %sector_index, "Solution found");
340 trace!(?solution, "Solution found");
341
342 {
343 let time = start.elapsed();
344 if time >= farming_timeout {
345 if let Some(metrics) = &metrics {
346 metrics.observe_proving_time(&time, ProvingResult::Timeout);
347 }
348 handlers.farming_notification.call_simple(
349 &FarmingNotification::Proving(ProvingDetails {
350 result: ProvingResult::Timeout,
351 time,
352 }),
353 );
354 warn!(
355 %slot,
356 %sector_index,
357 "Proving for solution skipped due to farming time limit",
358 );
359
360 break 'solutions_processing;
361 }
362 }
363
364 let response = SolutionResponse {
365 slot_number: slot,
366 solution,
367 };
368
369 handlers.solution.call_simple(&response);
370
371 if let Err(error) = node_client.submit_solution_response(response).await {
372 let time = start.elapsed();
373 if let Some(metrics) = &metrics {
374 metrics.observe_proving_time(&time, ProvingResult::Rejected);
375 }
376 handlers
377 .farming_notification
378 .call_simple(&FarmingNotification::Proving(ProvingDetails {
379 result: ProvingResult::Rejected,
380 time,
381 }));
382 warn!(
383 %slot,
384 %sector_index,
385 %error,
386 "Failed to send solution to node, skipping further proving for this slot",
387 );
388 break 'solutions_processing;
389 }
390
391 let time = start.elapsed();
392 if let Some(metrics) = &metrics {
393 metrics.observe_proving_time(&time, ProvingResult::Success);
394 }
395 handlers
396 .farming_notification
397 .call_simple(&FarmingNotification::Proving(ProvingDetails {
398 result: ProvingResult::Success,
399 time,
400 }));
401 start = Instant::now();
402 }
403 }
404 };
405
406 if let Err(error) = result {
407 if error.is_fatal() {
408 return Err(error);
409 }
410
411 non_fatal_errors += 1;
412
413 if non_fatal_errors >= NON_FATAL_ERROR_LIMIT {
414 return Err(error);
415 }
416
417 warn!(
418 %error,
419 "Non-fatal farming error"
420 );
421
422 if let Some(metrics) = &metrics {
423 metrics.note_farming_error(&error);
424 }
425 handlers
426 .farming_notification
427 .call_simple(&FarmingNotification::NonFatalError(Arc::new(error)));
428
429 for sector_index in problematic_sectors.drain(..) {
430 sectors_being_modified.write().await.insert(sector_index);
432 if let Some(existing_sector_metadata) = sectors_metadata
434 .write()
435 .await
436 .get_mut(usize::from(sector_index))
437 {
438 *existing_sector_metadata = SectorMetadataChecksummed::from(SectorMetadata {
439 sector_index,
440 pieces_in_sector: existing_sector_metadata.pieces_in_sector,
441 s_bucket_sizes: Box::new([0; Record::NUM_S_BUCKETS]),
442 history_size: HistorySize::from(SegmentIndex::ZERO),
443 });
444 }
445 sectors_being_modified.write().await.remove(§or_index);
447 }
448 } else {
449 non_fatal_errors = 0;
450 }
451 }
452
453 Ok(())
454}