ab_farmer/plotter/
pool.rs

1//! Pool plotter
2
3use crate::plotter::{Plotter, SectorPlottingProgress};
4use ab_core_primitives::ed25519::Ed25519PublicKey;
5use ab_core_primitives::sectors::SectorIndex;
6use ab_core_primitives::solutions::ShardCommitmentHash;
7use ab_farmer_components::FarmerProtocolInfo;
8use async_trait::async_trait;
9use event_listener::Event;
10use futures::channel::mpsc;
11use futures::future;
12use std::any::type_name_of_val;
13use std::pin::pin;
14use std::time::Duration;
15use tracing::{error, trace};
16
17/// Pool plotter.
18///
19/// This plotter implementation relies on retries and should only be used with local plotter
20/// implementations (like CPU and GPU).
21#[derive(Debug)]
22pub struct PoolPlotter {
23    plotters: Vec<Box<dyn Plotter + Send + Sync>>,
24    retry_interval: Duration,
25    notification: Event,
26}
27
28#[async_trait]
29impl Plotter for PoolPlotter {
30    async fn has_free_capacity(&self) -> Result<bool, String> {
31        for (index, plotter) in self.plotters.iter().enumerate() {
32            match plotter.has_free_capacity().await {
33                Ok(result) => {
34                    if result {
35                        return Ok(true);
36                    }
37                }
38                Err(error) => {
39                    error!(
40                        %error,
41                        %index,
42                        r#type = type_name_of_val(plotter),
43                        "Failed to check free capacity for plotter"
44                    );
45                }
46            }
47        }
48
49        Ok(false)
50    }
51
52    async fn plot_sector(
53        &self,
54        public_key: Ed25519PublicKey,
55        shard_commitments_root: ShardCommitmentHash,
56        sector_index: SectorIndex,
57        farmer_protocol_info: FarmerProtocolInfo,
58        pieces_in_sector: u16,
59        replotting: bool,
60        progress_sender: mpsc::Sender<SectorPlottingProgress>,
61    ) {
62        loop {
63            for plotter in &self.plotters {
64                if plotter
65                    .try_plot_sector(
66                        public_key,
67                        shard_commitments_root,
68                        sector_index,
69                        farmer_protocol_info,
70                        pieces_in_sector,
71                        replotting,
72                        progress_sender.clone(),
73                    )
74                    .await
75                {
76                    self.notification.notify_relaxed(1);
77                    return;
78                }
79            }
80
81            trace!(
82                retry_interval = ?self.retry_interval,
83                "All plotters are busy, will wait and try again later"
84            );
85            future::select(
86                pin!(tokio::time::sleep(self.retry_interval)),
87                self.notification.listen(),
88            )
89            .await;
90        }
91    }
92
93    async fn try_plot_sector(
94        &self,
95        public_key: Ed25519PublicKey,
96        shard_commitments_root: ShardCommitmentHash,
97        sector_index: SectorIndex,
98        farmer_protocol_info: FarmerProtocolInfo,
99        pieces_in_sector: u16,
100        replotting: bool,
101        progress_sender: mpsc::Sender<SectorPlottingProgress>,
102    ) -> bool {
103        for plotter in &self.plotters {
104            if plotter
105                .try_plot_sector(
106                    public_key,
107                    shard_commitments_root,
108                    sector_index,
109                    farmer_protocol_info,
110                    pieces_in_sector,
111                    replotting,
112                    progress_sender.clone(),
113                )
114                .await
115            {
116                self.notification.notify_relaxed(1);
117                return true;
118            }
119        }
120
121        false
122    }
123}
124
125impl PoolPlotter {
126    /// Create a new instance
127    pub fn new(plotters: Vec<Box<dyn Plotter + Send + Sync>>, retry_interval: Duration) -> Self {
128        Self {
129            plotters,
130            retry_interval,
131            notification: Event::new(),
132        }
133    }
134}