ab_farmer/cluster/controller/
farms.rs

//! This module exposes the implementation of farm maintenance.
//!
//! The goal is to observe farms in a cluster and keep the controller's data structures
//! about which pieces are plotted in which sectors of which farm up to date. The implementation
//! automatically handles dynamic farm addition and removal, etc.
6
7use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
8use crate::cluster::controller::stream_map::StreamMap;
9use crate::cluster::farmer::{
10    ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11    ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{FutureExt, StreamExt, select};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::sync::broadcast;
27use tokio::task;
28use tokio::time::MissedTickBehavior;
29use tracing::{debug, error, info, trace, warn};
30
31/// Number of farms in a cluster is currently limited to 2^16
32pub type FarmIndex = u16;
33
34enum FarmAddRemoveResult {
35    Add {
36        close_receiver: broadcast::Receiver<()>,
37        farm: ClusterFarm,
38    },
39    Remove {
40        farm_index: FarmIndex,
41    },
42}
43
44struct FarmerAddResult<I> {
45    close_receiver: broadcast::Receiver<()>,
46    added_farms: I,
47}
48
49#[derive(Debug)]
50struct KnownFarmer {
51    farmer_id: ClusterFarmerId,
52    last_identification: Instant,
53    known_farms: HashMap<FarmIndex, FarmId>,
54    close_sender: Option<broadcast::Sender<()>>,
55}
56
57impl KnownFarmer {
58    fn notify_cleanup(&mut self) -> bool {
59        let Some(close_sender) = self.close_sender.take() else {
60            return false;
61        };
62        let _ = close_sender.send(());
63        true
64    }
65}
66
67#[derive(Debug)]
68struct KnownFarmers {
69    identification_broadcast_interval: Duration,
70    known_farmers: Vec<KnownFarmer>,
71}
72
73impl KnownFarmers {
74    fn new(identification_broadcast_interval: Duration) -> Self {
75        Self {
76            identification_broadcast_interval,
77            known_farmers: Vec::new(),
78        }
79    }
80
81    /// Return `false` if the farmer is unknown and initialization is required
82    fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
83        self.known_farmers.iter_mut().any(|known_farmer| {
84            if known_farmer.farmer_id == farmer_id {
85                trace!(%farmer_id, "Updating last identification for farmer");
86                known_farmer.last_identification = Instant::now();
87                true
88            } else {
89                false
90            }
91        })
92    }
93
94    fn add(
95        &mut self,
96        farmer_id: ClusterFarmerId,
97        farms: Vec<ClusterFarmerFarmDetails>,
98    ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
99        let farm_indices = self.pick_farm_indices(farms.len());
100        let farm_ids_to_add = farms
101            .iter()
102            .map(|farm_details| farm_details.farm_id)
103            .collect::<HashSet<FarmId>>();
104
105        if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
106            known_farmer
107                .known_farms
108                .values()
109                .any(|farm_id| farm_ids_to_add.contains(farm_id))
110        }) {
111            warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
112            old_farmer.notify_cleanup();
113        }
114
115        let (close_sender, close_receiver) = broadcast::channel(1);
116        self.known_farmers.push(KnownFarmer {
117            farmer_id,
118            last_identification: Instant::now(),
119            known_farms: farm_indices
120                .iter()
121                .copied()
122                .zip(farms.iter().map(|farm_details| farm_details.farm_id))
123                .collect(),
124            close_sender: Some(close_sender),
125        });
126
127        FarmerAddResult {
128            close_receiver,
129            added_farms: farm_indices.into_iter().zip(farms),
130        }
131    }
132
133    fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
134        let used_indices = self
135            .known_farmers
136            .iter()
137            .flat_map(|known_farmer| known_farmer.known_farms.keys())
138            .collect::<HashSet<_>>();
139
140        let mut available_indices = Vec::with_capacity(len);
141
142        for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
143            if !used_indices.contains(&farm_index) {
144                if available_indices.len() < len {
145                    available_indices.push(farm_index);
146                } else {
147                    return available_indices;
148                }
149            }
150        }
151
152        warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
153        available_indices
154    }
155
156    fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
157        self.known_farmers
158            .iter_mut()
159            .filter_map(|known_farmer| {
160                if known_farmer.last_identification.elapsed()
161                    > self.identification_broadcast_interval * 2
162                    && known_farmer.notify_cleanup()
163                {
164                    Some(
165                        known_farmer
166                            .known_farms
167                            .iter()
168                            .map(|(farm_index, farm_id)| {
169                                (known_farmer.farmer_id, farm_index, farm_id)
170                            }),
171                    )
172                } else {
173                    None
174                }
175            })
176            .flatten()
177    }
178
179    fn remove_farm(&mut self, farm_index: FarmIndex) {
180        self.known_farmers.retain_mut(|known_farmer| {
181            if known_farmer.known_farms.contains_key(&farm_index) {
182                known_farmer.known_farms.remove(&farm_index);
183                !known_farmer.known_farms.is_empty()
184            } else {
185                true
186            }
187        });
188    }
189}
190
/// Utility function for maintaining farms by controller in a cluster environment
///
/// Subscribes to farmer identification broadcasts, asks farmers to identify themselves once at
/// startup, and then loops forever:
/// - for each newly identified farmer, its farm details are collected, every farm gets a
///   [`FarmIndex`] and is initialized into `plotted_pieces`;
/// - initialized farms run until they exit or their farmer's close signal fires, after which
///   their data is deleted from `plotted_pieces` and their farm index is released;
/// - every `identification_broadcast_interval * 2`, farmers that have not identified themselves
///   for longer than that are notified to clean up their farms.
///
/// # Errors
///
/// Returns an error if subscribing to farmer identification broadcasts fails or if that
/// subscription stream ends. Otherwise it never returns.
pub async fn maintain_farms(
    instance: &str,
    nats_client: &NatsClient,
    plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
    identification_broadcast_interval: Duration,
) -> anyhow::Result<()> {
    let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);

    // Farm details being collected from newly identified farmers, keyed by farmer ID
    let mut farmers_to_add = StreamMap::default();
    // Stream map for adding/removing farms
    let mut farms_to_add_remove = StreamMap::default();
    // Running farms, each resolving to its farm index and exit result once it stops
    let mut farms = FuturesUnordered::new();

    let farmer_identify_subscription = pin!(
        nats_client
            .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
            .await
            .map_err(|error| anyhow!(
                "Failed to subscribe to farmer identify broadcast: {error}"
            ))?
    );

    // Request farmer to identify themselves
    if let Err(error) = nats_client
        .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
        .await
    {
        warn!(%error, "Failed to send farmer identification broadcast");
    }

    let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
    // Expiration check runs every two identification intervals, starting after the same delay
    let mut farm_pruning_interval = tokio::time::interval_at(
        (Instant::now() + identification_broadcast_interval * 2).into(),
        identification_broadcast_interval * 2,
    );
    farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        select! {
            (farm_index, result) = farms.select_next_some() => {
                // Farm stopped (exited on its own or its close signal fired): delete its data
                // from plotted pieces, then release its index via `FarmAddRemoveResult::Remove`.
                // `write_blocking()` blocks the thread, hence `spawn_blocking`.
                farms_to_add_remove.push(farm_index, Box::pin(async move {
                    let plotted_pieces = Arc::clone(plotted_pieces);

                    let delete_farm_fut = task::spawn_blocking(move || {
                        plotted_pieces.write_blocking().delete_farm(farm_index);
                    });
                    if let Err(error) = delete_farm_fut.await {
                        error!(
                            %farm_index,
                            %error,
                            "Failed to delete farm that exited"
                        );
                    }

                    FarmAddRemoveResult::Remove { farm_index }
                }));

                match result {
                    Ok(()) => {
                        info!(%farm_index, "Farm exited successfully");
                    }
                    Err(error) => {
                        error!(%farm_index, %error, "Farm exited with error");
                    }
                }
            }
            maybe_identify_message = farmer_identify_subscription.next() => {
                let Some(identify_message) = maybe_identify_message else {
                    return Err(anyhow!("Farmer identify stream ended"));
                };
                let ClusterFarmerIdentifyBroadcast {
                    farmer_id,
                } = identify_message;

                // Known farmers only get their last identification time bumped; unknown ones
                // get their farms collected, unless a collection is already in flight
                if known_farmers.refresh(farmer_id) {
                    trace!(
                        %farmer_id,
                        "Received identification for already known farmer"
                    );
                } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
                    debug!(
                        %farmer_id,
                        "Received identification for new farmer, collecting farms"
                    );
                } else {
                    debug!(
                        %farmer_id,
                        "Received identification for new farmer, which is already in progress"
                    );
                }
            }
            (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
                // On failure the farmer stays unknown, so its next identification retries
                let Ok(farms) = maybe_farms.inspect_err(|error| {
                    warn!(
                        %farmer_id,
                        %error,
                        "Failed to collect farms to add, may retry later"
                    );
                }) else {
                    continue;
                };

                let farm_add_result = known_farmers.add(farmer_id, farms);
                let FarmerAddResult {
                    close_receiver,
                    added_farms,
                } = farm_add_result;
                for (farm_index, farm_details) in added_farms {
                    let ClusterFarmerFarmDetails {
                        farm_id,
                        total_sectors_count,
                    } = farm_details;

                    let plotted_pieces = Arc::clone(plotted_pieces);
                    // Every farm of this farmer gets its own receiver of the same close signal
                    let close_receiver = close_receiver.resubscribe();
                    farms_to_add_remove.push(
                        farm_index,
                        Box::pin(async move {
                            match initialize_farm(
                                farm_index,
                                farm_id,
                                total_sectors_count,
                                plotted_pieces,
                                nats_client,
                            )
                            .await
                            {
                                Ok(farm) => {
                                    info!(
                                        %farmer_id,
                                        %farm_index,
                                        %farm_id,
                                        "Farm initialized successfully"
                                    );

                                    FarmAddRemoveResult::Add {
                                        close_receiver,
                                        farm,
                                    }
                                }
                                Err(error) => {
                                    warn!(
                                        %farmer_id,
                                        %farm_index,
                                        %farm_id,
                                        %error,
                                        "Failed to initialize farm"
                                    );
                                    // We should remove the farm if it failed to initialize
                                    FarmAddRemoveResult::Remove { farm_index }
                                }
                            }
                        }),
                    );
                }
            }
            _ = farm_pruning_interval.tick().fuse() => {
                // Expired farmers are notified for cleanup inside `remove_expired()`; the
                // affected farms are only logged here
                for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
                    warn!(
                        %farmer_id,
                        %farm_index,
                        %farm_id,
                        "Farm expired, notify for cleanup"
                    );
                }
            }
            (farm_index, result) = farms_to_add_remove.select_next_some() => {
                match result {
                    FarmAddRemoveResult::Add {
                        mut close_receiver,
                        farm,
                    } => {
                        // Run the farm until it exits or its close receiver resolves (close
                        // signal received or sender dropped); either way it lands in `farms`
                        farms.push(async move {
                            select! {
                                result = farm.run().fuse() => {
                                    (farm_index, result)
                                }
                                _ = close_receiver.recv().fuse() => {
                                    // Nothing to do
                                    (farm_index, Ok(()))
                                }
                            }
                        });
                    }
                    FarmAddRemoveResult::Remove { farm_index } => {
                        // Farm is gone (exited and deleted, or failed to initialize): release its index
                        known_farmers.remove_farm(farm_index);
                    }
                }
            }
        }
    }
}
384
385/// Collect `ClusterFarmerFarmDetails` from the farmer by sending a stream request
386async fn collect_farmer_farms(
387    farmer_id: ClusterFarmerId,
388    nats_client: &NatsClient,
389) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
390    trace!(%farmer_id, "Requesting farmer farm details");
391    Ok(nats_client
392        .stream_request(
393            &ClusterFarmerFarmDetailsRequest,
394            Some(&farmer_id.to_string()),
395        )
396        .await
397        .inspect_err(|error| {
398            warn!(
399                %error,
400                %farmer_id,
401                "Failed to request farmer farm details"
402            )
403        })?
404        .collect()
405        .await)
406}
407
408async fn initialize_farm(
409    farm_index: FarmIndex,
410    farm_id: FarmId,
411    total_sectors_count: u16,
412    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
413    nats_client: &NatsClient,
414) -> anyhow::Result<ClusterFarm> {
415    let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
416        .await
417        .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
418
419    // Buffer sectors that are plotted while already plotted sectors are being iterated over
420    let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
421    let sector_update_handler = farm.on_sector_update(Arc::new({
422        let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
423
424        move |(_sector_index, sector_update)| {
425            if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
426                plotted_sector,
427                old_plotted_sector,
428                ..
429            }) = sector_update
430            {
431                plotted_sectors_buffer
432                    .lock()
433                    .push((plotted_sector.clone(), old_plotted_sector.clone()));
434            }
435        }
436    }));
437
438    // Add plotted sectors of the farm to global plotted pieces
439    let plotted_sectors = farm.plotted_sectors();
440    let mut plotted_sectors = plotted_sectors
441        .get()
442        .await
443        .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
444
445    {
446        plotted_pieces
447            .write()
448            .await
449            .add_farm(farm_index, farm.piece_reader());
450
451        while let Some(plotted_sector_result) = plotted_sectors.next().await {
452            let plotted_sector = plotted_sector_result.map_err(|error| {
453                anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
454            })?;
455
456            let mut plotted_pieces_guard = plotted_pieces.write().await;
457            plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
458
459            // Drop the guard immediately to make sure other tasks are able to access the plotted pieces
460            drop(plotted_pieces_guard);
461
462            task::yield_now().await;
463        }
464    }
465
466    // Add sectors that were plotted while above iteration was happening to plotted sectors
467    // too
468    drop(sector_update_handler);
469    let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
470    let add_buffered_sectors_fut = task::spawn_blocking(move || {
471        let mut plotted_pieces = plotted_pieces.write_blocking();
472
473        for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
474            if let Some(old_plotted_sector) = old_plotted_sector {
475                plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
476            }
477            // Call delete first to avoid adding duplicates
478            plotted_pieces.delete_sector(farm_index, &plotted_sector);
479            plotted_pieces.add_sector(farm_index, &plotted_sector);
480        }
481    });
482
483    add_buffered_sectors_fut
484        .await
485        .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
486
487    Ok(farm)
488}