ab_node_rpc_server/
lib.rs

//! RPC API for the farmer

#![feature(try_blocks)]

use ab_archiving::archiver::NewArchivedSegment;
use ab_client_api::{ChainInfo, ChainSyncStatus};
use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
use ab_client_block_authoring::slot_worker::{
    BlockSealNotification, NewSlotInfo, NewSlotNotification,
};
use ab_client_consensus_common::ConsensusConstants;
use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex};
use ab_core_primitives::pot::SlotNumber;
use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
use ab_core_primitives::solutions::Solution;
use ab_erasure_coding::ErasureCoding;
use ab_farmer_components::FarmerProtocolInfo;
use ab_farmer_rpc_primitives::{
    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
    MAX_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
};
use ab_networking::libp2p::Multiaddr;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, StreamExt, select};
use jsonrpsee::core::{SubscriptionResult, async_trait};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::{Server, ServerConfig};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
use jsonrpsee::{
    ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Instant;
use tracing::{debug, error, info, warn};

/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Solution was ignored
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number
        slot: SlotNumber,
    },
    /// Segment headers length exceeded the limit
    #[error(
        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SegmentHeadersLengthExceeded {
        /// Requested number of segment headers/indices
        actual: usize,
    },
}

impl From<Error> for ErrorObjectOwned {
    fn from(error: Error) -> Self {
        let code = match &error {
            Error::SolutionWasIgnored { .. } => 0,
            Error::SegmentHeadersLengthExceeded { .. } => 1,
        };

        ErrorObject::owned(code, error.to_string(), None::<()>)
    }
}

/// Provides RPC methods for interacting with the farmer
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get metadata necessary for farmer operation
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

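    /// Submit a solution response for a previously announced slot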
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Slot info subscription
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Block sealing subscription
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

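    /// Submit a block seal for a previously announced pre-seal hash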
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Archived segment header subscription
    #[subscription(
        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
        unsubscribe = "unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
    )]
    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;

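    /// Get segment headers for the given segment indices (`None` for unknown segments)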
    #[method(name = "segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

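    /// Get a piece from the cached last archived segment (the genesis segment is re-created on
    /// demand)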
    #[method(name = "piece", blocking)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

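    /// Acknowledge receipt of a previously sent archived segment header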
    #[method(name = "acknowledgeArchivedSegmentHeader")]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

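    /// Get the last `limit` segment headers in ascending segment index order, at most
    /// [`MAX_SEGMENT_HEADERS_PER_REQUEST`]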
    #[method(name = "lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

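    /// Update shard membership information for this farmer connection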
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
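
// The farmer is expected to drive this API roughly as follows (a sketch, using the method names
// registered above):
// 1. Call `getFarmerAppInfo` once at startup
// 2. Subscribe via `subscribeSlotInfo` and answer notifications with `submitSolutionResponse`
// 3. Subscribe via `subscribeBlockSealing` and answer notifications with `submitBlockSeal`
// 4. Subscribe via `subscribeArchivedSegmentHeader` and acknowledge each header with
//    `acknowledgeArchivedSegmentHeader`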

#[derive(Debug, Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    segment_index: SegmentIndex,
    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
}

#[derive(Debug, Default)]
struct BlockSignatureSenders {
    current_pre_seal_hash: Blake3Hash,
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}

/// In-memory cache of the last archived segment, so that when a request comes in right after an
/// archived segment notification, the RPC server is able to answer quickly.
///
/// A weak reference is stored, so that the archived segment does not occupy RAM for longer than
/// necessary.
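///
/// For example, `CachedArchivedSegment::Weak(Arc::downgrade(&archived_segment))` resolves via
/// [`CachedArchivedSegment::get`] only while another `Arc` to the segment is still alive.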
#[derive(Debug)]
enum CachedArchivedSegment {
    /// Special case for the genesis segment when requested over RPC
    Genesis(Arc<NewArchivedSegment>),
    Weak(Weak<NewArchivedSegment>),
}

impl CachedArchivedSegment {
    fn get(&self) -> Option<Arc<NewArchivedSegment>> {
        match self {
            CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
            CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
        }
    }
}

#[derive(Debug)]
struct ShardMembershipConnectionsState {
    last_update: Instant,
    info: Vec<FarmerShardMembershipInfo>,
}

#[derive(Debug, Default)]
struct ShardMembershipConnections {
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}

/// Farmer RPC configuration
#[derive(Debug)]
pub struct FarmerRpcConfig<CI, CSS> {
    /// IP and port (TCP) on which to listen for farmer RPC requests
    pub listen_on: SocketAddr,
    /// Genesis beacon chain block
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in sector
    pub max_pieces_in_sector: u16,
    /// New slot notifications
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Block sealing notifications
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Archived segment notifications
    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Shard membership updates
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info
    pub chain_info: CI,
    /// Chain sync status
    pub chain_sync_status: CSS,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
}

/// Worker that drives RPC server tasks
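///
/// A minimal usage sketch (assuming a fully populated [`FarmerRpcConfig`] and a `tokio`
/// runtime):
///
/// ```ignore
/// let worker = FarmerRpcWorker::new(config).await?;
/// // Runs the RPC server and forwards notifications until shutdown
/// tokio::spawn(worker.run());
/// ```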
#[derive(Debug)]
pub struct FarmerRpcWorker<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    server: Option<Server>,
    rpc: Option<FarmerRpc<CI, CSS>>,
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
}

impl<CI, CSS> FarmerRpcWorker<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    /// Creates a new farmer RPC worker
    pub async fn new(config: FarmerRpcConfig<CI, CSS>) -> io::Result<Self> {
        let server = Server::builder()
            .set_config(ServerConfig::builder().ws_only().build())
            .build(config.listen_on)
            .await?;

        let address = server.local_addr()?;
        info!(%address, "Started farmer RPC server");

        // Keep solution senders around for the last `block_authoring_delay` slots, since
        // solutions may still arrive for them
        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
            .expect("Always a tiny constant in the protocol; qed");

        let slot_info_subscriptions = Arc::default();
        let block_sealing_subscriptions = Arc::default();

        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
            solution_response_senders_capacity,
        ))));
        let block_sealing_senders = Arc::default();
        let cached_archived_segment = Arc::default();
        // Must be shared between the RPC instance and this worker, otherwise acknowledgements
        // from farmers would never reach the worker's senders
        let archived_segment_acknowledgement_senders = Arc::default();
        let archived_segment_header_subscriptions = Arc::default();

        let rpc = FarmerRpc {
            genesis_block: config.genesis_block,
            solution_response_senders: Arc::clone(&solution_response_senders),
            block_sealing_senders: Arc::clone(&block_sealing_senders),
            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
            chain_info: config.chain_info,
            cached_archived_segment: Arc::clone(&cached_archived_segment),
            archived_segment_acknowledgement_senders: Arc::clone(
                &archived_segment_acknowledgement_senders,
            ),
            chain_sync_status: config.chain_sync_status,
            consensus_constants: config.consensus_constants,
            max_pieces_in_sector: config.max_pieces_in_sector,
            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
            archived_segment_header_subscriptions: Arc::clone(
                &archived_segment_header_subscriptions,
            ),
            shard_membership_connections: Arc::default(),
            shard_membership_updates_sender: config.shard_membership_updates_sender,
            erasure_coding: config.erasure_coding,
        };

        Ok(Self {
            server: Some(server),
            rpc: Some(rpc),
            new_slot_notification_receiver: config.new_slot_notification_receiver,
            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
            archived_segment_notification_receiver: config.archived_segment_notification_receiver,
            solution_response_senders,
            block_sealing_senders,
            cached_archived_segment,
            archived_segment_acknowledgement_senders,
            slot_info_subscriptions,
            block_sealing_subscriptions,
            archived_segment_header_subscriptions,
        })
    }

    /// Drive RPC server tasks
    pub async fn run(mut self) {
        let server = self.server.take().expect("Called only once from here; qed");
        let rpc = self.rpc.take().expect("Called only once from here; qed");
        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();

        loop {
            select! {
                _ = server_fut => {
                    // The RPC server has stopped, nothing left to serve
                    break;
                }
                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
                    let Some(new_slot_notification) = maybe_new_slot_notification else {
                        break;
                    };

                    self.handle_new_slot_notification(new_slot_notification).await;
                }
                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
                        break;
                    };

                    self.handle_block_sealing_notification(block_sealing_notification).await;
                }
                maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
                    let Some(archived_segment_notification) = maybe_archived_segment_notification else {
                        break;
                    };

                    self.handle_archived_segment_notification(archived_segment_notification).await;
                }
            }
        }
    }

    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
        let NewSlotNotification {
            new_slot_info,
            solution_sender,
        } = new_slot_notification;

        let NewSlotInfo {
            slot,
            proof_of_time,
            solution_range,
            shard_membership_entropy,
            num_shards,
        } = new_slot_info;

        // Store the solution sender so that we can retrieve it when a solution comes from
        // the farmer
        let mut solution_response_senders = self.solution_response_senders.lock();
        if solution_response_senders.peek(&slot).is_none() {
            solution_response_senders.insert(slot, solution_sender);
        }

        let global_challenge = proof_of_time.derive_global_challenge(slot);

        // This will be sent to the farmer
        let slot_info = SlotInfo {
            slot,
            global_challenge,
            solution_range: solution_range.to_leaf_shard(num_shards),
            shard_membership_entropy,
            num_shards,
        };
        let slot_info = serde_json::value::to_raw_value(&slot_info)
            .expect("Serialization of slot info never fails; qed");

        self.slot_info_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(slot_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        // Remove closed receivers
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Slot info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    async fn handle_block_sealing_notification(
        &mut self,
        block_sealing_notification: BlockSealNotification,
    ) {
        let BlockSealNotification {
            pre_seal_hash,
            public_key_hash,
            seal_sender,
        } = block_sealing_notification;

        // Store the seal sender so that we can retrieve it when a seal comes from the farmer
        {
            let mut block_sealing_senders = self.block_sealing_senders.lock();

            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
                block_sealing_senders.senders.clear();
            }

            block_sealing_senders.senders.push(seal_sender);
        }

        // This will be sent to the farmer
        let block_seal_info = BlockSealInfo {
            pre_seal_hash,
            public_key_hash,
        };
        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
            .expect("Serialization of block seal info never fails; qed");

        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(block_seal_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        // Remove closed receivers
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Block seal info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    async fn handle_archived_segment_notification(
        &mut self,
        archived_segment_notification: ArchivedSegmentNotification,
    ) {
        let ArchivedSegmentNotification {
            archived_segment,
            acknowledgement_sender,
        } = archived_segment_notification;

        let segment_index = archived_segment.segment_header.segment_index();

        // TODO: Combine `archived_segment_header_subscriptions` and
        //  `archived_segment_acknowledgement_senders` under the same lock to avoid a potential
        //  accidental deadlock with future changes
        self.archived_segment_header_subscriptions
            .lock()
            .retain_mut(|sink| {
                let subscription_id = sink.subscription_id();

                // Store the acknowledgement sender so that we can retrieve it when an
                // acknowledgement comes from the farmer
                let mut archived_segment_acknowledgement_senders =
                    self.archived_segment_acknowledgement_senders.lock();

                if archived_segment_acknowledgement_senders.segment_index != segment_index {
                    archived_segment_acknowledgement_senders.segment_index = segment_index;
                    archived_segment_acknowledgement_senders.senders.clear();
                }

                let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
                    .senders
                    .entry(subscription_id.clone())
                {
                    Entry::Occupied(_) => {
                        // Nothing to do, this farmer is still processing a previous request
                        None
                    }
                    Entry::Vacant(entry) => {
                        entry.insert(acknowledgement_sender.clone());

                        // This will be sent to the farmer
                        Some(archived_segment.segment_header)
                    }
                };

                self.cached_archived_segment
                    .lock()
                    .replace(CachedArchivedSegment::Weak(Arc::downgrade(
                        &archived_segment,
                    )));

                // This will be sent to the farmer
                let maybe_archived_segment_header =
                    serde_json::value::to_raw_value(&maybe_archived_segment_header)
                        .expect("Serialization of archived segment info never fails; qed");

                match sink.try_send(maybe_archived_segment_header) {
                    Ok(()) => true,
                    Err(error) => match error {
                        TrySendError::Closed(_) => {
                            // Remove closed receivers
                            archived_segment_acknowledgement_senders
                                .senders
                                .remove(&subscription_id);
                            false
                        }
                        TrySendError::Full(_) => {
                            warn!(
                                ?subscription_id,
507                                "Block seal info receiver is too slow, dropping notification"
                            );
                            true
                        }
                    },
                }
            });
    }
}

/// Implements the [`FarmerRpcApiServer`] trait for the farmer to connect to
#[derive(Debug)]
struct FarmerRpc<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    genesis_block: OwnedBeaconChainBlock,
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    chain_info: CI,
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    chain_sync_status: CSS,
    consensus_constants: ConsensusConstants,
    max_pieces_in_sector: u16,
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    erasure_coding: ErasureCoding,
}

#[async_trait]
impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
        let last_segment_index = self
            .chain_info
            .max_segment_index()
            .unwrap_or(SegmentIndex::ZERO);

        let consensus_constants = &self.consensus_constants;
        let protocol_info = FarmerProtocolInfo {
            history_size: HistorySize::from(last_segment_index),
            max_pieces_in_sector: self.max_pieces_in_sector,
            recent_segments: consensus_constants.recent_segments,
            recent_history_fraction: consensus_constants.recent_history_fraction,
            min_sector_lifetime: consensus_constants.min_sector_lifetime,
        };

        let farmer_app_info = FarmerAppInfo {
            genesis_root: *self.genesis_block.header.header().root(),
            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
            syncing: self.chain_sync_status.is_syncing(),
            farming_timeout: consensus_constants
                .slot_duration
                .as_duration()
                .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
            protocol_info,
        };

        Ok(farmer_app_info)
    }

    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
        let slot = solution_response.slot_number;
        let public_key_hash = solution_response.solution.public_key_hash;
        let sector_index = solution_response.solution.sector_index;
        let mut solution_response_senders = self.solution_response_senders.lock();

        let success = solution_response_senders
            .peek_mut(&slot)
            .and_then(|sender| sender.try_send(solution_response.solution).ok())
            .is_some();

        if !success {
            warn!(
                %slot,
                %sector_index,
                %public_key_hash,
                "Solution was ignored, likely because farmer was too slow"
            );

            return Err(Error::SolutionWasIgnored { slot });
        }

        Ok(())
    }

    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.slot_info_subscriptions.lock().push(subscription);

        Ok(())
    }

    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.block_sealing_subscriptions.lock().push(subscription);

        Ok(())
    }

    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
        let mut block_sealing_senders = self.block_sealing_senders.lock();

        // Only deliver the seal if it is for the block currently being sealed, each seal
        // consumes one pending sender
        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
            && let Some(sender) = block_sealing_senders.senders.pop()
        {
            let _ = sender.send(block_seal.seal);
        }

        Ok(())
    }

    async fn subscribe_archived_segment_header(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.archived_segment_header_subscriptions
            .lock()
            .push(subscription);

        Ok(())
    }

    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error> {
        let maybe_sender = {
            let mut archived_segment_acknowledgement_senders_guard =
                self.archived_segment_acknowledgement_senders.lock();

            (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
                .then(|| {
                    // Take any pending sender, `HashMap` iteration order is arbitrary
                    let key = archived_segment_acknowledgement_senders_guard
                        .senders
                        .keys()
                        .next()
                        .cloned()?;

                    archived_segment_acknowledgement_senders_guard
                        .senders
                        .remove(&key)
                })
                .flatten()
        };

        if let Some(mut sender) = maybe_sender
            && let Err(error) = sender.try_send(())
            && !error.is_disconnected()
        {
            warn!(%error, "Failed to acknowledge archived segment");
        }

        debug!(%segment_index, "Acknowledged archived segment");

        Ok(())
    }

    // Note: this RPC uses the cached archived segment, which is only updated by archived segment
    // subscriptions (the genesis segment is re-created on demand if the cache is empty)
    fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
        let archived_segment = {
            let mut cached_archived_segment = self.cached_archived_segment.lock();

            match cached_archived_segment
                .as_ref()
                .and_then(CachedArchivedSegment::get)
            {
                Some(archived_segment) => archived_segment,
                None => {
                    if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
                        return Ok(None);
                    }

                    debug!(%requested_piece_index, "Re-creating the genesis segment on demand");

                    // Re-create the genesis segment on demand
                    let archived_segment = Arc::new(recreate_genesis_segment(
                        &self.genesis_block,
                        self.erasure_coding.clone(),
                    ));

                    cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
                        &archived_segment,
                    )));

                    archived_segment
                }
            }
        };

        if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
        {
            return Ok(archived_segment
                .pieces
                .pieces()
                .nth(requested_piece_index.position() as usize));
        }

        Ok(None)
    }

    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error> {
        if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "`segment_indices` length exceeds the limit: {}/{}",
                segment_indices.len(),
                MAX_SEGMENT_HEADERS_PER_REQUEST
            );

            return Err(Error::SegmentHeadersLengthExceeded {
                actual: segment_indices.len(),
            });
        }

        Ok(segment_indices
            .into_iter()
            .map(|segment_index| self.chain_info.get_segment_header(segment_index))
            .collect())
    }

    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
        if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "Request limit ({}) exceeds the server limit: {}",
                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
            );

            return Err(Error::SegmentHeadersLengthExceeded {
                actual: limit as usize,
            });
        }

        let last_segment_index = self
            .chain_info
            .max_segment_index()
            .unwrap_or(SegmentIndex::ZERO);

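        // Collect from newest to oldest, reversed below so the response is in ascending segment
        // index order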
        let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
            .rev()
            .take(limit as usize)
            .map(|segment_index| self.chain_info.get_segment_header(segment_index))
            .collect::<Vec<_>>();

        last_segment_headers.reverse();

        Ok(last_segment_headers)
    }

    async fn update_shard_membership_info(
        &self,
        extensions: &Extensions,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error> {
        let connection_id = extensions
            .get::<ConnectionId>()
            .expect("`ConnectionId` is always present; qed");

        let shard_membership = {
            let mut shard_membership_connections = self.shard_membership_connections.lock();

            // TODO: This is a workaround for https://github.com/paritytech/jsonrpsee/issues/1617
            //  and should be replaced with cleanup on disconnection once that issue is resolved
            shard_membership_connections
                .connections
                .retain(|_connection_id, state| {
                    // Keep only connections that refreshed their info recently
                    state.last_update.elapsed() < SHARD_MEMBERSHIP_EXPIRATION
                });

            shard_membership_connections.connections.insert(
                *connection_id,
                ShardMembershipConnectionsState {
                    last_update: Instant::now(),
                    info,
                },
            );

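            // Aggregate shard membership info across all connections that reported recently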
            shard_membership_connections
                .connections
                .values()
                .flat_map(|state| state.info.clone())
                .collect::<Vec<_>>()
        };

        if let Err(error) = self
            .shard_membership_updates_sender
            .clone()
            .send(shard_membership)
            .await
        {
            warn!(%error, "Failed to send shard membership update");
        }

        Ok(())
    }
}