ab_farmer/cluster/controller/
farms.rs1use crate::cluster::controller::ClusterControllerFarmerIdentifyBroadcast;
8use crate::cluster::controller::stream_map::StreamMap;
9use crate::cluster::farmer::{
10 ClusterFarm, ClusterFarmerFarmDetails, ClusterFarmerFarmDetailsRequest, ClusterFarmerId,
11 ClusterFarmerIdentifyBroadcast,
12};
13use crate::cluster::nats_client::NatsClient;
14use crate::farm::plotted_pieces::PlottedPieces;
15use crate::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
16use anyhow::anyhow;
17use async_lock::RwLock as AsyncRwLock;
18use futures::stream::FuturesUnordered;
19use futures::{FutureExt, StreamExt, select};
20use parking_lot::Mutex;
21use std::collections::{HashMap, HashSet};
22use std::mem;
23use std::pin::pin;
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::sync::broadcast;
27use tokio::task;
28use tokio::time::MissedTickBehavior;
29use tracing::{debug, error, info, trace, warn};
30
/// Index the controller assigns to a farm.
///
/// It is the key type of the per-farmer farm map kept by `KnownFarmer` and the farm identifier
/// type parameter of [`PlottedPieces`] used by [`maintain_farms`].
pub type FarmIndex = u16;
33
/// Outcome of a pending farm addition or removal, consumed by the main loop of
/// [`maintain_farms`].
enum FarmAddRemoveResult {
    /// Farm was initialized and is ready to be run
    Add {
        /// Receiver that gets a message when the farm should stop running
        close_receiver: broadcast::Receiver<()>,
        /// Initialized farm
        farm: ClusterFarm,
    },
    /// Farm must be removed from the set of known farms
    Remove {
        /// Index of the farm to remove
        farm_index: FarmIndex,
    },
}
43
/// Result of registering a farmer with `KnownFarmers::add`.
struct FarmerAddResult<I> {
    /// Receiver of the close notification created for the newly registered farmer
    close_receiver: broadcast::Receiver<()>,
    /// Farms of the farmer paired with the farm indices assigned to them
    added_farms: I,
}
48
49#[derive(Debug)]
50struct KnownFarmer {
51 farmer_id: ClusterFarmerId,
52 last_identification: Instant,
53 known_farms: HashMap<FarmIndex, FarmId>,
54 close_sender: Option<broadcast::Sender<()>>,
55}
56
57impl KnownFarmer {
58 fn notify_cleanup(&mut self) -> bool {
59 let Some(close_sender) = self.close_sender.take() else {
60 return false;
61 };
62 let _ = close_sender.send(());
63 true
64 }
65}
66
67#[derive(Debug)]
68struct KnownFarmers {
69 identification_broadcast_interval: Duration,
70 known_farmers: Vec<KnownFarmer>,
71}
72
73impl KnownFarmers {
74 fn new(identification_broadcast_interval: Duration) -> Self {
75 Self {
76 identification_broadcast_interval,
77 known_farmers: Vec::new(),
78 }
79 }
80
81 fn refresh(&mut self, farmer_id: ClusterFarmerId) -> bool {
83 self.known_farmers.iter_mut().any(|known_farmer| {
84 if known_farmer.farmer_id == farmer_id {
85 trace!(%farmer_id, "Updating last identification for farmer");
86 known_farmer.last_identification = Instant::now();
87 true
88 } else {
89 false
90 }
91 })
92 }
93
94 fn add(
95 &mut self,
96 farmer_id: ClusterFarmerId,
97 farms: Vec<ClusterFarmerFarmDetails>,
98 ) -> FarmerAddResult<impl Iterator<Item = (FarmIndex, ClusterFarmerFarmDetails)>> {
99 let farm_indices = self.pick_farm_indices(farms.len());
100 let farm_ids_to_add = farms
101 .iter()
102 .map(|farm_details| farm_details.farm_id)
103 .collect::<HashSet<FarmId>>();
104
105 if let Some(old_farmer) = self.known_farmers.iter_mut().find(|known_farmer| {
106 known_farmer
107 .known_farms
108 .values()
109 .any(|farm_id| farm_ids_to_add.contains(farm_id))
110 }) {
111 warn!(old_farmer_id = %old_farmer.farmer_id, "Some farms are already known, notify for cleanup them first");
112 old_farmer.notify_cleanup();
113 }
114
115 let (close_sender, close_receiver) = broadcast::channel(1);
116 self.known_farmers.push(KnownFarmer {
117 farmer_id,
118 last_identification: Instant::now(),
119 known_farms: farm_indices
120 .iter()
121 .copied()
122 .zip(farms.iter().map(|farm_details| farm_details.farm_id))
123 .collect(),
124 close_sender: Some(close_sender),
125 });
126
127 FarmerAddResult {
128 close_receiver,
129 added_farms: farm_indices.into_iter().zip(farms),
130 }
131 }
132
133 fn pick_farm_indices(&self, len: usize) -> Vec<u16> {
134 let used_indices = self
135 .known_farmers
136 .iter()
137 .flat_map(|known_farmer| known_farmer.known_farms.keys())
138 .collect::<HashSet<_>>();
139
140 let mut available_indices = Vec::with_capacity(len);
141
142 for farm_index in FarmIndex::MIN..=FarmIndex::MAX {
143 if !used_indices.contains(&farm_index) {
144 if available_indices.len() < len {
145 available_indices.push(farm_index);
146 } else {
147 return available_indices;
148 }
149 }
150 }
151
152 warn!(max_supported_farm_index = %FarmIndex::MAX, "Too many farms");
153 available_indices
154 }
155
156 fn remove_expired(&mut self) -> impl Iterator<Item = (ClusterFarmerId, &FarmIndex, &FarmId)> {
157 self.known_farmers
158 .iter_mut()
159 .filter_map(|known_farmer| {
160 if known_farmer.last_identification.elapsed()
161 > self.identification_broadcast_interval * 2
162 && known_farmer.notify_cleanup()
163 {
164 Some(
165 known_farmer
166 .known_farms
167 .iter()
168 .map(|(farm_index, farm_id)| {
169 (known_farmer.farmer_id, farm_index, farm_id)
170 }),
171 )
172 } else {
173 None
174 }
175 })
176 .flatten()
177 }
178
179 fn remove_farm(&mut self, farm_index: FarmIndex) {
180 self.known_farmers.retain_mut(|known_farmer| {
181 if known_farmer.known_farms.contains_key(&farm_index) {
182 known_farmer.known_farms.remove(&farm_index);
183 !known_farmer.known_farms.is_empty()
184 } else {
185 true
186 }
187 });
188 }
189}
190
191pub async fn maintain_farms(
193 instance: &str,
194 nats_client: &NatsClient,
195 plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
196 identification_broadcast_interval: Duration,
197) -> anyhow::Result<()> {
198 let mut known_farmers = KnownFarmers::new(identification_broadcast_interval);
199
200 let mut farmers_to_add = StreamMap::default();
201 let mut farms_to_add_remove = StreamMap::default();
203 let mut farms = FuturesUnordered::new();
204
205 let farmer_identify_subscription = pin!(
206 nats_client
207 .subscribe_to_broadcasts::<ClusterFarmerIdentifyBroadcast>(None, None)
208 .await
209 .map_err(|error| anyhow!(
210 "Failed to subscribe to farmer identify broadcast: {error}"
211 ))?
212 );
213
214 if let Err(error) = nats_client
216 .broadcast(&ClusterControllerFarmerIdentifyBroadcast, instance)
217 .await
218 {
219 warn!(%error, "Failed to send farmer identification broadcast");
220 }
221
222 let mut farmer_identify_subscription = farmer_identify_subscription.fuse();
223 let mut farm_pruning_interval = tokio::time::interval_at(
224 (Instant::now() + identification_broadcast_interval * 2).into(),
225 identification_broadcast_interval * 2,
226 );
227 farm_pruning_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
228
229 loop {
230 select! {
231 (farm_index, result) = farms.select_next_some() => {
232 farms_to_add_remove.push(farm_index, Box::pin(async move {
233 let plotted_pieces = Arc::clone(plotted_pieces);
234
235 let delete_farm_fut = task::spawn_blocking(move || {
236 plotted_pieces.write_blocking().delete_farm(farm_index);
237 });
238 if let Err(error) = delete_farm_fut.await {
239 error!(
240 %farm_index,
241 %error,
242 "Failed to delete farm that exited"
243 );
244 }
245
246 FarmAddRemoveResult::Remove { farm_index }
247 }));
248
249 match result {
250 Ok(()) => {
251 info!(%farm_index, "Farm exited successfully");
252 }
253 Err(error) => {
254 error!(%farm_index, %error, "Farm exited with error");
255 }
256 }
257 }
258 maybe_identify_message = farmer_identify_subscription.next() => {
259 let Some(identify_message) = maybe_identify_message else {
260 return Err(anyhow!("Farmer identify stream ended"));
261 };
262 let ClusterFarmerIdentifyBroadcast {
263 farmer_id,
264 } = identify_message;
265
266 if known_farmers.refresh(farmer_id) {
267 trace!(
268 %farmer_id,
269 "Received identification for already known farmer"
270 );
271 } else if farmers_to_add.add_if_not_in_progress(farmer_id, Box::pin(collect_farmer_farms(farmer_id, nats_client))) {
272 debug!(
273 %farmer_id,
274 "Received identification for new farmer, collecting farms"
275 );
276 } else {
277 debug!(
278 %farmer_id,
279 "Received identification for new farmer, which is already in progress"
280 );
281 }
282 }
283 (farmer_id, maybe_farms) = farmers_to_add.select_next_some() => {
284 let Ok(farms) = maybe_farms.inspect_err(|error| {
285 warn!(
286 %farmer_id,
287 %error,
288 "Failed to collect farms to add, may retry later"
289 );
290 }) else {
291 continue;
292 };
293
294 let farm_add_result = known_farmers.add(farmer_id, farms);
295 let FarmerAddResult {
296 close_receiver,
297 added_farms,
298 } = farm_add_result;
299 for (farm_index, farm_details) in added_farms {
300 let ClusterFarmerFarmDetails {
301 farm_id,
302 total_sectors_count,
303 } = farm_details;
304
305 let plotted_pieces = Arc::clone(plotted_pieces);
306 let close_receiver = close_receiver.resubscribe();
307 farms_to_add_remove.push(
308 farm_index,
309 Box::pin(async move {
310 match initialize_farm(
311 farm_index,
312 farm_id,
313 total_sectors_count,
314 plotted_pieces,
315 nats_client,
316 )
317 .await
318 {
319 Ok(farm) => {
320 info!(
321 %farmer_id,
322 %farm_index,
323 %farm_id,
324 "Farm initialized successfully"
325 );
326
327 FarmAddRemoveResult::Add {
328 close_receiver,
329 farm,
330 }
331 }
332 Err(error) => {
333 warn!(
334 %farmer_id,
335 %farm_index,
336 %farm_id,
337 %error,
338 "Failed to initialize farm"
339 );
340 FarmAddRemoveResult::Remove { farm_index }
342 }
343 }
344 }),
345 );
346 }
347 }
348 _ = farm_pruning_interval.tick().fuse() => {
349 for (farmer_id, farm_index, farm_id) in known_farmers.remove_expired() {
350 warn!(
351 %farmer_id,
352 %farm_index,
353 %farm_id,
354 "Farm expired, notify for cleanup"
355 );
356 }
357 }
358 (farm_index, result) = farms_to_add_remove.select_next_some() => {
359 match result {
360 FarmAddRemoveResult::Add {
361 mut close_receiver,
362 farm,
363 } => {
364 farms.push(async move {
365 select! {
366 result = farm.run().fuse() => {
367 (farm_index, result)
368 }
369 _ = close_receiver.recv().fuse() => {
370 (farm_index, Ok(()))
372 }
373 }
374 });
375 }
376 FarmAddRemoveResult::Remove { farm_index } => {
377 known_farmers.remove_farm(farm_index);
378 }
379 }
380 }
381 }
382 }
383}
384
385async fn collect_farmer_farms(
387 farmer_id: ClusterFarmerId,
388 nats_client: &NatsClient,
389) -> anyhow::Result<Vec<ClusterFarmerFarmDetails>> {
390 trace!(%farmer_id, "Requesting farmer farm details");
391 Ok(nats_client
392 .stream_request(
393 &ClusterFarmerFarmDetailsRequest,
394 Some(&farmer_id.to_string()),
395 )
396 .await
397 .inspect_err(|error| {
398 warn!(
399 %error,
400 %farmer_id,
401 "Failed to request farmer farm details"
402 )
403 })?
404 .collect()
405 .await)
406}
407
408async fn initialize_farm(
409 farm_index: FarmIndex,
410 farm_id: FarmId,
411 total_sectors_count: u16,
412 plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
413 nats_client: &NatsClient,
414) -> anyhow::Result<ClusterFarm> {
415 let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
416 .await
417 .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
418
419 let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
421 let sector_update_handler = farm.on_sector_update(Arc::new({
422 let plotted_sectors_buffer = Arc::clone(&plotted_sectors_buffer);
423
424 move |(_sector_index, sector_update)| {
425 if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
426 plotted_sector,
427 old_plotted_sector,
428 ..
429 }) = sector_update
430 {
431 plotted_sectors_buffer
432 .lock()
433 .push((plotted_sector.clone(), old_plotted_sector.clone()));
434 }
435 }
436 }));
437
438 let plotted_sectors = farm.plotted_sectors();
440 let mut plotted_sectors = plotted_sectors
441 .get()
442 .await
443 .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
444
445 {
446 plotted_pieces
447 .write()
448 .await
449 .add_farm(farm_index, farm.piece_reader());
450
451 while let Some(plotted_sector_result) = plotted_sectors.next().await {
452 let plotted_sector = plotted_sector_result.map_err(|error| {
453 anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
454 })?;
455
456 let mut plotted_pieces_guard = plotted_pieces.write().await;
457 plotted_pieces_guard.add_sector(farm_index, &plotted_sector);
458
459 drop(plotted_pieces_guard);
461
462 task::yield_now().await;
463 }
464 }
465
466 drop(sector_update_handler);
469 let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
470 let add_buffered_sectors_fut = task::spawn_blocking(move || {
471 let mut plotted_pieces = plotted_pieces.write_blocking();
472
473 for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
474 if let Some(old_plotted_sector) = old_plotted_sector {
475 plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
476 }
477 plotted_pieces.delete_sector(farm_index, &plotted_sector);
479 plotted_pieces.add_sector(farm_index, &plotted_sector);
480 }
481 });
482
483 add_buffered_sectors_fut
484 .await
485 .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
486
487 Ok(farm)
488}