ab_farmer/plotter/
pool.rs1use crate::plotter::{Plotter, SectorPlottingProgress};
4use ab_core_primitives::ed25519::Ed25519PublicKey;
5use ab_core_primitives::sectors::SectorIndex;
6use ab_core_primitives::solutions::ShardCommitmentHash;
7use ab_farmer_components::FarmerProtocolInfo;
8use async_trait::async_trait;
9use event_listener::Event;
10use futures::channel::mpsc;
11use futures::future;
12use std::any::type_name_of_val;
13use std::pin::pin;
14use std::time::Duration;
15use tracing::{error, trace};
16
17#[derive(Debug)]
22pub struct PoolPlotter {
23 plotters: Vec<Box<dyn Plotter + Send + Sync>>,
24 retry_interval: Duration,
25 notification: Event,
26}
27
28#[async_trait]
29impl Plotter for PoolPlotter {
30 async fn has_free_capacity(&self) -> Result<bool, String> {
31 for (index, plotter) in self.plotters.iter().enumerate() {
32 match plotter.has_free_capacity().await {
33 Ok(result) => {
34 if result {
35 return Ok(true);
36 }
37 }
38 Err(error) => {
39 error!(
40 %error,
41 %index,
42 r#type = type_name_of_val(plotter),
43 "Failed to check free capacity for plotter"
44 );
45 }
46 }
47 }
48
49 Ok(false)
50 }
51
52 async fn plot_sector(
53 &self,
54 public_key: Ed25519PublicKey,
55 shard_commitments_root: ShardCommitmentHash,
56 sector_index: SectorIndex,
57 farmer_protocol_info: FarmerProtocolInfo,
58 pieces_in_sector: u16,
59 replotting: bool,
60 progress_sender: mpsc::Sender<SectorPlottingProgress>,
61 ) {
62 loop {
63 for plotter in &self.plotters {
64 if plotter
65 .try_plot_sector(
66 public_key,
67 shard_commitments_root,
68 sector_index,
69 farmer_protocol_info,
70 pieces_in_sector,
71 replotting,
72 progress_sender.clone(),
73 )
74 .await
75 {
76 self.notification.notify_relaxed(1);
77 return;
78 }
79 }
80
81 trace!(
82 retry_interval = ?self.retry_interval,
83 "All plotters are busy, will wait and try again later"
84 );
85 future::select(
86 pin!(tokio::time::sleep(self.retry_interval)),
87 self.notification.listen(),
88 )
89 .await;
90 }
91 }
92
93 async fn try_plot_sector(
94 &self,
95 public_key: Ed25519PublicKey,
96 shard_commitments_root: ShardCommitmentHash,
97 sector_index: SectorIndex,
98 farmer_protocol_info: FarmerProtocolInfo,
99 pieces_in_sector: u16,
100 replotting: bool,
101 progress_sender: mpsc::Sender<SectorPlottingProgress>,
102 ) -> bool {
103 for plotter in &self.plotters {
104 if plotter
105 .try_plot_sector(
106 public_key,
107 shard_commitments_root,
108 sector_index,
109 farmer_protocol_info,
110 pieces_in_sector,
111 replotting,
112 progress_sender.clone(),
113 )
114 .await
115 {
116 self.notification.notify_relaxed(1);
117 return true;
118 }
119 }
120
121 false
122 }
123}
124
125impl PoolPlotter {
126 pub fn new(plotters: Vec<Box<dyn Plotter + Send + Sync>>, retry_interval: Duration) -> Self {
128 Self {
129 plotters,
130 retry_interval,
131 notification: Event::new(),
132 }
133 }
134}