1#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{ChainInfo, ChainSyncStatus};
7use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{BlockSealNotification, NewSlotNotification};
9use ab_client_consensus_common::ConsensusConstants;
10use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
11use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
12use ab_core_primitives::hashes::Blake3Hash;
13use ab_core_primitives::pieces::{Piece, PieceIndex};
14use ab_core_primitives::pot::SlotNumber;
15use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
16use ab_core_primitives::solutions::Solution;
17use ab_erasure_coding::ErasureCoding;
18use ab_farmer_components::FarmerProtocolInfo;
19use ab_farmer_rpc_primitives::{
20 BlockSealInfo, BlockSealResponse, FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo,
21 SolutionResponse,
22};
23use ab_networking::libp2p::Multiaddr;
24use futures::channel::{mpsc, oneshot};
25use futures::{StreamExt, select};
26use jsonrpsee::core::{SubscriptionResult, async_trait};
27use jsonrpsee::proc_macros::rpc;
28use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
29use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink, TrySendError};
30use parking_lot::Mutex;
31use schnellru::{ByLength, LruMap};
32use std::collections::HashMap;
33use std::collections::hash_map::Entry;
34use std::sync::{Arc, Weak};
35use tracing::{debug, error, warn};
36
/// Errors returned by the farmer RPC methods in this file
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A submitted solution could not be handed over: either no solution sender is stored for
    /// the slot, or sending into it failed
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number the ignored solution was submitted for
        slot: SlotNumber,
    },
    /// A single request asked for more segment headers than
    /// [`MAX_SEGMENT_HEADERS_PER_REQUEST`] allows
    #[error(
        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SegmentHeadersLengthExceeded {
        /// Number of segment headers that was requested
        actual: usize,
    },
}
55
56impl From<Error> for ErrorObjectOwned {
57 fn from(error: Error) -> Self {
58 match &error {
59 Error::SolutionWasIgnored { .. } => {
60 ErrorObject::owned(0, error.to_string(), None::<()>)
61 }
62 Error::SegmentHeadersLengthExceeded { .. } => {
63 ErrorObject::owned(1, error.to_string(), None::<()>)
64 }
65 }
66 }
67}
68
/// RPC API used by farmers to talk to the node
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Returns the information a farmer needs to operate: genesis root, bootstrap nodes, sync
    /// status, farming timeout and protocol parameters
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submits a solution found by the farmer for a particular slot.
    ///
    /// Fails with [`Error::SolutionWasIgnored`] when the solution could not be handed over.
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Subscribes to slot info notifications, one [`SlotInfo`] per new slot notification
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Subscribes to block sealing requests, delivered as [`BlockSealInfo`]
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submits a block seal produced in response to a block sealing request
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Subscribes to headers of newly archived segments
    #[subscription(
        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
        unsubscribe = "unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
    )]
    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;

    /// Returns segment headers for the requested indices, in request order (`None` for an
    /// index without a known header).
    ///
    /// Fails with [`Error::SegmentHeadersLengthExceeded`] when more than
    /// [`MAX_SEGMENT_HEADERS_PER_REQUEST`] indices are requested.
    #[method(name = "segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Returns the piece with the given index if it is available from the in-memory cached
    /// segment (or the genesis segment), `None` otherwise
    #[method(name = "piece", blocking)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledges that the archived segment header with the given index was received
    #[method(name = "acknowledgeArchivedSegmentHeader")]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Returns up to `limit` of the most recent segment headers, oldest first.
    ///
    /// Fails with [`Error::SegmentHeadersLengthExceeded`] when `limit` is larger than
    /// [`MAX_SEGMENT_HEADERS_PER_REQUEST`].
    #[method(name = "lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;
}
124
/// Acknowledgement senders for the archived segment that was most recently announced to
/// subscribers
#[derive(Debug, Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    /// Index of the segment the stored senders belong to; senders are cleared whenever a
    /// notification for a different segment index is handled
    segment_index: SegmentIndex,
    /// One acknowledgement sender per subscription that was offered the segment header
    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
}
130
/// Senders through which a block seal received over RPC is handed back to whoever requested it
#[derive(Debug, Default)]
struct BlockSignatureSenders {
    /// Pre-seal hash the stored senders belong to; senders are cleared whenever a sealing
    /// notification with a different hash is handled
    current_pre_seal_hash: Blake3Hash,
    /// Pending senders, one pushed per sealing notification for `current_pre_seal_hash`
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}
136
/// In-memory cache entry for the archived segment that piece requests are answered from
#[derive(Debug)]
enum CachedArchivedSegment {
    /// Genesis segment re-created on demand; held strongly, so it stays alive until replaced
    Genesis(Arc<NewArchivedSegment>),
    /// Weak reference to a segment received in an archived segment notification; it does not
    /// keep the segment alive on its own
    Weak(Weak<NewArchivedSegment>),
}
148
149impl CachedArchivedSegment {
150 fn get(&self) -> Option<Arc<NewArchivedSegment>> {
151 match self {
152 CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
153 CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
154 }
155 }
156}
157
/// Everything [`FarmerRpc::new`] needs to build the RPC handler and its worker
#[derive(Debug)]
pub struct FarmerRpcConfig<CI, CSS> {
    /// Genesis block; its root is reported to farmers and it is the input for re-creating the
    /// genesis segment
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants, used for the reported protocol info and the farming timeout
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in sector value reported to farmers
    pub max_pieces_in_sector: u16,
    /// Stream of new slot notifications, consumed by the worker
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Stream of block sealing notifications, consumed by the worker
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Stream of archived segment notifications, consumed by the worker
    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// DSN bootstrap nodes reported to farmers
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Chain info, queried for the max segment index and for segment headers
    pub chain_info: CI,
    /// Chain sync status, reported to farmers as the `syncing` flag
    pub chain_sync_status: CSS,
    /// Erasure coding instance passed to genesis segment re-creation
    pub erasure_coding: ErasureCoding,
}
182
/// Background worker created by [`FarmerRpc::new`].
///
/// It receives slot, block sealing and archived segment notifications and forwards them to the
/// subscriptions registered through the RPC handler; it does nothing until
/// [`FarmerRpcWorker::run`] is polled.
#[derive(Debug)]
pub struct FarmerRpcWorker {
    /// Incoming new slot notifications
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Incoming block sealing notifications
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Incoming archived segment notifications
    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Solution senders by slot number; filled here, drained by `submit_solution_response`
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Seal senders for the current pre-seal hash; filled here, drained by `submit_block_seal`
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    /// Cache of the last archived segment, read by the `piece` RPC method
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    /// Acknowledgement senders for the last announced archived segment
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    /// Sinks of active slot info subscriptions
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Sinks of active block sealing subscriptions
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Sinks of active archived segment header subscriptions
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
}
198
199impl FarmerRpcWorker {
200 pub async fn run(mut self) {
202 loop {
203 select! {
204 maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
205 let Some(new_slot_notification) = maybe_new_slot_notification else {
206 break;
207 };
208
209 self.handle_new_slot_notification(new_slot_notification).await;
210 }
211 maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
212 let Some(block_sealing_notification) = maybe_block_sealing_notification else {
213 break;
214 };
215
216 self.handle_block_sealing_notification(block_sealing_notification).await;
217 }
218 maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
219 let Some(archived_segment_notification) = maybe_archived_segment_notification else {
220 break;
221 };
222
223 self.handle_archived_segment_notification(archived_segment_notification).await;
224 }
225 }
226 }
227 }
228
229 async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
230 let NewSlotNotification {
231 new_slot_info,
232 solution_sender,
233 } = new_slot_notification;
234
235 let slot_number = new_slot_info.slot;
236
237 let mut solution_response_senders = self.solution_response_senders.lock();
240 if solution_response_senders.peek(&slot_number).is_none() {
241 solution_response_senders.insert(slot_number, solution_sender);
242 }
243
244 let global_challenge = new_slot_info
245 .proof_of_time
246 .derive_global_challenge(slot_number);
247
248 let slot_info = SlotInfo {
250 slot_number,
251 global_challenge,
252 solution_range: new_slot_info.solution_range,
253 };
254 let slot_info = serde_json::value::to_raw_value(&slot_info)
255 .expect("Serialization of slot info never fails; qed");
256
257 self.slot_info_subscriptions.lock().retain_mut(|sink| {
258 match sink.try_send(slot_info.clone()) {
259 Ok(()) => true,
260 Err(error) => match error {
261 TrySendError::Closed(_) => {
262 false
264 }
265 TrySendError::Full(_) => {
266 warn!(
267 subscription_id = ?sink.subscription_id(),
268 "Slot info receiver is too slow, dropping notification"
269 );
270 true
271 }
272 },
273 }
274 });
275 }
276
277 async fn handle_block_sealing_notification(
278 &mut self,
279 block_sealing_notification: BlockSealNotification,
280 ) {
281 let BlockSealNotification {
282 pre_seal_hash,
283 public_key_hash,
284 seal_sender,
285 } = block_sealing_notification;
286
287 {
289 let mut block_sealing_senders = self.block_sealing_senders.lock();
290
291 if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
292 block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
293 block_sealing_senders.senders.clear();
294 }
295
296 block_sealing_senders.senders.push(seal_sender);
297 }
298
299 let block_seal_info = BlockSealInfo {
301 pre_seal_hash,
302 public_key_hash,
303 };
304 let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
305 .expect("Serialization of block seal info never fails; qed");
306
307 self.block_sealing_subscriptions.lock().retain_mut(|sink| {
308 match sink.try_send(block_seal_info.clone()) {
309 Ok(()) => true,
310 Err(error) => match error {
311 TrySendError::Closed(_) => {
312 false
314 }
315 TrySendError::Full(_) => {
316 warn!(
317 subscription_id = ?sink.subscription_id(),
318 "Block seal info receiver is too slow, dropping notification"
319 );
320 true
321 }
322 },
323 }
324 });
325 }
326
327 async fn handle_archived_segment_notification(
328 &mut self,
329 archived_segment_notification: ArchivedSegmentNotification,
330 ) {
331 let ArchivedSegmentNotification {
332 archived_segment,
333 acknowledgement_sender,
334 } = archived_segment_notification;
335
336 let segment_index = archived_segment.segment_header.segment_index();
337
338 self.archived_segment_header_subscriptions
339 .lock()
340 .retain_mut(|sink| {
341 let subscription_id = sink.subscription_id();
342
343 let mut archived_segment_acknowledgement_senders =
346 self.archived_segment_acknowledgement_senders.lock();
347
348 if archived_segment_acknowledgement_senders.segment_index != segment_index {
349 archived_segment_acknowledgement_senders.segment_index = segment_index;
350 archived_segment_acknowledgement_senders.senders.clear();
351 }
352
353 let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
354 .senders
355 .entry(subscription_id.clone())
356 {
357 Entry::Occupied(_) => {
358 None
360 }
361 Entry::Vacant(entry) => {
362 entry.insert(acknowledgement_sender.clone());
363
364 Some(archived_segment.segment_header)
366 }
367 };
368
369 self.cached_archived_segment
370 .lock()
371 .replace(CachedArchivedSegment::Weak(Arc::downgrade(
372 &archived_segment,
373 )));
374
375 let maybe_archived_segment_header =
377 serde_json::value::to_raw_value(&maybe_archived_segment_header)
378 .expect("Serialization of archived segment info never fails; qed");
379
380 match sink.try_send(maybe_archived_segment_header) {
381 Ok(()) => true,
382 Err(error) => match error {
383 TrySendError::Closed(_) => {
384 archived_segment_acknowledgement_senders
386 .senders
387 .remove(&subscription_id);
388 false
389 }
390 TrySendError::Full(_) => {
391 warn!(
392 ?subscription_id,
393 "Block seal info receiver is too slow, dropping notification"
394 );
395 true
396 }
397 },
398 }
399 });
400 }
401}
402
/// Implementation of the farmer RPC API.
///
/// Created together with a [`FarmerRpcWorker`] by [`FarmerRpc::new`]; the two communicate
/// through the `Arc<Mutex<_>>` fields.
#[derive(Debug)]
pub struct FarmerRpc<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    /// Genesis block, source of the reported genesis root and of the re-created genesis segment
    genesis_block: OwnedBeaconChainBlock,
    /// Solution senders by slot number; filled by the worker, used by `submit_solution_response`
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Seal senders for the current pre-seal hash; filled by the worker, used by
    /// `submit_block_seal`
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    /// DSN bootstrap nodes reported to farmers
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Chain info, queried for the max segment index and for segment headers
    chain_info: CI,
    /// Cache of the last archived segment (or genesis segment), used by `piece`
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    /// Acknowledgement senders used by `acknowledge_archived_segment_header`
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    /// Chain sync status, reported to farmers as the `syncing` flag
    chain_sync_status: CSS,
    /// Consensus constants, used for the reported protocol info and the farming timeout
    consensus_constants: ConsensusConstants,
    /// Max pieces in sector value reported to farmers
    max_pieces_in_sector: u16,
    /// Sinks of active slot info subscriptions; new subscriptions are pushed here
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Sinks of active block sealing subscriptions; new subscriptions are pushed here
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Sinks of active archived segment header subscriptions; new subscriptions are pushed here
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Erasure coding instance passed to genesis segment re-creation
    erasure_coding: ErasureCoding,
}
426
427impl<CI, CSS> FarmerRpc<CI, CSS>
435where
436 CI: ChainInfo<OwnedBeaconChainBlock>,
437 CSS: ChainSyncStatus,
438{
439 pub fn new(config: FarmerRpcConfig<CI, CSS>) -> (Self, FarmerRpcWorker) {
441 let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
442 let block_authoring_delay = usize::try_from(block_authoring_delay)
443 .expect("Block authoring delay will never exceed usize on any platform; qed");
444 let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
445 .expect("Always a tiny constant in the protocol; qed");
446
447 let slot_info_subscriptions = Arc::default();
448 let block_sealing_subscriptions = Arc::default();
449
450 let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
451 solution_response_senders_capacity,
452 ))));
453 let block_sealing_senders = Arc::default();
454 let archived_segment_header_subscriptions = Arc::default();
455 let cached_archived_segment = Arc::default();
456
457 let rpc = Self {
458 genesis_block: config.genesis_block,
459 solution_response_senders: Arc::clone(&solution_response_senders),
460 block_sealing_senders: Arc::clone(&block_sealing_senders),
461 dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
462 chain_info: config.chain_info,
463 cached_archived_segment: Arc::clone(&cached_archived_segment),
464 archived_segment_acknowledgement_senders: Arc::default(),
465 chain_sync_status: config.chain_sync_status,
466 consensus_constants: config.consensus_constants,
467 max_pieces_in_sector: config.max_pieces_in_sector,
468 slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
469 block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
470 archived_segment_header_subscriptions: Arc::clone(
471 &archived_segment_header_subscriptions,
472 ),
473 erasure_coding: config.erasure_coding,
474 };
475
476 let worker = FarmerRpcWorker {
477 new_slot_notification_receiver: config.new_slot_notification_receiver,
478 block_sealing_notification_receiver: config.block_sealing_notification_receiver,
479 archived_segment_notification_receiver: config.archived_segment_notification_receiver,
480 solution_response_senders,
481 block_sealing_senders,
482 cached_archived_segment,
483 archived_segment_acknowledgement_senders: Arc::new(Default::default()),
484 slot_info_subscriptions,
485 block_sealing_subscriptions,
486 archived_segment_header_subscriptions,
487 };
488
489 (rpc, worker)
490 }
491}
492
493#[async_trait]
494impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
495where
496 CI: ChainInfo<OwnedBeaconChainBlock>,
497 CSS: ChainSyncStatus,
498{
499 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
500 let last_segment_index = self
501 .chain_info
502 .max_segment_index()
503 .unwrap_or(SegmentIndex::ZERO);
504
505 let consensus_constants = &self.consensus_constants;
506 let protocol_info = FarmerProtocolInfo {
507 history_size: HistorySize::from(last_segment_index),
508 max_pieces_in_sector: self.max_pieces_in_sector,
509 recent_segments: consensus_constants.recent_segments,
510 recent_history_fraction: consensus_constants.recent_history_fraction,
511 min_sector_lifetime: consensus_constants.min_sector_lifetime,
512 };
513
514 let farmer_app_info = FarmerAppInfo {
515 genesis_root: *self.genesis_block.header.header().root(),
516 dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
517 syncing: self.chain_sync_status.is_syncing(),
518 farming_timeout: consensus_constants
519 .slot_duration
520 .as_duration()
521 .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
522 protocol_info,
523 };
524
525 Ok(farmer_app_info)
526 }
527
528 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
529 let slot = solution_response.slot_number;
530 let public_key_hash = solution_response.solution.public_key_hash;
531 let sector_index = solution_response.solution.sector_index;
532 let mut solution_response_senders = self.solution_response_senders.lock();
533
534 let success = solution_response_senders
535 .peek_mut(&slot)
536 .and_then(|sender| sender.try_send(solution_response.solution).ok())
537 .is_some();
538
539 if !success {
540 warn!(
541 %slot,
542 %sector_index,
543 %public_key_hash,
544 "Solution was ignored, likely because farmer was too slow"
545 );
546
547 return Err(Error::SolutionWasIgnored { slot });
548 }
549
550 Ok(())
551 }
552
553 async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
554 let subscription = pending.accept().await?;
555 self.slot_info_subscriptions.lock().push(subscription);
556
557 Ok(())
558 }
559
560 async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
561 let subscription = pending.accept().await?;
562 self.block_sealing_subscriptions.lock().push(subscription);
563
564 Ok(())
565 }
566
567 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
568 let block_sealing_senders = self.block_sealing_senders.clone();
569
570 let mut block_sealing_senders = block_sealing_senders.lock();
571
572 if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
573 && let Some(sender) = block_sealing_senders.senders.pop()
574 {
575 let _ = sender.send(block_seal.seal);
576 }
577
578 Ok(())
579 }
580
581 async fn subscribe_archived_segment_header(
582 &self,
583 pending: PendingSubscriptionSink,
584 ) -> SubscriptionResult {
585 let subscription = pending.accept().await?;
586 self.archived_segment_header_subscriptions
587 .lock()
588 .push(subscription);
589
590 Ok(())
591 }
592
593 async fn acknowledge_archived_segment_header(
594 &self,
595 segment_index: SegmentIndex,
596 ) -> Result<(), Error> {
597 let archived_segment_acknowledgement_senders =
598 self.archived_segment_acknowledgement_senders.clone();
599
600 let maybe_sender = {
601 let mut archived_segment_acknowledgement_senders_guard =
602 archived_segment_acknowledgement_senders.lock();
603
604 (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
605 .then(|| {
606 let last_key = archived_segment_acknowledgement_senders_guard
607 .senders
608 .keys()
609 .next()
610 .cloned()?;
611
612 archived_segment_acknowledgement_senders_guard
613 .senders
614 .remove(&last_key)
615 })
616 .flatten()
617 };
618
619 if let Some(mut sender) = maybe_sender
620 && let Err(error) = sender.try_send(())
621 && !error.is_disconnected()
622 {
623 warn!("Failed to acknowledge archived segment: {error}");
624 }
625
626 debug!(%segment_index, "Acknowledged archived segment.");
627
628 Ok(())
629 }
630
631 fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
634 let archived_segment = {
635 let mut cached_archived_segment = self.cached_archived_segment.lock();
636
637 match cached_archived_segment
638 .as_ref()
639 .and_then(CachedArchivedSegment::get)
640 {
641 Some(archived_segment) => archived_segment,
642 None => {
643 if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
644 return Ok(None);
645 }
646
647 debug!(%requested_piece_index, "Re-creating the genesis segment on demand");
648
649 let archived_segment = Arc::new(recreate_genesis_segment(
651 &self.genesis_block,
652 self.erasure_coding.clone(),
653 ));
654
655 cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
656 &archived_segment,
657 )));
658
659 archived_segment
660 }
661 }
662 };
663
664 if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
665 {
666 return Ok(archived_segment
667 .pieces
668 .pieces()
669 .nth(requested_piece_index.position() as usize));
670 }
671
672 Ok(None)
673 }
674
675 async fn segment_headers(
676 &self,
677 segment_indices: Vec<SegmentIndex>,
678 ) -> Result<Vec<Option<SegmentHeader>>, Error> {
679 if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
680 error!(
681 "`segment_indices` length exceed the limit: {} ",
682 segment_indices.len()
683 );
684
685 return Err(Error::SegmentHeadersLengthExceeded {
686 actual: segment_indices.len(),
687 });
688 };
689
690 Ok(segment_indices
691 .into_iter()
692 .map(|segment_index| self.chain_info.get_segment_header(segment_index))
693 .collect())
694 }
695
696 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
697 if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
698 error!(
699 "Request limit ({}) exceed the server limit: {} ",
700 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
701 );
702
703 return Err(Error::SegmentHeadersLengthExceeded {
704 actual: limit as usize,
705 });
706 };
707
708 let last_segment_index = self
709 .chain_info
710 .max_segment_index()
711 .unwrap_or(SegmentIndex::ZERO);
712
713 let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
714 .rev()
715 .take(limit as usize)
716 .map(|segment_index| self.chain_info.get_segment_header(segment_index))
717 .collect::<Vec<_>>();
718
719 last_segment_headers.reverse();
720
721 Ok(last_segment_headers)
722 }
723}