Skip to main content

ab_node_rpc_server/
lib.rs

1//! RPC API for the farmer
2
3use ab_archiving::archiver::NewArchivedSegment;
4use ab_client_api::{BeaconChainInfo, ChainSyncStatus};
5use ab_client_archiving::recreate::{
6    RecreateSegmentError, RecreateSegmentSuperSegmentDetails, recreate_genesis_segment,
7    recreate_segment,
8};
9use ab_client_block_authoring::slot_worker::{
10    BlockSealNotification, NewSlotInfo, NewSlotNotification,
11};
12use ab_client_consensus_common::ConsensusConstants;
13use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex};
17use ab_core_primitives::pot::SlotNumber;
18use ab_core_primitives::segments::{
19    HistorySize, LocalSegmentIndex, SegmentIndex, SuperSegment, SuperSegmentHeader,
20    SuperSegmentIndex, SuperSegmentRoot,
21};
22use ab_core_primitives::shard::ShardIndex;
23use ab_core_primitives::solutions::Solution;
24use ab_erasure_coding::ErasureCoding;
25use ab_farmer_components::FarmerProtocolInfo;
26use ab_farmer_rpc_primitives::{
27    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
28    MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
29};
30use ab_networking::libp2p::Multiaddr;
31use async_lock::Mutex as AsyncMutex;
32use futures::channel::{mpsc, oneshot};
33use futures::{FutureExt, SinkExt, StreamExt, select};
34use jsonrpsee::core::{SubscriptionResult, async_trait};
35use jsonrpsee::proc_macros::rpc;
36use jsonrpsee::server::{Server, ServerConfig};
37use jsonrpsee::tokio::task::{JoinError, spawn_blocking};
38use jsonrpsee::tokio::time::MissedTickBehavior;
39use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
40use jsonrpsee::{
41    ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
42};
43use parking_lot::Mutex;
44use schnellru::{ByLength, LruMap};
45use std::collections::{HashMap, VecDeque};
46use std::io;
47use std::net::SocketAddr;
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50use tracing::{error, info, warn};
51
52const CACHED_SUPER_SEGMENTS_CAPACITY: usize = 5;
53const CACHED_ARCHIVED_SEGMENT_TIMEOUT: Duration = Duration::from_mins(1);
54
55/// Top-level error type for the RPC handler.
56#[derive(Debug, thiserror::Error)]
57pub enum Error {
58    /// Solution was ignored
59    #[error("Solution was ignored for slot {slot}")]
60    SolutionWasIgnored {
61        /// Slot number
62        slot: SlotNumber,
63    },
64    /// Super segment headers length exceeded the limit
65    #[error(
66        "Super segment headers length exceeded the limit: \
67        {actual}/{MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST}"
68    )]
69    SuperSegmentHeadersLengthExceeded {
70        /// Requested number of super segment headers/indices
71        actual: usize,
72    },
73    /// Failed to recreate segment
74    #[error("Failed to recreate segment: {0}")]
75    FailedToRecreateSegment(#[from] RecreateSegmentError),
76    /// Blocking task join error
77    #[error("Blocking task join error: {0}")]
78    BlockingTaskJoinError(#[from] JoinError),
79}
80
81impl From<Error> for ErrorObjectOwned {
82    fn from(error: Error) -> Self {
83        let code = match &error {
84            Error::SolutionWasIgnored { .. } => 0,
85            Error::SuperSegmentHeadersLengthExceeded { .. } => 1,
86            Error::FailedToRecreateSegment(_) => 2,
87            Error::BlockingTaskJoinError(_) => 3,
88        };
89
90        ErrorObject::owned(code, error.to_string(), None::<()>)
91    }
92}
93
94/// Provides rpc methods for interacting with the farmer
95#[rpc(server)]
96pub trait FarmerRpcApi {
97    /// Get metadata necessary for farmer operation
98    #[method(name = "getFarmerAppInfo")]
99    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;
100
101    #[method(name = "submitSolutionResponse")]
102    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;
103
104    /// Slot info subscription
105    #[subscription(
106        name = "subscribeSlotInfo" => "slot_info",
107        unsubscribe = "unsubscribeSlotInfo",
108        item = SlotInfo,
109    )]
110    async fn subscribe_slot_info(&self) -> SubscriptionResult;
111
112    /// Sign block subscription
113    #[subscription(
114        name = "subscribeBlockSealing" => "block_seal",
115        unsubscribe = "unsubscribeBlockSealing",
116        item = BlockSealInfo,
117    )]
118    async fn subscribe_block_seal(&self) -> SubscriptionResult;
119
120    #[method(name = "submitBlockSeal")]
121    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;
122
123    /// New super segment header subscription
124    #[subscription(
125        name = "subscribeNewSuperSegmentHeader" => "new_super_segment_header",
126        unsubscribe = "unsubscribeNewSuperSegmentHeader",
127        item = SuperSegmentHeader,
128    )]
129    async fn subscribe_new_super_segment_header(&self) -> SubscriptionResult;
130
131    #[method(name = "superSegmentHeaders")]
132    async fn super_segment_headers(
133        &self,
134        super_segment_indices: Vec<SuperSegmentIndex>,
135    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;
136
137    #[method(name = "lastSuperSegmentHeaders")]
138    async fn last_super_segment_headers(
139        &self,
140        limit: u32,
141    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;
142
143    #[method(name = "superSegmentRootForSegmentIndex")]
144    async fn super_segment_root_for_segment_index(
145        &self,
146        segment_index: SegmentIndex,
147    ) -> Result<Option<SuperSegmentRoot>, Error>;
148
149    #[method(name = "piece")]
150    async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;
151
152    #[method(name = "updateShardMembershipInfo", with_extensions)]
153    async fn update_shard_membership_info(
154        &self,
155        info: Vec<FarmerShardMembershipInfo>,
156    ) -> Result<(), Error>;
157}
158
159#[derive(Debug, Default)]
160struct BlockSignatureSenders {
161    current_pre_seal_hash: Blake3Hash,
162    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
163}
164
165#[derive(Debug)]
166struct CachedSuperSegments {
167    super_segments: VecDeque<SuperSegment>,
168}
169
170impl Default for CachedSuperSegments {
171    fn default() -> Self {
172        Self {
173            super_segments: VecDeque::with_capacity(CACHED_SUPER_SEGMENTS_CAPACITY),
174        }
175    }
176}
177
178impl CachedSuperSegments {
179    fn get_for_segment_index(&self, segment_index: SegmentIndex) -> Option<&SuperSegment> {
180        self.super_segments.iter().find(|super_segment| {
181            let max_segment_index = super_segment.header.max_segment_index.as_inner();
182            let first_segment_index = max_segment_index
183                - SegmentIndex::from(u64::from(super_segment.header.num_segments))
184                + SegmentIndex::ONE;
185
186            (first_segment_index..=max_segment_index).contains(&segment_index)
187        })
188    }
189
190    fn add(&mut self, super_segment: SuperSegment) {
191        if self.super_segments.len() == CACHED_SUPER_SEGMENTS_CAPACITY {
192            self.super_segments.pop_front();
193        }
194
195        self.super_segments.push_back(super_segment);
196    }
197}
198
199/// Temporary in-memory cache of the last archived segment
200#[derive(Debug)]
201struct CachedArchivedSegment {
202    segment_index: SegmentIndex,
203    segment: NewArchivedSegment,
204    last_used_at: Instant,
205}
206
207#[derive(Debug)]
208struct ShardMembershipConnectionsState {
209    last_update: Instant,
210    info: Vec<FarmerShardMembershipInfo>,
211}
212
213#[derive(Debug, Default)]
214struct ShardMembershipConnections {
215    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
216}
217
218/// Farmer RPC configuration
219#[derive(Debug)]
220pub struct FarmerRpcConfig<BCI, CSS> {
221    /// IP and port (TCP) on which to listen for farmer RPC requests
222    pub listen_on: SocketAddr,
223    /// Genesis beacon chain block
224    pub genesis_block: OwnedBeaconChainBlock,
225    /// Consensus constants
226    pub consensus_constants: ConsensusConstants,
227    /// Max pieces in a sector
228    pub max_pieces_in_sector: u16,
229    /// New slot notifications
230    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
231    /// Block sealing notifications
232    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
233    /// Super segment notifications
234    pub new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
235    /// Shard membership updates
236    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
237    /// DSN bootstrap nodes
238    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
239    /// Beacon chain info
240    pub beacon_chain_info: BCI,
241    /// Chain sync status
242    pub chain_sync_status: CSS,
243    /// Erasure coding instance
244    pub erasure_coding: ErasureCoding,
245}
246
247/// Worker that drives RPC server tasks
248#[derive(Debug)]
249pub struct FarmerRpcWorker<BCI, CSS>
250where
251    BCI: BeaconChainInfo,
252    CSS: ChainSyncStatus,
253{
254    server: Option<Server>,
255    rpc: Option<FarmerRpc<BCI, CSS>>,
256    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
257    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
258    new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
259    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
260    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
261    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
262    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
263    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
264    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
265    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
266}
267
268impl<BCI, CSS> FarmerRpcWorker<BCI, CSS>
269where
270    BCI: BeaconChainInfo,
271    CSS: ChainSyncStatus,
272{
273    /// Creates a new farmer RPC worker
274    pub async fn new(config: FarmerRpcConfig<BCI, CSS>) -> io::Result<Self> {
275        let server = Server::builder()
276            .set_config(ServerConfig::builder().ws_only().build())
277            .build(config.listen_on)
278            .await?;
279
280        let address = server.local_addr()?;
281        info!(%address, "Started farmer RPC server");
282
283        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
284        let block_authoring_delay = usize::try_from(block_authoring_delay)
285            .expect("Block authoring delay will never exceed usize on any platform; qed");
286        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
287            .expect("Always a tiny constant in the protocol; qed");
288
289        let slot_info_subscriptions = Arc::default();
290        let block_sealing_subscriptions = Arc::default();
291
292        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
293            solution_response_senders_capacity,
294        ))));
295        let block_sealing_senders = Arc::default();
296        let new_super_segment_header_subscriptions = Arc::default();
297        let cached_archived_segment = Arc::default();
298        let cached_super_segments = Arc::default();
299
300        let rpc = FarmerRpc {
301            genesis_block: config.genesis_block,
302            solution_response_senders: Arc::clone(&solution_response_senders),
303            block_sealing_senders: Arc::clone(&block_sealing_senders),
304            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
305            beacon_chain_info: config.beacon_chain_info,
306            chain_sync_status: config.chain_sync_status,
307            consensus_constants: config.consensus_constants,
308            max_pieces_in_sector: config.max_pieces_in_sector,
309            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
310            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
311            new_super_segment_header_subscriptions: Arc::clone(
312                &new_super_segment_header_subscriptions,
313            ),
314            cached_archived_segment: Arc::clone(&cached_archived_segment),
315            cached_super_segments: Arc::clone(&cached_super_segments),
316            shard_membership_connections: Arc::default(),
317            shard_membership_updates_sender: config.shard_membership_updates_sender,
318            erasure_coding: config.erasure_coding,
319        };
320
321        Ok(Self {
322            server: Some(server),
323            rpc: Some(rpc),
324            new_slot_notification_receiver: config.new_slot_notification_receiver,
325            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
326            new_super_segment_notification_receiver: config.new_super_segment_notification_receiver,
327            solution_response_senders,
328            block_sealing_senders,
329            slot_info_subscriptions,
330            block_sealing_subscriptions,
331            new_super_segment_header_subscriptions,
332            cached_archived_segment,
333            cached_super_segments,
334        })
335    }
336
337    /// Drive RPC server tasks
338    pub async fn run(mut self) {
339        let server = self.server.take().expect("Called only once from here; qed");
340        let rpc = self.rpc.take().expect("Called only once from here; qed");
341        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();
342
343        // Also send periodic updates in addition to the subscription response
344        let mut archived_segment_cache_cleanup_interval =
345            tokio::time::interval(CACHED_ARCHIVED_SEGMENT_TIMEOUT);
346        archived_segment_cache_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
347
348        loop {
349            select! {
350                _ = server_fut => {}
351                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
352                    let Some(new_slot_notification) = maybe_new_slot_notification else {
353                        break;
354                    };
355
356                    self.handle_new_slot_notification(new_slot_notification).await;
357                }
358                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
359                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
360                        break;
361                    };
362
363                    self.handle_block_sealing_notification(block_sealing_notification).await;
364                }
365                maybe_new_super_segment = self.new_super_segment_notification_receiver.next() => {
366                    let Some(new_super_segment) = maybe_new_super_segment else {
367                        break;
368                    };
369
370                    self.handle_new_super_segment(new_super_segment).await;
371                }
372                _ = archived_segment_cache_cleanup_interval.tick().fuse() => {
373                    if let Some(mut maybe_cached_archived_segment) = self.cached_archived_segment.try_lock()
374                        && let Some(cached_archived_segment) = maybe_cached_archived_segment.as_ref()
375                        && cached_archived_segment.last_used_at.elapsed() >= CACHED_ARCHIVED_SEGMENT_TIMEOUT
376                    {
377                        maybe_cached_archived_segment.take();
378                    }
379                }
380            }
381        }
382    }
383
384    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
385        let NewSlotNotification {
386            new_slot_info,
387            solution_sender,
388        } = new_slot_notification;
389
390        let NewSlotInfo {
391            slot,
392            proof_of_time,
393            solution_range,
394            shard_membership_entropy,
395            num_shards,
396        } = new_slot_info;
397
398        // Store solution sender so that we can retrieve it when solution comes from
399        // the farmer
400        let mut solution_response_senders = self.solution_response_senders.lock();
401        if solution_response_senders.peek(&slot).is_none() {
402            solution_response_senders.insert(slot, solution_sender);
403        }
404
405        let global_challenge = proof_of_time.derive_global_challenge(slot);
406
407        // This will be sent to the farmer
408        let slot_info = SlotInfo {
409            slot,
410            global_challenge,
411            solution_range: solution_range.to_leaf_shard(num_shards),
412            shard_membership_entropy,
413            num_shards,
414        };
415        let slot_info = serde_json::value::to_raw_value(&slot_info)
416            .expect("Serialization of slot info never fails; qed");
417
418        self.slot_info_subscriptions.lock().retain_mut(|sink| {
419            match sink.try_send(slot_info.clone()) {
420                Ok(()) => true,
421                Err(error) => match error {
422                    TrySendError::Closed(_) => {
423                        // Remove closed receivers
424                        false
425                    }
426                    TrySendError::Full(_) => {
427                        warn!(
428                            subscription_id = ?sink.subscription_id(),
429                            "Slot info receiver is too slow, dropping notification"
430                        );
431                        true
432                    }
433                },
434            }
435        });
436    }
437
438    async fn handle_block_sealing_notification(
439        &mut self,
440        block_sealing_notification: BlockSealNotification,
441    ) {
442        let BlockSealNotification {
443            pre_seal_hash,
444            public_key_hash,
445            seal_sender,
446        } = block_sealing_notification;
447
448        // Store signature sender so that we can retrieve it when a solution comes from the farmer
449        {
450            let mut block_sealing_senders = self.block_sealing_senders.lock();
451
452            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
453                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
454                block_sealing_senders.senders.clear();
455            }
456
457            block_sealing_senders.senders.push(seal_sender);
458        }
459
460        // This will be sent to the farmer
461        let block_seal_info = BlockSealInfo {
462            pre_seal_hash,
463            public_key_hash,
464        };
465        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
466            .expect("Serialization of block seal info never fails; qed");
467
468        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
469            match sink.try_send(block_seal_info.clone()) {
470                Ok(()) => true,
471                Err(error) => match error {
472                    TrySendError::Closed(_) => {
473                        // Remove closed receivers
474                        false
475                    }
476                    TrySendError::Full(_) => {
477                        warn!(
478                            subscription_id = ?sink.subscription_id(),
479                            "Block seal info receiver is too slow, dropping notification"
480                        );
481                        true
482                    }
483                },
484            }
485        });
486    }
487
488    async fn handle_new_super_segment(&mut self, super_segment: SuperSegment) {
489        // This will be sent to the farmer
490        let super_segment_header = serde_json::value::to_raw_value(&super_segment.header)
491            .expect("Serialization of super segment info never fails; qed");
492
493        self.cached_super_segments.lock().add(super_segment);
494
495        self.new_super_segment_header_subscriptions
496            .lock()
497            .retain_mut(|sink| {
498                let subscription_id = sink.subscription_id();
499
500                match sink.try_send(super_segment_header.clone()) {
501                    Ok(()) => true,
502                    Err(error) => match error {
503                        TrySendError::Closed(_) => false,
504                        TrySendError::Full(_) => {
505                            warn!(
506                                ?subscription_id,
507                                "Super segment receiver is too slow, dropping notification"
508                            );
509                            true
510                        }
511                    },
512                }
513            });
514    }
515}
516
517/// Implements the [`FarmerRpcApiServer`] trait for a farmer to connect to
518#[derive(Debug)]
519struct FarmerRpc<BCI, CSS>
520where
521    BCI: BeaconChainInfo,
522    CSS: ChainSyncStatus,
523{
524    genesis_block: OwnedBeaconChainBlock,
525    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
526    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
527    dsn_bootstrap_nodes: Vec<Multiaddr>,
528    beacon_chain_info: BCI,
529    chain_sync_status: CSS,
530    consensus_constants: ConsensusConstants,
531    max_pieces_in_sector: u16,
532    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
533    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
534    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
535    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
536    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
537    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
538    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
539    erasure_coding: ErasureCoding,
540}
541
542#[async_trait]
543impl<BCI, CSS> FarmerRpcApiServer for FarmerRpc<BCI, CSS>
544where
545    BCI: BeaconChainInfo,
546    CSS: ChainSyncStatus,
547{
548    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
549        let max_segment_index = self
550            .beacon_chain_info
551            .last_super_segment_header()
552            .map_or(SegmentIndex::ZERO, |super_segment_header| {
553                super_segment_header.max_segment_index.as_inner()
554            });
555
556        let consensus_constants = &self.consensus_constants;
557        let protocol_info = FarmerProtocolInfo {
558            history_size: HistorySize::from(max_segment_index),
559            max_pieces_in_sector: self.max_pieces_in_sector,
560            recent_segments: consensus_constants.recent_segments,
561            recent_history_fraction: consensus_constants.recent_history_fraction,
562            min_sector_lifetime: consensus_constants.min_sector_lifetime,
563        };
564
565        let farmer_app_info = FarmerAppInfo {
566            genesis_root: *self.genesis_block.header.header().root(),
567            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
568            syncing: self.chain_sync_status.is_syncing(),
569            farming_timeout: consensus_constants
570                .slot_duration
571                .as_duration()
572                .mul_f64(u64::from(consensus_constants.block_authoring_delay) as f64),
573            protocol_info,
574        };
575
576        Ok(farmer_app_info)
577    }
578
579    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
580        let slot = solution_response.slot_number;
581        let public_key_hash = solution_response.solution.public_key_hash;
582        let sector_index = solution_response.solution.sector_index;
583        let mut solution_response_senders = self.solution_response_senders.lock();
584
585        let success = solution_response_senders
586            .peek_mut(&slot)
587            .and_then(|sender| sender.try_send(solution_response.solution).ok())
588            .is_some();
589
590        if !success {
591            warn!(
592                %slot,
593                %sector_index,
594                %public_key_hash,
595                "Solution was ignored, likely because farmer was too slow"
596            );
597
598            return Err(Error::SolutionWasIgnored { slot });
599        }
600
601        Ok(())
602    }
603
604    async fn subscribe_slot_info(
605        &self,
606        subscription_sink: PendingSubscriptionSink,
607    ) -> SubscriptionResult {
608        let subscription = subscription_sink.accept().await?;
609        self.slot_info_subscriptions.lock().push(subscription);
610
611        Ok(())
612    }
613
614    async fn subscribe_block_seal(
615        &self,
616        subscription_sink: PendingSubscriptionSink,
617    ) -> SubscriptionResult {
618        let subscription = subscription_sink.accept().await?;
619        self.block_sealing_subscriptions.lock().push(subscription);
620
621        Ok(())
622    }
623
624    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
625        let block_sealing_senders = Arc::clone(&self.block_sealing_senders);
626
627        let mut block_sealing_senders = block_sealing_senders.lock();
628
629        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
630            && let Some(sender) = block_sealing_senders.senders.pop()
631        {
632            let _: Result<(), _> = sender.send(block_seal.seal);
633        }
634
635        Ok(())
636    }
637
638    async fn subscribe_new_super_segment_header(
639        &self,
640        subscription_sink: PendingSubscriptionSink,
641    ) -> SubscriptionResult {
642        let subscription = subscription_sink.accept().await?;
643        self.new_super_segment_header_subscriptions
644            .lock()
645            .push(subscription);
646
647        Ok(())
648    }
649
650    async fn super_segment_headers(
651        &self,
652        super_segment_indices: Vec<SuperSegmentIndex>,
653    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
654        if super_segment_indices.len() > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
655            error!(
656                "`super_segment_indices` length exceed the limit: {} ",
657                super_segment_indices.len()
658            );
659
660            return Err(Error::SuperSegmentHeadersLengthExceeded {
661                actual: super_segment_indices.len(),
662            });
663        }
664
665        Ok(super_segment_indices
666            .into_iter()
667            .map(|super_segment_index| {
668                self.beacon_chain_info
669                    .get_super_segment_header(super_segment_index)
670            })
671            .collect())
672    }
673
674    async fn last_super_segment_headers(
675        &self,
676        limit: u32,
677    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
678        if limit as usize > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
679            error!(
680                "Request limit ({}) exceed the server limit: {} ",
681                limit, MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST
682            );
683
684            return Err(Error::SuperSegmentHeadersLengthExceeded {
685                actual: limit as usize,
686            });
687        }
688
689        let last_super_segment_index = self
690            .beacon_chain_info
691            .last_super_segment_header()
692            .map_or(SuperSegmentIndex::ZERO, |super_segment_header| {
693                super_segment_header.index.as_inner()
694            });
695
696        let mut last_super_segment_headers = (SuperSegmentIndex::ZERO..=last_super_segment_index)
697            .rev()
698            .take(limit as usize)
699            .map(|super_segment_index| {
700                self.beacon_chain_info
701                    .get_super_segment_header(super_segment_index)
702            })
703            .collect::<Vec<_>>();
704
705        last_super_segment_headers.reverse();
706
707        Ok(last_super_segment_headers)
708    }
709
710    async fn super_segment_root_for_segment_index(
711        &self,
712        segment_index: SegmentIndex,
713    ) -> Result<Option<SuperSegmentRoot>, Error> {
714        Ok(self
715            .beacon_chain_info
716            .get_super_segment_header_for_segment_index(segment_index)
717            .map(|super_segment_header| super_segment_header.root))
718    }
719
720    // Note: this RPC uses the cached archived segment, which is only updated by archived segments
721    // subscriptions
722    async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
723        let segment_index = piece_index.segment_index();
724        let cached_archived_segment = &mut *self.cached_archived_segment.lock().await;
725
726        if let Some(cached_archived_segment) = cached_archived_segment
727            && cached_archived_segment.segment_index == segment_index
728        {
729            cached_archived_segment.last_used_at = Instant::now();
730
731            return Ok(cached_archived_segment
732                .segment
733                .pieces
734                .pieces()
735                .nth(usize::from(piece_index.position())));
736        }
737
738        if segment_index == SegmentIndex::ZERO {
739            let segment = spawn_blocking({
740                let genesis_block = self.genesis_block.clone();
741                let erasure_coding = self.erasure_coding.clone();
742
743                move || recreate_genesis_segment(&genesis_block, erasure_coding)
744            })
745            .await?;
746            let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
747                segment_index: SegmentIndex::ZERO,
748                segment,
749                last_used_at: Instant::now(),
750            });
751
752            return Ok(cached_archived_segment
753                .segment
754                .pieces
755                .pieces()
756                .nth(usize::from(piece_index.position())));
757        }
758
759        let (super_segment_index, shard_segment_root_with_position, segment_proof) = {
760            let cached_super_segments = self.cached_super_segments.lock();
761            let Some(super_segment) = cached_super_segments.get_for_segment_index(segment_index)
762            else {
763                return Ok(None);
764            };
765
766            let Some(shard_segment_root_with_position) = super_segment
767                .segment_roots
768                .iter()
769                .nth_back(u64::from(
770                    super_segment.header.max_segment_index.as_inner() - segment_index,
771                ) as usize)
772                .copied()
773            else {
774                error!(
775                    %piece_index,
776                    %segment_index,
777                    super_segment_header = ?super_segment.header,
778                    "Failed to find segment index inside super segment, this should never happen"
779                );
780                return Ok(None);
781            };
782
783            let segment_position = shard_segment_root_with_position.segment_position;
784
785            let Some(segment_proof) = super_segment.proof_for_segment(segment_position) else {
786                error!(
787                    %piece_index,
788                    %segment_index,
789                    %segment_position,
790                    super_segment_header = ?super_segment.header,
791                    "Failed to get segment proof for segment position, this should never happen"
792                );
793
794                return Ok(None);
795            };
796
797            (
798                super_segment.header.index.as_inner(),
799                shard_segment_root_with_position,
800                segment_proof,
801            )
802        };
803
804        let recreate_segment_super_segment_details = RecreateSegmentSuperSegmentDetails {
805            super_segment_index,
806            segment_position: shard_segment_root_with_position.segment_position,
807            segment_proof,
808        };
809
810        if shard_segment_root_with_position.shard_index != ShardIndex::BEACON_CHAIN {
811            // TODO: There will be a need for chain info instances of all live shards to re-derive
812            //  segments here, but there is just a beacon chain here for now
813            unimplemented!("Shard segments for non-beacon chain shards are not supported yet");
814        }
815
816        let last_archived_segment = shard_segment_root_with_position
817            .local_segment_index
818            .checked_sub(LocalSegmentIndex::ONE)
819            .and_then(|last_segment_index| {
820                self.beacon_chain_info
821                    .get_segment_header(last_segment_index)
822            });
823
824        let maybe_segment = recreate_segment(
825            last_archived_segment,
826            &self.beacon_chain_info,
827            self.erasure_coding.clone(),
828            &recreate_segment_super_segment_details,
829            |_| Vec::new(),
830        )
831        .await?;
832
833        let Some(segment) = maybe_segment else {
834            return Ok(None);
835        };
836
837        let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
838            segment_index,
839            segment,
840            last_used_at: Instant::now(),
841        });
842
843        Ok(cached_archived_segment
844            .segment
845            .pieces
846            .pieces()
847            .nth(usize::from(piece_index.position())))
848    }
849
850    async fn update_shard_membership_info(
851        &self,
852        ext: &Extensions,
853        info: Vec<FarmerShardMembershipInfo>,
854    ) -> Result<(), Error> {
855        let connection_id = ext
856            .get::<ConnectionId>()
857            .expect("`ConnectionId` is always present; qed");
858
859        let shard_membership = {
860            let mut shard_membership_connections = self.shard_membership_connections.lock();
861
862            // TODO: This is a workaround for https://github.com/paritytech/jsonrpsee/issues/1617
863            //  and should be replaced with cleanup on disconnection once that issue is resolved
864            shard_membership_connections
865                .connections
866                .retain(|_connection_id, state| {
867                    state.last_update.elapsed() < SHARD_MEMBERSHIP_EXPIRATION
868                });
869
870            shard_membership_connections.connections.insert(
871                *connection_id,
872                ShardMembershipConnectionsState {
873                    last_update: Instant::now(),
874                    info,
875                },
876            );
877
878            shard_membership_connections
879                .connections
880                .values()
881                .flat_map(|state| state.info.clone())
882                .collect::<Vec<_>>()
883        };
884
885        if let Err(error) = self
886            .shard_membership_updates_sender
887            .clone()
888            .send(shard_membership)
889            .await
890        {
891            warn!(%error, "Failed to send shard membership update");
892        }
893
894        Ok(())
895    }
896}