Skip to main content

ab_node_rpc_server/
lib.rs

1//! RPC API for the farmer
2
3#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{BeaconChainInfo, ChainSyncStatus};
7use ab_client_archiving::segment::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{
9    BlockSealNotification, NewSlotInfo, NewSlotNotification,
10};
11use ab_client_consensus_common::ConsensusConstants;
12use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
13use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_core_primitives::pot::SlotNumber;
17use ab_core_primitives::segments::{
18    HistorySize, LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader,
19    SuperSegmentIndex,
20};
21use ab_core_primitives::solutions::Solution;
22use ab_erasure_coding::ErasureCoding;
23use ab_farmer_components::FarmerProtocolInfo;
24use ab_farmer_rpc_primitives::{
25    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
26    MAX_SEGMENT_HEADERS_PER_REQUEST, MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST,
27    SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
28};
29use ab_networking::libp2p::Multiaddr;
30use futures::channel::{mpsc, oneshot};
31use futures::{FutureExt, SinkExt, StreamExt, select};
32use jsonrpsee::core::{SubscriptionResult, async_trait};
33use jsonrpsee::proc_macros::rpc;
34use jsonrpsee::server::{Server, ServerConfig};
35use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
36use jsonrpsee::{
37    ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
38};
39use parking_lot::Mutex;
40use replace_with::replace_with_or_abort;
41use schnellru::{ByLength, LruMap};
42use std::collections::HashMap;
43use std::collections::hash_map::Entry;
44use std::io;
45use std::net::SocketAddr;
46use std::sync::{Arc, Weak};
47use std::time::Instant;
48use tracing::{debug, error, info, warn};
49
/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Solution was ignored (no pending sender was found for the slot, or the farmer responded
    /// too slowly for the solution to still be useful)
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number
        slot: SlotNumber,
    },
    /// Segment headers length exceeded the limit
    ///
    /// NOTE(review): this variant is also returned by the super segment header request, where
    /// the actual limit is `MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST`, while the message below
    /// always renders `MAX_SEGMENT_HEADERS_PER_REQUEST` — confirm whether that is intended
    #[error(
        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SegmentHeadersLengthExceeded {
        /// Requested number of segment headers/indices
        actual: usize,
    },
}
68
69impl From<Error> for ErrorObjectOwned {
70    fn from(error: Error) -> Self {
71        let code = match &error {
72            Error::SolutionWasIgnored { .. } => 0,
73            Error::SegmentHeadersLengthExceeded { .. } => 1,
74        };
75
76        ErrorObject::owned(code, error.to_string(), None::<()>)
77    }
78}
79
/// Provides rpc methods for interacting with the farmer
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get metadata necessary for farmer operation
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution for a slot previously announced via the slot info subscription
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Slot info subscription
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Sign block subscription
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submit a block seal produced in response to a block sealing notification
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Archived segment header subscription
    #[subscription(
        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
        unsubscribe = "unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
    )]
    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;

    /// Get super segment headers for the given super segment indices (`None` for unknown
    /// indices); the number of indices per request is limited
    #[method(name = "superSegmentHeaders")]
    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Get segment headers for the given segment indices (`None` for unknown indices); the
    /// number of indices per request is limited
    #[method(name = "segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Get a single piece by index; only pieces of the most recently cached archived segment
    /// (or the on-demand re-created genesis segment) can be served
    #[method(name = "piece", blocking)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledge receipt of an archived segment header, allowing delivery of the next one
    /// on this subscription
    #[method(name = "acknowledgeArchivedSegmentHeader")]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Get up to `limit` most recent segment headers (limited per request)
    #[method(name = "lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Update shard membership info reported by the farmer behind this connection
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
147
/// Per-subscription acknowledgement channels for the most recently broadcast archived segment
/// header.
#[derive(Debug, Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    /// Segment index the pending acknowledgements belong to; senders are cleared whenever a
    /// different segment index is observed
    segment_index: SegmentIndex,
    /// One acknowledgement channel per active subscription, keyed by subscription id
    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
}
153
/// Seal senders for the block currently awaiting sealing.
#[derive(Debug, Default)]
struct BlockSignatureSenders {
    /// Pre-seal hash the stored senders belong to; a new hash invalidates stored senders
    current_pre_seal_hash: Blake3Hash,
    /// Senders waiting for a seal of the block with `current_pre_seal_hash`
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}
159
/// In-memory cache of last archived segment, such that when request comes back right after
/// archived segment notification, RPC server is able to answer quickly.
///
/// We store weak reference, such that archived segment is not persisted for longer than
/// necessary occupying RAM.
#[derive(Debug)]
enum CachedArchivedSegment {
    /// Special case for genesis segment when requested over RPC; held with a strong reference
    /// because it is re-created on demand and not retained anywhere else
    Genesis(Arc<NewArchivedSegment>),
    /// Weak reference to the most recently archived segment
    Weak(Weak<NewArchivedSegment>),
}
171
172impl CachedArchivedSegment {
173    fn get(&self) -> Option<Arc<NewArchivedSegment>> {
174        match self {
175            CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
176            CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
177        }
178    }
179}
180
/// Shard membership info reported by a single farmer connection.
#[derive(Debug)]
struct ShardMembershipConnectionsState {
    /// When this connection last reported its info; used for expiration
    last_update: Instant,
    /// The shard membership info reported by this connection
    info: Vec<FarmerShardMembershipInfo>,
}
186
/// Shard membership info aggregated across all farmer connections.
#[derive(Debug, Default)]
struct ShardMembershipConnections {
    /// Per-connection state, keyed by JSON-RPC connection id
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}
191
/// Farmer RPC configuration.
///
/// Bundles everything needed to construct a [`FarmerRpcWorker`].
#[derive(Debug)]
pub struct FarmerRpcConfig<BCI, CSS> {
    /// IP and port (TCP) on which to listen for farmer RPC requests
    pub listen_on: SocketAddr,
    /// Genesis beacon chain block (used for genesis root and on-demand genesis segment
    /// re-creation)
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in a sector
    pub max_pieces_in_sector: u16,
    /// New slot notifications
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Block sealing notifications
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Archived segment notifications
    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Shard membership updates
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info
    pub beacon_chain_info: BCI,
    /// Chain sync status
    pub chain_sync_status: CSS,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
}
220
/// Worker that drives RPC server tasks
#[derive(Debug)]
pub struct FarmerRpcWorker<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    /// Server instance; `Option` so `run()` can take ownership of it exactly once
    server: Option<Server>,
    /// RPC handler; `Option` so `run()` can take ownership of it exactly once
    rpc: Option<FarmerRpc<BCI, CSS>>,
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Shared with [`FarmerRpc`]: solution senders keyed by slot, bounded by authoring delay
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Shared with [`FarmerRpc`]: seal senders for the block currently awaiting sealing
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    /// Shared with [`FarmerRpc`]: cache of the last archived segment
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    /// Acknowledgement senders for archived segment header notifications
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    /// Shared with [`FarmerRpc`]: active slot info subscription sinks
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Shared with [`FarmerRpc`]: active block sealing subscription sinks
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Shared with [`FarmerRpc`]: active archived segment header subscription sinks
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
}
242
243impl<BCI, CSS> FarmerRpcWorker<BCI, CSS>
244where
245    BCI: BeaconChainInfo,
246    CSS: ChainSyncStatus,
247{
248    /// Creates a new farmer RPC worker
249    pub async fn new(config: FarmerRpcConfig<BCI, CSS>) -> io::Result<Self> {
250        let server = Server::builder()
251            .set_config(ServerConfig::builder().ws_only().build())
252            .build(config.listen_on)
253            .await?;
254
255        let address = server.local_addr()?;
256        info!(%address, "Started farmer RPC server");
257
258        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
259        let block_authoring_delay = usize::try_from(block_authoring_delay)
260            .expect("Block authoring delay will never exceed usize on any platform; qed");
261        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
262            .expect("Always a tiny constant in the protocol; qed");
263
264        let slot_info_subscriptions = Arc::default();
265        let block_sealing_subscriptions = Arc::default();
266
267        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
268            solution_response_senders_capacity,
269        ))));
270        let block_sealing_senders = Arc::default();
271        let cached_archived_segment = Arc::default();
272        let archived_segment_header_subscriptions = Arc::default();
273
274        let rpc = FarmerRpc {
275            genesis_block: config.genesis_block,
276            solution_response_senders: Arc::clone(&solution_response_senders),
277            block_sealing_senders: Arc::clone(&block_sealing_senders),
278            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
279            beacon_chain_info: config.beacon_chain_info,
280            cached_archived_segment: Arc::clone(&cached_archived_segment),
281            archived_segment_acknowledgement_senders: Arc::default(),
282            chain_sync_status: config.chain_sync_status,
283            consensus_constants: config.consensus_constants,
284            max_pieces_in_sector: config.max_pieces_in_sector,
285            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
286            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
287            archived_segment_header_subscriptions: Arc::clone(
288                &archived_segment_header_subscriptions,
289            ),
290            shard_membership_connections: Arc::default(),
291            shard_membership_updates_sender: config.shard_membership_updates_sender,
292            erasure_coding: config.erasure_coding,
293        };
294
295        Ok(Self {
296            server: Some(server),
297            rpc: Some(rpc),
298            new_slot_notification_receiver: config.new_slot_notification_receiver,
299            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
300            archived_segment_notification_receiver: config.archived_segment_notification_receiver,
301            solution_response_senders,
302            block_sealing_senders,
303            cached_archived_segment,
304            archived_segment_acknowledgement_senders: Arc::new(Default::default()),
305            slot_info_subscriptions,
306            block_sealing_subscriptions,
307            archived_segment_header_subscriptions,
308        })
309    }
310
    /// Drive RPC server tasks
    ///
    /// Starts the JSON-RPC server and then forwards incoming node notifications (new slots,
    /// block sealing requests, archived segments) to subscribed farmers. Returns once any of
    /// the notification channels is closed.
    pub async fn run(mut self) {
        let server = self.server.take().expect("Called only once from here; qed");
        let rpc = self.rpc.take().expect("Called only once from here; qed");
        // `fuse()` makes the future safe to poll again inside `select!` after completion
        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();

        loop {
            select! {
                // NOTE(review): server shutdown does not break this loop — the fused future
                // completes once and notification processing continues; confirm this is intended
                _ = server_fut => {}
                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
                    let Some(new_slot_notification) = maybe_new_slot_notification else {
                        // Channel closed, shut down
                        break;
                    };

                    self.handle_new_slot_notification(new_slot_notification).await;
                }
                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
                        // Channel closed, shut down
                        break;
                    };

                    self.handle_block_sealing_notification(block_sealing_notification).await;
                }
                maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
                    let Some(archived_segment_notification) = maybe_archived_segment_notification else {
                        // Channel closed, shut down
                        break;
                    };

                    self.handle_archived_segment_notification(archived_segment_notification).await;
                }
            }
        }
    }
344
345    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
346        let NewSlotNotification {
347            new_slot_info,
348            solution_sender,
349        } = new_slot_notification;
350
351        let NewSlotInfo {
352            slot,
353            proof_of_time,
354            solution_range,
355            shard_membership_entropy,
356            num_shards,
357        } = new_slot_info;
358
359        // Store solution sender so that we can retrieve it when solution comes from
360        // the farmer
361        let mut solution_response_senders = self.solution_response_senders.lock();
362        if solution_response_senders.peek(&slot).is_none() {
363            solution_response_senders.insert(slot, solution_sender);
364        }
365
366        let global_challenge = proof_of_time.derive_global_challenge(slot);
367
368        // This will be sent to the farmer
369        let slot_info = SlotInfo {
370            slot,
371            global_challenge,
372            solution_range: solution_range.to_leaf_shard(num_shards),
373            shard_membership_entropy,
374            num_shards,
375        };
376        let slot_info = serde_json::value::to_raw_value(&slot_info)
377            .expect("Serialization of slot info never fails; qed");
378
379        self.slot_info_subscriptions.lock().retain_mut(|sink| {
380            match sink.try_send(slot_info.clone()) {
381                Ok(()) => true,
382                Err(error) => match error {
383                    TrySendError::Closed(_) => {
384                        // Remove closed receivers
385                        false
386                    }
387                    TrySendError::Full(_) => {
388                        warn!(
389                            subscription_id = ?sink.subscription_id(),
390                            "Slot info receiver is too slow, dropping notification"
391                        );
392                        true
393                    }
394                },
395            }
396        });
397    }
398
399    async fn handle_block_sealing_notification(
400        &mut self,
401        block_sealing_notification: BlockSealNotification,
402    ) {
403        let BlockSealNotification {
404            pre_seal_hash,
405            public_key_hash,
406            seal_sender,
407        } = block_sealing_notification;
408
409        // Store signature sender so that we can retrieve it when a solution comes from the farmer
410        {
411            let mut block_sealing_senders = self.block_sealing_senders.lock();
412
413            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
414                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
415                block_sealing_senders.senders.clear();
416            }
417
418            block_sealing_senders.senders.push(seal_sender);
419        }
420
421        // This will be sent to the farmer
422        let block_seal_info = BlockSealInfo {
423            pre_seal_hash,
424            public_key_hash,
425        };
426        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
427            .expect("Serialization of block seal info never fails; qed");
428
429        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
430            match sink.try_send(block_seal_info.clone()) {
431                Ok(()) => true,
432                Err(error) => match error {
433                    TrySendError::Closed(_) => {
434                        // Remove closed receivers
435                        false
436                    }
437                    TrySendError::Full(_) => {
438                        warn!(
439                            subscription_id = ?sink.subscription_id(),
440                            "Block seal info receiver is too slow, dropping notification"
441                        );
442                        true
443                    }
444                },
445            }
446        });
447    }
448
449    async fn handle_archived_segment_notification(
450        &mut self,
451        archived_segment_notification: ArchivedSegmentNotification,
452    ) {
453        let ArchivedSegmentNotification {
454            mut archived_segment,
455            acknowledgement_sender,
456        } = archived_segment_notification;
457
458        let segment_index =
459            SegmentIndex::from(archived_segment.segment_header.local_segment_index());
460
461        // TODO: Combine `archived_segment_header_subscriptions` and
462        //  `archived_segment_acknowledgement_senders` under the same lock to avoid potential
463        //  accidental deadlock with future changed
464        self.archived_segment_header_subscriptions
465            .lock()
466            .retain_mut(|sink| {
467                let subscription_id = sink.subscription_id();
468
469                // Store acknowledgment sender so that we can retrieve it when acknowledgment
470                // comes from the farmer
471                let mut archived_segment_acknowledgement_senders =
472                    self.archived_segment_acknowledgement_senders.lock();
473
474                if archived_segment_acknowledgement_senders.segment_index != segment_index {
475                    archived_segment_acknowledgement_senders.segment_index = segment_index;
476                    archived_segment_acknowledgement_senders.senders.clear();
477                }
478
479                // TODO: This is a temporary hack while there is only beacon chain. Local segments
480                //  will require special caching on the farmer before super segment is created and
481                //  both global segment index and segment proof are known
482                {
483                    let archived_segment = Arc::make_mut(&mut archived_segment);
484
485                    for piece in archived_segment.pieces.iter_mut() {
486                        piece.header.super_segment_index = SuperSegmentIndex::from(u64::from(
487                            archived_segment.segment_header.segment_index.as_inner(),
488                        ))
489                        .into();
490                        // Since there is a single segment in super segment, the proof is empty
491                    }
492
493                    replace_with_or_abort(&mut archived_segment.pieces, |pieces| {
494                        pieces.to_shared()
495                    });
496                }
497
498                self.cached_archived_segment
499                    .lock()
500                    .replace(CachedArchivedSegment::Weak(Arc::downgrade(
501                        &archived_segment,
502                    )));
503
504                let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
505                    .senders
506                    .entry(subscription_id.clone())
507                {
508                    Entry::Occupied(_) => {
509                        // No need to do anything, a farmer is processing a request
510                        None
511                    }
512                    Entry::Vacant(entry) => {
513                        entry.insert(acknowledgement_sender.clone());
514
515                        // This will be sent to the farmer
516                        Some(archived_segment.segment_header)
517                    }
518                };
519
520                // This will be sent to the farmer
521                let maybe_archived_segment_header =
522                    serde_json::value::to_raw_value(&maybe_archived_segment_header)
523                        .expect("Serialization of archived segment info never fails; qed");
524
525                match sink.try_send(maybe_archived_segment_header) {
526                    Ok(()) => true,
527                    Err(error) => match error {
528                        TrySendError::Closed(_) => {
529                            // Remove closed receivers
530                            archived_segment_acknowledgement_senders
531                                .senders
532                                .remove(&subscription_id);
533                            false
534                        }
535                        TrySendError::Full(_) => {
536                            warn!(
537                                ?subscription_id,
538                                "Block seal info receiver is too slow, dropping notification"
539                            );
540                            true
541                        }
542                    },
543                }
544            });
545    }
546}
547
/// Implements the [`FarmerRpcApiServer`] trait for a farmer to connect to
#[derive(Debug)]
struct FarmerRpc<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    /// Genesis beacon chain block; used for the genesis root and on-demand genesis segment
    /// re-creation
    genesis_block: OwnedBeaconChainBlock,
    /// Shared with [`FarmerRpcWorker`]: solution senders keyed by slot
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Shared with [`FarmerRpcWorker`]: seal senders for the block currently awaiting sealing
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    beacon_chain_info: BCI,
    /// Shared with [`FarmerRpcWorker`]: cache of the last archived segment
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    /// Acknowledgement senders for archived segment header notifications
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    chain_sync_status: CSS,
    consensus_constants: ConsensusConstants,
    max_pieces_in_sector: u16,
    /// Shared with [`FarmerRpcWorker`]: active slot info subscription sinks
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Shared with [`FarmerRpcWorker`]: active block sealing subscription sinks
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Shared with [`FarmerRpcWorker`]: active archived segment header subscription sinks
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Per-connection shard membership state (expired entries pruned on update)
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    erasure_coding: ErasureCoding,
}
573
574#[async_trait]
575impl<BCI, CSS> FarmerRpcApiServer for FarmerRpc<BCI, CSS>
576where
577    BCI: BeaconChainInfo,
578    CSS: ChainSyncStatus,
579{
580    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
581        let last_segment_index = self
582            .beacon_chain_info
583            .last_segment_header()
584            .map(|segment_header| segment_header.segment_index.as_inner())
585            .unwrap_or(LocalSegmentIndex::ZERO);
586
587        let consensus_constants = &self.consensus_constants;
588        let protocol_info = FarmerProtocolInfo {
589            history_size: HistorySize::from(SegmentIndex::from(last_segment_index)),
590            max_pieces_in_sector: self.max_pieces_in_sector,
591            recent_segments: consensus_constants.recent_segments,
592            recent_history_fraction: consensus_constants.recent_history_fraction,
593            min_sector_lifetime: consensus_constants.min_sector_lifetime,
594        };
595
596        let farmer_app_info = FarmerAppInfo {
597            genesis_root: *self.genesis_block.header.header().root(),
598            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
599            syncing: self.chain_sync_status.is_syncing(),
600            farming_timeout: consensus_constants
601                .slot_duration
602                .as_duration()
603                .mul_f64(u64::from(consensus_constants.block_authoring_delay) as f64),
604            protocol_info,
605        };
606
607        Ok(farmer_app_info)
608    }
609
610    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
611        let slot = solution_response.slot_number;
612        let public_key_hash = solution_response.solution.public_key_hash;
613        let sector_index = solution_response.solution.sector_index;
614        let mut solution_response_senders = self.solution_response_senders.lock();
615
616        let success = solution_response_senders
617            .peek_mut(&slot)
618            .and_then(|sender| sender.try_send(solution_response.solution).ok())
619            .is_some();
620
621        if !success {
622            warn!(
623                %slot,
624                %sector_index,
625                %public_key_hash,
626                "Solution was ignored, likely because farmer was too slow"
627            );
628
629            return Err(Error::SolutionWasIgnored { slot });
630        }
631
632        Ok(())
633    }
634
635    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
636        let subscription = pending.accept().await?;
637        self.slot_info_subscriptions.lock().push(subscription);
638
639        Ok(())
640    }
641
642    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
643        let subscription = pending.accept().await?;
644        self.block_sealing_subscriptions.lock().push(subscription);
645
646        Ok(())
647    }
648
649    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
650        let block_sealing_senders = self.block_sealing_senders.clone();
651
652        let mut block_sealing_senders = block_sealing_senders.lock();
653
654        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
655            && let Some(sender) = block_sealing_senders.senders.pop()
656        {
657            let _ = sender.send(block_seal.seal);
658        }
659
660        Ok(())
661    }
662
663    async fn subscribe_archived_segment_header(
664        &self,
665        pending: PendingSubscriptionSink,
666    ) -> SubscriptionResult {
667        let subscription = pending.accept().await?;
668        self.archived_segment_header_subscriptions
669            .lock()
670            .push(subscription);
671
672        Ok(())
673    }
674
675    async fn acknowledge_archived_segment_header(
676        &self,
677        segment_index: SegmentIndex,
678    ) -> Result<(), Error> {
679        let archived_segment_acknowledgement_senders =
680            self.archived_segment_acknowledgement_senders.clone();
681
682        let maybe_sender = {
683            let mut archived_segment_acknowledgement_senders_guard =
684                archived_segment_acknowledgement_senders.lock();
685
686            (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
687                .then(|| {
688                    let last_key = archived_segment_acknowledgement_senders_guard
689                        .senders
690                        .keys()
691                        .next()
692                        .cloned()?;
693
694                    archived_segment_acknowledgement_senders_guard
695                        .senders
696                        .remove(&last_key)
697                })
698                .flatten()
699        };
700
701        if let Some(mut sender) = maybe_sender
702            && let Err(error) = sender.try_send(())
703            && !error.is_disconnected()
704        {
705            warn!(%error, "Failed to acknowledge archived segment");
706        }
707
708        debug!(%segment_index, "Acknowledged archived segment.");
709
710        Ok(())
711    }
712
    // Note: this RPC uses the cached archived segment, which is only updated by archived segments
    // subscriptions
    /// Returns the requested piece if it belongs to the cached archived segment (or, with an
    /// empty cache, to the genesis segment, which is re-created on demand); `None` otherwise.
    fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
        // Get the cached archived segment, or lazily re-create the genesis segment
        let archived_segment = {
            let mut cached_archived_segment = self.cached_archived_segment.lock();

            match cached_archived_segment
                .as_ref()
                .and_then(CachedArchivedSegment::get)
            {
                Some(archived_segment) => archived_segment,
                None => {
                    // Without a cached segment only genesis segment pieces can be served
                    if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
                        return Ok(None);
                    }

                    debug!(%requested_piece_index, "Re-creating the genesis segment on demand");

                    // Re-create the genesis segment on demand
                    let archived_segment = Arc::new(recreate_genesis_segment(
                        &self.genesis_block,
                        self.erasure_coding.clone(),
                    ));

                    // Cache with a strong reference so subsequent genesis requests are cheap
                    cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
                        &archived_segment,
                    )));

                    archived_segment
                }
            }
        };

        // Serve the piece only if it belongs to the cached segment
        if requested_piece_index.segment_index()
            == SegmentIndex::from(archived_segment.segment_header.local_segment_index())
        {
            return Ok(archived_segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        Ok(None)
    }
757
758    async fn super_segment_headers(
759        &self,
760        super_segment_indices: Vec<SuperSegmentIndex>,
761    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
762        if super_segment_indices.len() > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
763            error!(
764                "`super_segment_indices` length exceed the limit: {} ",
765                super_segment_indices.len()
766            );
767
768            return Err(Error::SegmentHeadersLengthExceeded {
769                actual: super_segment_indices.len(),
770            });
771        };
772
773        Ok(super_segment_indices
774            .into_iter()
775            .map(|super_segment_index| {
776                self.beacon_chain_info
777                    .get_super_segment_header(super_segment_index)
778            })
779            .collect())
780    }
781
782    async fn segment_headers(
783        &self,
784        segment_indices: Vec<SegmentIndex>,
785    ) -> Result<Vec<Option<SegmentHeader>>, Error> {
786        if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
787            error!(
788                "`segment_indices` length exceed the limit: {} ",
789                segment_indices.len()
790            );
791
792            return Err(Error::SegmentHeadersLengthExceeded {
793                actual: segment_indices.len(),
794            });
795        };
796
797        Ok(segment_indices
798            .into_iter()
799            .map(|segment_index| {
800                self.beacon_chain_info
801                    .get_segment_header(segment_index.into())
802            })
803            .collect())
804    }
805
806    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
807        if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
808            error!(
809                "Request limit ({}) exceed the server limit: {} ",
810                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
811            );
812
813            return Err(Error::SegmentHeadersLengthExceeded {
814                actual: limit as usize,
815            });
816        };
817
818        let last_segment_index = self
819            .beacon_chain_info
820            .last_segment_header()
821            .map(|segment_header| segment_header.segment_index.as_inner())
822            .unwrap_or(LocalSegmentIndex::ZERO);
823
824        let mut last_segment_headers = (LocalSegmentIndex::ZERO..=last_segment_index)
825            .rev()
826            .take(limit as usize)
827            .map(|segment_index| self.beacon_chain_info.get_segment_header(segment_index))
828            .collect::<Vec<_>>();
829
830        last_segment_headers.reverse();
831
832        Ok(last_segment_headers)
833    }
834
835    async fn update_shard_membership_info(
836        &self,
837        extensions: &Extensions,
838        info: Vec<FarmerShardMembershipInfo>,
839    ) -> Result<(), Error> {
840        let connection_id = extensions
841            .get::<ConnectionId>()
842            .expect("`ConnectionId` is always present; qed");
843
844        let shard_membership = {
845            let mut shard_membership_connections = self.shard_membership_connections.lock();
846
847            // TODO: This is a workaround for https://github.com/paritytech/jsonrpsee/issues/1617
848            //  and should be replaced with cleanup on disconnection once that issue is resolved
849            shard_membership_connections
850                .connections
851                .retain(|_connection_id, state| {
852                    state.last_update.elapsed() >= SHARD_MEMBERSHIP_EXPIRATION
853                });
854
855            shard_membership_connections.connections.insert(
856                *connection_id,
857                ShardMembershipConnectionsState {
858                    last_update: Instant::now(),
859                    info,
860                },
861            );
862
863            shard_membership_connections
864                .connections
865                .values()
866                .flat_map(|state| state.info.clone())
867                .collect::<Vec<_>>()
868        };
869
870        if let Err(error) = self
871            .shard_membership_updates_sender
872            .clone()
873            .send(shard_membership)
874            .await
875        {
876            warn!(%error, "Failed to send shard membership update");
877        }
878
879        Ok(())
880    }
881}