Skip to main content

ab_node_rpc_server/
lib.rs

1//! RPC API for the farmer
2
3use ab_archiving::archiver::NewArchivedSegment;
4use ab_client_api::{BeaconChainInfo, ChainSyncStatus};
5use ab_client_archiving::recreate::{
6    RecreateSegmentError, RecreateSegmentSuperSegmentDetails, recreate_genesis_segment,
7    recreate_segment,
8};
9use ab_client_block_authoring::slot_worker::{
10    BlockSealNotification, NewSlotInfo, NewSlotNotification,
11};
12use ab_client_consensus_common::ConsensusConstants;
13use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex};
17use ab_core_primitives::pot::SlotNumber;
18use ab_core_primitives::segments::{
19    HistorySize, LocalSegmentIndex, SegmentIndex, SuperSegment, SuperSegmentHeader,
20    SuperSegmentIndex, SuperSegmentRoot,
21};
22use ab_core_primitives::shard::ShardIndex;
23use ab_core_primitives::solutions::Solution;
24use ab_erasure_coding::ErasureCoding;
25use ab_farmer_components::FarmerProtocolInfo;
26use ab_farmer_rpc_primitives::{
27    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
28    MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
29};
30use ab_networking::libp2p::Multiaddr;
31use async_lock::Mutex as AsyncMutex;
32use futures::channel::{mpsc, oneshot};
33use futures::{FutureExt, SinkExt, StreamExt, select};
34use jsonrpsee::core::{SubscriptionResult, async_trait};
35use jsonrpsee::proc_macros::rpc;
36use jsonrpsee::server::{Server, ServerConfig};
37use jsonrpsee::tokio::task::{JoinError, spawn_blocking};
38use jsonrpsee::tokio::time::MissedTickBehavior;
39use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
40use jsonrpsee::{
41    ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
42};
43use parking_lot::Mutex;
44use schnellru::{ByLength, LruMap};
45use std::collections::{HashMap, VecDeque};
46use std::io;
47use std::net::SocketAddr;
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50use tracing::{error, info, warn};
51
/// Maximum number of most recently received super segments kept in [`CachedSuperSegments`]
const CACHED_SUPER_SEGMENTS_CAPACITY: usize = 5;
/// How long an unused cached archived segment is kept before the periodic cleanup in
/// [`FarmerRpcWorker::run()`] drops it; also used as the period of that cleanup
const CACHED_ARCHIVED_SEGMENT_TIMEOUT: Duration = Duration::from_mins(1);
54
/// Top-level error type for the RPC handler.
///
/// Converted into a JSON-RPC error object through `From<Error> for ErrorObjectOwned`, where each
/// variant is assigned its own numeric code.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Solution was ignored (no sender was available for the slot or sending to it failed)
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number
        slot: SlotNumber,
    },
    /// Super segment headers length exceeded the limit
    /// ([`MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST`])
    #[error(
        "Super segment headers length exceeded the limit: \
        {actual}/{MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SuperSegmentHeadersLengthExceeded {
        /// Requested number of super segment headers/indices
        actual: usize,
    },
    /// Failed to recreate segment
    #[error("Failed to recreate segment: {0}")]
    FailedToRecreateSegment(#[from] RecreateSegmentError),
    /// Blocking task join error
    #[error("Blocking task join error: {0}")]
    BlockingTaskJoinError(#[from] JoinError),
}
80
81impl From<Error> for ErrorObjectOwned {
82    fn from(error: Error) -> Self {
83        let code = match &error {
84            Error::SolutionWasIgnored { .. } => 0,
85            Error::SuperSegmentHeadersLengthExceeded { .. } => 1,
86            Error::FailedToRecreateSegment(_) => 2,
87            Error::BlockingTaskJoinError(_) => 3,
88        };
89
90        ErrorObject::owned(code, error.to_string(), None::<()>)
91    }
92}
93
/// Provides rpc methods for interacting with the farmer
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get metadata necessary for farmer operation
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution for a slot.
    ///
    /// Fails with [`Error::SolutionWasIgnored`] when the solution can't be delivered.
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Slot info subscription
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Block sealing subscription
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submit a block seal in response to a block sealing notification
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// New super segment header subscription
    #[subscription(
        name = "subscribeNewSuperSegmentHeader" => "new_super_segment_header",
        unsubscribe = "unsubscribeNewSuperSegmentHeader",
        item = SuperSegmentHeader,
    )]
    async fn subscribe_new_super_segment_header(&self) -> SubscriptionResult;

    /// Get super segment headers for the given indices, in the order of the request.
    ///
    /// Fails with [`Error::SuperSegmentHeadersLengthExceeded`] when more than
    /// [`MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST`] indices are requested.
    #[method(name = "superSegmentHeaders")]
    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Get up to `limit` last super segment headers, ordered by increasing super segment index.
    ///
    /// Fails with [`Error::SuperSegmentHeadersLengthExceeded`] when `limit` is larger than
    /// [`MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST`].
    #[method(name = "lastSuperSegmentHeaders")]
    async fn last_super_segment_headers(
        &self,
        limit: u32,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Get the root of the super segment header found for the given segment index, if any
    #[method(name = "superSegmentRootForSegmentIndex")]
    async fn super_segment_root_for_segment_index(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<Option<SuperSegmentRoot>, Error>;

    /// Get a piece by its index, `None` if the piece can't be provided
    #[method(name = "piece")]
    async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Replace shard membership info of the calling connection (identified through request
    /// extensions)
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
158
/// Seal senders collected for the block that is currently being sealed
#[derive(Debug, Default)]
struct BlockSignatureSenders {
    /// Pre-seal hash that `senders` belong to; senders are cleared when a notification with a
    /// different hash arrives
    current_pre_seal_hash: Blake3Hash,
    /// Senders through which a submitted seal is handed back to the notification source
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}
164
/// Bounded in-memory cache of the most recently received super segments
#[derive(Debug)]
struct CachedSuperSegments {
    /// Cached super segments in insertion order, the oldest entry is at the front
    super_segments: VecDeque<SuperSegment>,
}
169
170impl Default for CachedSuperSegments {
171    fn default() -> Self {
172        Self {
173            super_segments: VecDeque::with_capacity(CACHED_SUPER_SEGMENTS_CAPACITY),
174        }
175    }
176}
177
178impl CachedSuperSegments {
179    fn get_for_segment_index(&self, segment_index: SegmentIndex) -> Option<&SuperSegment> {
180        self.super_segments.iter().find(|super_segment| {
181            let max_segment_index = super_segment.header.max_segment_index.as_inner();
182            let first_segment_index = max_segment_index
183                - SegmentIndex::from(u64::from(super_segment.header.num_segments))
184                + SegmentIndex::ONE;
185
186            (first_segment_index..=max_segment_index).contains(&segment_index)
187        })
188    }
189
190    fn add(&mut self, super_segment: SuperSegment) {
191        if self.super_segments.len() == CACHED_SUPER_SEGMENTS_CAPACITY {
192            self.super_segments.pop_front();
193        }
194
195        self.super_segments.push_back(super_segment);
196    }
197}
198
/// Temporary in-memory cache of the last archived segment
#[derive(Debug)]
struct CachedArchivedSegment {
    /// Index of the cached segment
    segment_index: SegmentIndex,
    /// The segment itself, pieces are served from it
    segment: NewArchivedSegment,
    /// When the cached segment was created or last served a piece; the periodic cleanup drops the
    /// entry once [`CACHED_ARCHIVED_SEGMENT_TIMEOUT`] has elapsed since then
    last_used_at: Instant,
}
206
/// Shard membership info last reported by a single connection
#[derive(Debug)]
struct ShardMembershipConnectionsState {
    /// When the info was last updated; entries older than [`SHARD_MEMBERSHIP_EXPIRATION`] are
    /// removed on the next update from any connection
    last_update: Instant,
    /// Shard membership info as reported by the connection
    info: Vec<FarmerShardMembershipInfo>,
}
212
/// Shard membership info of all connections that reported it recently
#[derive(Debug, Default)]
struct ShardMembershipConnections {
    /// Last reported state keyed by the ID of the reporting connection
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}
217
/// Farmer RPC configuration.
///
/// `BCI` is the beacon chain info implementation and `CSS` is the chain sync status
/// implementation; [`FarmerRpcWorker`] requires them to implement [`BeaconChainInfo`] and
/// [`ChainSyncStatus`] respectively.
#[derive(Debug)]
pub struct FarmerRpcConfig<BCI, CSS> {
    /// IP and port (TCP) on which to listen for farmer RPC requests
    pub listen_on: SocketAddr,
    /// Genesis beacon chain block
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in a sector
    pub max_pieces_in_sector: u16,
    /// New slot notifications
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Block sealing notifications
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Super segment notifications
    pub new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
    /// Shard membership updates (combined info of all recently updated connections is sent here)
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info
    pub beacon_chain_info: BCI,
    /// Chain sync status
    pub chain_sync_status: CSS,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
}
246
/// Worker that drives RPC server tasks.
///
/// Created with [`FarmerRpcWorker::new()`] and driven with [`FarmerRpcWorker::run()`]. Most of the
/// state is shared with the RPC handler through `Arc`s.
#[derive(Debug)]
pub struct FarmerRpcWorker<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    /// RPC server, taken out when the worker is run
    server: Option<Server>,
    /// RPC handler, taken out and registered with the server when the worker is run
    rpc: Option<FarmerRpc<BCI, CSS>>,
    /// Source of new slot notifications
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Source of block sealing notifications
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Source of new super segments
    new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
    /// Solution senders of recent slots, keyed by slot number
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Seal senders for the block that is currently being sealed
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    /// Active slot info subscriptions
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Active block sealing subscriptions
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Active new super segment header subscriptions
    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Last archived segment used for serving pieces, periodically cleaned up when unused
    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
    /// Most recently received super segments
    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
}
267
268impl<BCI, CSS> FarmerRpcWorker<BCI, CSS>
269where
270    BCI: BeaconChainInfo,
271    CSS: ChainSyncStatus,
272{
273    /// Creates a new farmer RPC worker
274    pub async fn new(config: FarmerRpcConfig<BCI, CSS>) -> io::Result<Self> {
275        let server = Server::builder()
276            .set_config(ServerConfig::builder().ws_only().build())
277            .build(config.listen_on)
278            .await?;
279
280        let address = server.local_addr()?;
281        info!(%address, "Started farmer RPC server");
282
283        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
284        let block_authoring_delay = usize::try_from(block_authoring_delay)
285            .expect("Block authoring delay will never exceed usize on any platform; qed");
286        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
287            .expect("Always a tiny constant in the protocol; qed");
288
289        let slot_info_subscriptions = Arc::default();
290        let block_sealing_subscriptions = Arc::default();
291
292        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
293            solution_response_senders_capacity,
294        ))));
295        let block_sealing_senders = Arc::default();
296        let new_super_segment_header_subscriptions = Arc::default();
297        let cached_archived_segment = Arc::default();
298        let cached_super_segments = Arc::default();
299
300        let rpc = FarmerRpc {
301            genesis_block: config.genesis_block,
302            solution_response_senders: Arc::clone(&solution_response_senders),
303            block_sealing_senders: Arc::clone(&block_sealing_senders),
304            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
305            beacon_chain_info: config.beacon_chain_info,
306            chain_sync_status: config.chain_sync_status,
307            consensus_constants: config.consensus_constants,
308            max_pieces_in_sector: config.max_pieces_in_sector,
309            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
310            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
311            new_super_segment_header_subscriptions: Arc::clone(
312                &new_super_segment_header_subscriptions,
313            ),
314            cached_archived_segment: Arc::clone(&cached_archived_segment),
315            cached_super_segments: Arc::clone(&cached_super_segments),
316            shard_membership_connections: Arc::default(),
317            shard_membership_updates_sender: config.shard_membership_updates_sender,
318            erasure_coding: config.erasure_coding,
319        };
320
321        Ok(Self {
322            server: Some(server),
323            rpc: Some(rpc),
324            new_slot_notification_receiver: config.new_slot_notification_receiver,
325            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
326            new_super_segment_notification_receiver: config.new_super_segment_notification_receiver,
327            solution_response_senders,
328            block_sealing_senders,
329            slot_info_subscriptions,
330            block_sealing_subscriptions,
331            new_super_segment_header_subscriptions,
332            cached_archived_segment,
333            cached_super_segments,
334        })
335    }
336
    /// Drive RPC server tasks.
    ///
    /// Starts the RPC server, then forwards new slot, block sealing and new super segment
    /// notifications to subscribers and periodically drops the cached archived segment when it
    /// was not used for [`CACHED_ARCHIVED_SEGMENT_TIMEOUT`].
    ///
    /// Returns once any of the notification channels is closed.
    pub async fn run(mut self) {
        let server = self.server.take().expect("Called only once from here; qed");
        let rpc = self.rpc.take().expect("Called only once from here; qed");
        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();

        // Periodically check whether the cached archived segment has expired
        let mut archived_segment_cache_cleanup_interval =
            tokio::time::interval(CACHED_ARCHIVED_SEGMENT_TIMEOUT);
        archived_segment_cache_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            select! {
                // NOTE(review): nothing happens when the server stops; the fused future is not
                // polled again afterwards and the loop keeps running until one of the
                // notification channels is closed - confirm this is intended
                _ = server_fut => {}
                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
                    let Some(new_slot_notification) = maybe_new_slot_notification else {
                        break;
                    };

                    self.handle_new_slot_notification(new_slot_notification).await;
                }
                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
                        break;
                    };

                    self.handle_block_sealing_notification(block_sealing_notification).await;
                }
                maybe_new_super_segment = self.new_super_segment_notification_receiver.next() => {
                    let Some(new_super_segment) = maybe_new_super_segment else {
                        break;
                    };

                    self.handle_new_super_segment(new_super_segment).await;
                }
                _ = archived_segment_cache_cleanup_interval.tick().fuse() => {
                    // `try_lock()` is used so that cleanup is simply skipped when the cache is
                    // locked at the moment (the `piece` RPC method locks it while running)
                    if let Some(mut maybe_cached_archived_segment) = self.cached_archived_segment.try_lock()
                        && let Some(cached_archived_segment) = maybe_cached_archived_segment.as_ref()
                        && cached_archived_segment.last_used_at.elapsed() >= CACHED_ARCHIVED_SEGMENT_TIMEOUT
                    {
                        maybe_cached_archived_segment.take();
                    }
                }
            }
        }
    }
383
384    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
385        let NewSlotNotification {
386            new_slot_info,
387            solution_sender,
388        } = new_slot_notification;
389
390        let NewSlotInfo {
391            slot,
392            proof_of_time,
393            solution_range,
394            shard_membership_entropy,
395            num_shards,
396        } = new_slot_info;
397
398        // Store solution sender so that we can retrieve it when solution comes from
399        // the farmer
400        let mut solution_response_senders = self.solution_response_senders.lock();
401        if solution_response_senders.peek(&slot).is_none() {
402            solution_response_senders.insert(slot, solution_sender);
403        }
404
405        let global_challenge = proof_of_time.derive_global_challenge(slot);
406
407        // This will be sent to the farmer
408        let slot_info = SlotInfo {
409            slot,
410            global_challenge,
411            solution_range: solution_range.to_leaf_shard(num_shards),
412            shard_membership_entropy,
413            num_shards,
414        };
415        let slot_info = serde_json::value::to_raw_value(&slot_info)
416            .expect("Serialization of slot info never fails; qed");
417
418        self.slot_info_subscriptions.lock().retain_mut(|sink| {
419            match sink.try_send(slot_info.clone()) {
420                Ok(()) => true,
421                Err(error) => match error {
422                    TrySendError::Closed(_) => {
423                        // Remove closed receivers
424                        false
425                    }
426                    TrySendError::Full(_) => {
427                        warn!(
428                            subscription_id = ?sink.subscription_id(),
429                            "Slot info receiver is too slow, dropping notification"
430                        );
431                        true
432                    }
433                },
434            }
435        });
436    }
437
438    async fn handle_block_sealing_notification(
439        &mut self,
440        block_sealing_notification: BlockSealNotification,
441    ) {
442        let BlockSealNotification {
443            pre_seal_hash,
444            public_key_hash,
445            seal_sender,
446        } = block_sealing_notification;
447
448        // Store signature sender so that we can retrieve it when a solution comes from the farmer
449        {
450            let mut block_sealing_senders = self.block_sealing_senders.lock();
451
452            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
453                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
454                block_sealing_senders.senders.clear();
455            }
456
457            block_sealing_senders.senders.push(seal_sender);
458        }
459
460        // This will be sent to the farmer
461        let block_seal_info = BlockSealInfo {
462            pre_seal_hash,
463            public_key_hash,
464        };
465        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
466            .expect("Serialization of block seal info never fails; qed");
467
468        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
469            match sink.try_send(block_seal_info.clone()) {
470                Ok(()) => true,
471                Err(error) => match error {
472                    TrySendError::Closed(_) => {
473                        // Remove closed receivers
474                        false
475                    }
476                    TrySendError::Full(_) => {
477                        warn!(
478                            subscription_id = ?sink.subscription_id(),
479                            "Block seal info receiver is too slow, dropping notification"
480                        );
481                        true
482                    }
483                },
484            }
485        });
486    }
487
488    async fn handle_new_super_segment(&mut self, super_segment: SuperSegment) {
489        // This will be sent to the farmer
490        let super_segment_header = serde_json::value::to_raw_value(&super_segment.header)
491            .expect("Serialization of super segment info never fails; qed");
492
493        self.cached_super_segments.lock().add(super_segment);
494
495        self.new_super_segment_header_subscriptions
496            .lock()
497            .retain_mut(|sink| {
498                let subscription_id = sink.subscription_id();
499
500                match sink.try_send(super_segment_header.clone()) {
501                    Ok(()) => true,
502                    Err(error) => match error {
503                        TrySendError::Closed(_) => false,
504                        TrySendError::Full(_) => {
505                            warn!(
506                                ?subscription_id,
507                                "Super segment receiver is too slow, dropping notification"
508                            );
509                            true
510                        }
511                    },
512                }
513            });
514    }
515}
516
/// Implements the [`FarmerRpcApiServer`] trait for a farmer to connect to
#[derive(Debug)]
struct FarmerRpc<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    /// Genesis beacon chain block, used for app info and for recreating the genesis segment
    genesis_block: OwnedBeaconChainBlock,
    /// Solution senders of recent slots, keyed by slot number
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    /// Seal senders for the block that is currently being sealed
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    /// DSN bootstrap nodes reported to farmers in app info
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info
    beacon_chain_info: BCI,
    /// Chain sync status
    chain_sync_status: CSS,
    /// Consensus constants
    consensus_constants: ConsensusConstants,
    /// Max pieces in a sector
    max_pieces_in_sector: u16,
    /// Active slot info subscriptions
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Active block sealing subscriptions
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Active new super segment header subscriptions
    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    /// Last archived segment used for serving pieces
    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
    /// Most recently received super segments
    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
    /// Shard membership info last reported by each connection
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    /// Where combined shard membership info of all connections is sent after each update
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// Erasure coding instance used for recreating segments
    erasure_coding: ErasureCoding,
}
541
542#[async_trait]
543impl<BCI, CSS> FarmerRpcApiServer for FarmerRpc<BCI, CSS>
544where
545    BCI: BeaconChainInfo,
546    CSS: ChainSyncStatus,
547{
548    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
549        let max_segment_index = self
550            .beacon_chain_info
551            .last_super_segment_header()
552            .map(|super_segment_header| super_segment_header.max_segment_index.as_inner())
553            .unwrap_or(SegmentIndex::ZERO);
554
555        let consensus_constants = &self.consensus_constants;
556        let protocol_info = FarmerProtocolInfo {
557            history_size: HistorySize::from(max_segment_index),
558            max_pieces_in_sector: self.max_pieces_in_sector,
559            recent_segments: consensus_constants.recent_segments,
560            recent_history_fraction: consensus_constants.recent_history_fraction,
561            min_sector_lifetime: consensus_constants.min_sector_lifetime,
562        };
563
564        let farmer_app_info = FarmerAppInfo {
565            genesis_root: *self.genesis_block.header.header().root(),
566            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
567            syncing: self.chain_sync_status.is_syncing(),
568            farming_timeout: consensus_constants
569                .slot_duration
570                .as_duration()
571                .mul_f64(u64::from(consensus_constants.block_authoring_delay) as f64),
572            protocol_info,
573        };
574
575        Ok(farmer_app_info)
576    }
577
578    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
579        let slot = solution_response.slot_number;
580        let public_key_hash = solution_response.solution.public_key_hash;
581        let sector_index = solution_response.solution.sector_index;
582        let mut solution_response_senders = self.solution_response_senders.lock();
583
584        let success = solution_response_senders
585            .peek_mut(&slot)
586            .and_then(|sender| sender.try_send(solution_response.solution).ok())
587            .is_some();
588
589        if !success {
590            warn!(
591                %slot,
592                %sector_index,
593                %public_key_hash,
594                "Solution was ignored, likely because farmer was too slow"
595            );
596
597            return Err(Error::SolutionWasIgnored { slot });
598        }
599
600        Ok(())
601    }
602
603    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
604        let subscription = pending.accept().await?;
605        self.slot_info_subscriptions.lock().push(subscription);
606
607        Ok(())
608    }
609
610    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
611        let subscription = pending.accept().await?;
612        self.block_sealing_subscriptions.lock().push(subscription);
613
614        Ok(())
615    }
616
617    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
618        let block_sealing_senders = self.block_sealing_senders.clone();
619
620        let mut block_sealing_senders = block_sealing_senders.lock();
621
622        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
623            && let Some(sender) = block_sealing_senders.senders.pop()
624        {
625            let _ = sender.send(block_seal.seal);
626        }
627
628        Ok(())
629    }
630
631    async fn subscribe_new_super_segment_header(
632        &self,
633        pending: PendingSubscriptionSink,
634    ) -> SubscriptionResult {
635        let subscription = pending.accept().await?;
636        self.new_super_segment_header_subscriptions
637            .lock()
638            .push(subscription);
639
640        Ok(())
641    }
642
643    async fn super_segment_headers(
644        &self,
645        super_segment_indices: Vec<SuperSegmentIndex>,
646    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
647        if super_segment_indices.len() > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
648            error!(
649                "`super_segment_indices` length exceed the limit: {} ",
650                super_segment_indices.len()
651            );
652
653            return Err(Error::SuperSegmentHeadersLengthExceeded {
654                actual: super_segment_indices.len(),
655            });
656        };
657
658        Ok(super_segment_indices
659            .into_iter()
660            .map(|super_segment_index| {
661                self.beacon_chain_info
662                    .get_super_segment_header(super_segment_index)
663            })
664            .collect())
665    }
666
667    async fn last_super_segment_headers(
668        &self,
669        limit: u32,
670    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
671        if limit as usize > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
672            error!(
673                "Request limit ({}) exceed the server limit: {} ",
674                limit, MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST
675            );
676
677            return Err(Error::SuperSegmentHeadersLengthExceeded {
678                actual: limit as usize,
679            });
680        };
681
682        let last_super_segment_index = self
683            .beacon_chain_info
684            .last_super_segment_header()
685            .map(|super_segment_header| super_segment_header.index.as_inner())
686            .unwrap_or(SuperSegmentIndex::ZERO);
687
688        let mut last_super_segment_headers = (SuperSegmentIndex::ZERO..=last_super_segment_index)
689            .rev()
690            .take(limit as usize)
691            .map(|super_segment_index| {
692                self.beacon_chain_info
693                    .get_super_segment_header(super_segment_index)
694            })
695            .collect::<Vec<_>>();
696
697        last_super_segment_headers.reverse();
698
699        Ok(last_super_segment_headers)
700    }
701
702    async fn super_segment_root_for_segment_index(
703        &self,
704        segment_index: SegmentIndex,
705    ) -> Result<Option<SuperSegmentRoot>, Error> {
706        Ok(self
707            .beacon_chain_info
708            .get_super_segment_header_for_segment_index(segment_index)
709            .map(|super_segment_header| super_segment_header.root))
710    }
711
    // Returns the requested piece, `None` if it can't be provided.
    //
    // Pieces are served from the in-memory cache of the last archived segment. On cache miss the
    // segment is recreated (genesis segment from the genesis block, other segments using details
    // from one of the cached super segments) and replaces the cached one. The cache lock is held
    // for the whole call, including segment recreation.
    //
    // `None` is also returned when no cached super segment covers the segment or when the segment
    // can't be recreated.
    async fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
        let segment_index = requested_piece_index.segment_index();
        let cached_archived_segment = &mut *self.cached_archived_segment.lock().await;

        // Fast path: the piece belongs to the cached segment
        if let Some(cached_archived_segment) = cached_archived_segment
            && cached_archived_segment.segment_index == segment_index
        {
            // Keeps the cache entry alive, see periodic cleanup in the worker
            cached_archived_segment.last_used_at = Instant::now();

            return Ok(cached_archived_segment
                .segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        // Genesis segment is recreated from the genesis block, on a blocking thread
        if segment_index == SegmentIndex::ZERO {
            let segment = spawn_blocking({
                let genesis_block = self.genesis_block.clone();
                let erasure_coding = self.erasure_coding.clone();

                move || recreate_genesis_segment(&genesis_block, erasure_coding)
            })
            .await?;
            let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
                segment_index: SegmentIndex::ZERO,
                segment,
                last_used_at: Instant::now(),
            });

            return Ok(cached_archived_segment
                .segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        // Collect everything needed from the cached super segment in a nested scope, so that the
        // lock guard is released before `.await` below
        let (super_segment_index, shard_segment_root_with_position, segment_proof) = {
            let cached_super_segments = self.cached_super_segments.lock();
            let Some(super_segment) = cached_super_segments.get_for_segment_index(segment_index)
            else {
                return Ok(None);
            };

            // Segment roots are addressed from the back: the last one corresponds to
            // `max_segment_index`
            let Some(shard_segment_root_with_position) = super_segment
                .segment_roots
                .iter()
                .nth_back(u64::from(
                    super_segment.header.max_segment_index.as_inner() - segment_index,
                ) as usize)
                .copied()
            else {
                error!(
                    %requested_piece_index,
                    %segment_index,
                    super_segment_header = ?super_segment.header,
                    "Failed to find segment index inside super segment, this should never happen"
                );
                return Ok(None);
            };

            let segment_position = shard_segment_root_with_position.segment_position;

            let Some(segment_proof) = super_segment.proof_for_segment(segment_position) else {
                error!(
                    %requested_piece_index,
                    %segment_index,
                    %segment_position,
                    super_segment_header = ?super_segment.header,
                    "Failed to get segment proof for segment position, this should never happen"
                );

                return Ok(None);
            };

            (
                super_segment.header.index.as_inner(),
                shard_segment_root_with_position,
                segment_proof,
            )
        };

        let recreate_segment_super_segment_details = RecreateSegmentSuperSegmentDetails {
            super_segment_index,
            segment_position: shard_segment_root_with_position.segment_position,
            segment_proof,
        };

        // NOTE(review): this panics inside of the RPC handler when the segment belongs to a shard
        // other than the beacon chain
        if shard_segment_root_with_position.shard_index != ShardIndex::BEACON_CHAIN {
            // TODO: There will be a need for chain info instances of all live shards to re-derive
            //  segments here, but there is just a beacon chain here for now
            unimplemented!("Shard segments for non-beacon chain shards are not supported yet");
        }

        // Header of the segment that precedes the requested one in the shard, `None` for the
        // first local segment or when chain info doesn't return it
        let last_archived_segment = shard_segment_root_with_position
            .local_segment_index
            .checked_sub(LocalSegmentIndex::ONE)
            .and_then(|last_segment_index| {
                self.beacon_chain_info
                    .get_segment_header(last_segment_index)
            });

        let maybe_segment = recreate_segment(
            last_archived_segment,
            &self.beacon_chain_info,
            self.erasure_coding.clone(),
            &recreate_segment_super_segment_details,
            |_| Vec::new(),
        )
        .await?;

        let Some(segment) = maybe_segment else {
            return Ok(None);
        };

        // Replace the cached segment with the recreated one and serve the piece from it
        let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
            segment_index,
            segment,
            last_used_at: Instant::now(),
        });

        Ok(cached_archived_segment
            .segment
            .pieces
            .pieces()
            .nth(usize::from(requested_piece_index.position())))
    }
841
842    async fn update_shard_membership_info(
843        &self,
844        extensions: &Extensions,
845        info: Vec<FarmerShardMembershipInfo>,
846    ) -> Result<(), Error> {
847        let connection_id = extensions
848            .get::<ConnectionId>()
849            .expect("`ConnectionId` is always present; qed");
850
851        let shard_membership = {
852            let mut shard_membership_connections = self.shard_membership_connections.lock();
853
854            // TODO: This is a workaround for https://github.com/paritytech/jsonrpsee/issues/1617
855            //  and should be replaced with cleanup on disconnection once that issue is resolved
856            shard_membership_connections
857                .connections
858                .retain(|_connection_id, state| {
859                    state.last_update.elapsed() < SHARD_MEMBERSHIP_EXPIRATION
860                });
861
862            shard_membership_connections.connections.insert(
863                *connection_id,
864                ShardMembershipConnectionsState {
865                    last_update: Instant::now(),
866                    info,
867                },
868            );
869
870            shard_membership_connections
871                .connections
872                .values()
873                .flat_map(|state| state.info.clone())
874                .collect::<Vec<_>>()
875        };
876
877        if let Err(error) = self
878            .shard_membership_updates_sender
879            .clone()
880            .send(shard_membership)
881            .await
882        {
883            warn!(%error, "Failed to send shard membership update");
884        }
885
886        Ok(())
887    }
888}