
ab_node_rpc_server/
lib.rs

//! RPC API for the farmer

#![feature(try_blocks)]

use ab_archiving::archiver::NewArchivedSegment;
use ab_client_api::{ChainInfo, ChainSyncStatus};
use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
use ab_client_block_authoring::slot_worker::{
    BlockSealNotification, NewSlotInfo, NewSlotNotification,
};
use ab_client_consensus_common::ConsensusConstants;
use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
use ab_core_primitives::hashes::Blake3Hash;
use ab_core_primitives::pieces::{Piece, PieceIndex};
use ab_core_primitives::pot::SlotNumber;
use ab_core_primitives::segments::{HistorySize, LocalSegmentIndex, SegmentHeader, SegmentIndex};
use ab_core_primitives::solutions::Solution;
use ab_erasure_coding::ErasureCoding;
use ab_farmer_components::FarmerProtocolInfo;
use ab_farmer_rpc_primitives::{
    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
    MAX_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
};
use ab_networking::libp2p::Multiaddr;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, SinkExt, StreamExt, select};
use jsonrpsee::core::{SubscriptionResult, async_trait};
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::{Server, ServerConfig};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
use jsonrpsee::{
    ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Instant;
use tracing::{debug, error, info, warn};

/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Solution was ignored
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number
        slot: SlotNumber,
    },
    /// Segment headers length exceeded the limit
    #[error(
        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SegmentHeadersLengthExceeded {
        /// Requested number of segment headers/indices
        actual: usize,
    },
}

impl From<Error> for ErrorObjectOwned {
    fn from(error: Error) -> Self {
        let code = match &error {
            Error::SolutionWasIgnored { .. } => 0,
            Error::SegmentHeadersLengthExceeded { .. } => 1,
        };

        ErrorObject::owned(code, error.to_string(), None::<()>)
    }
}
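
// For reference, the conversion above surfaces to RPC clients as a standard JSON-RPC error
// object: `SolutionWasIgnored { slot: 42 }` would arrive roughly as
// `{"code": 0, "message": "Solution was ignored for slot 42"}` (the exact envelope depends on
// jsonrpsee's error serialization; `data` is omitted because it is `None` here).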

/// Provides RPC methods for interacting with the farmer
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get metadata necessary for farmer operation
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution response for a previously announced slot
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Slot info subscription
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Block sealing subscription
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submit a block seal in response to a block sealing notification
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Archived segment header subscription
    #[subscription(
        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
        unsubscribe = "unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
    )]
    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;

    /// Get segment headers for the requested segment indices
    #[method(name = "segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Get a piece from the cached archived segment, if present
    #[method(name = "piece", blocking)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledge an archived segment header received via subscription
    #[method(name = "acknowledgeArchivedSegmentHeader")]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Get up to `limit` of the most recent segment headers
    #[method(name = "lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Update shard membership info for the current connection
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
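
// A minimal client-side sketch (not part of this crate) of how a farmer might talk to this API
// with jsonrpsee's WebSocket client. The server address is an assumption; method names match
// the trait above:
//
// ```rust,ignore
// use jsonrpsee::core::client::{ClientT, Subscription, SubscriptionClientT};
// use jsonrpsee::rpc_params;
// use jsonrpsee::ws_client::WsClientBuilder;
//
// let client = WsClientBuilder::default().build("ws://127.0.0.1:9944").await?;
// // Plain method call
// let app_info: FarmerAppInfo = client.request("getFarmerAppInfo", rpc_params![]).await?;
// // Subscription paired with its unsubscribe method
// let mut slot_info_subscription: Subscription<SlotInfo> = client
//     .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
//     .await?;
// while let Some(slot_info) = slot_info_subscription.next().await {
//     let slot_info = slot_info?;
//     // ... audit plots, then call `submitSolutionResponse` ...
// }
// ```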

#[derive(Debug, Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    segment_index: SegmentIndex,
    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
}

#[derive(Debug, Default)]
struct BlockSignatureSenders {
    current_pre_seal_hash: Blake3Hash,
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}

/// In-memory cache of the last archived segment, so that a request arriving right after an
/// archived segment notification can be answered quickly by the RPC server.
///
/// A weak reference is stored so that the archived segment does not occupy RAM for longer than
/// necessary.
#[derive(Debug)]
enum CachedArchivedSegment {
    /// Special case for genesis segment when requested over RPC
    Genesis(Arc<NewArchivedSegment>),
    Weak(Weak<NewArchivedSegment>),
}

impl CachedArchivedSegment {
    fn get(&self) -> Option<Arc<NewArchivedSegment>> {
        match self {
            CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
            CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
        }
    }
}
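
// Sketch of the cache semantics above: `get()` on a `Weak` entry only succeeds while some
// strong `Arc` to the segment is still alive elsewhere (for example, held while a notification
// is still being processed):
//
// ```rust,ignore
// let segment = Arc::new(new_archived_segment);
// let cached = CachedArchivedSegment::Weak(Arc::downgrade(&segment));
// assert!(cached.get().is_some()); // a strong reference still exists
// drop(segment);
// assert!(cached.get().is_none()); // the segment's memory has been released
// ```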

#[derive(Debug)]
struct ShardMembershipConnectionsState {
    last_update: Instant,
    info: Vec<FarmerShardMembershipInfo>,
}

#[derive(Debug, Default)]
struct ShardMembershipConnections {
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}

/// Farmer RPC configuration
#[derive(Debug)]
pub struct FarmerRpcConfig<CI, CSS> {
    /// IP and port (TCP) on which to listen for farmer RPC requests
    pub listen_on: SocketAddr,
    /// Genesis beacon chain block
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in sector
    pub max_pieces_in_sector: u16,
    /// New slot notifications
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Block sealing notifications
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Archived segment notifications
    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Shard membership updates
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info
    pub chain_info: CI,
    /// Chain sync status
    pub chain_sync_status: CSS,
    /// Erasure coding instance
    pub erasure_coding: ErasureCoding,
}

/// Worker that drives RPC server tasks
#[derive(Debug)]
pub struct FarmerRpcWorker<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    server: Option<Server>,
    rpc: Option<FarmerRpc<CI, CSS>>,
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
}

impl<CI, CSS> FarmerRpcWorker<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    /// Creates a new farmer RPC worker
    pub async fn new(config: FarmerRpcConfig<CI, CSS>) -> io::Result<Self> {
        let server = Server::builder()
            .set_config(ServerConfig::builder().ws_only().build())
            .build(config.listen_on)
            .await?;

        let address = server.local_addr()?;
        info!(%address, "Started farmer RPC server");

        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
            .expect("Always a tiny constant in the protocol; qed");

        let slot_info_subscriptions = Arc::default();
        let block_sealing_subscriptions = Arc::default();

        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
            solution_response_senders_capacity,
        ))));
        let block_sealing_senders = Arc::default();
        let cached_archived_segment = Arc::default();
        // Must be shared between the RPC handler and the worker, otherwise acknowledgements
        // submitted over RPC would never reach the senders stored by
        // `handle_archived_segment_notification`
        let archived_segment_acknowledgement_senders = Arc::default();
        let archived_segment_header_subscriptions = Arc::default();

        let rpc = FarmerRpc {
            genesis_block: config.genesis_block,
            solution_response_senders: Arc::clone(&solution_response_senders),
            block_sealing_senders: Arc::clone(&block_sealing_senders),
            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
            chain_info: config.chain_info,
            cached_archived_segment: Arc::clone(&cached_archived_segment),
            archived_segment_acknowledgement_senders: Arc::clone(
                &archived_segment_acknowledgement_senders,
            ),
            chain_sync_status: config.chain_sync_status,
            consensus_constants: config.consensus_constants,
            max_pieces_in_sector: config.max_pieces_in_sector,
            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
            archived_segment_header_subscriptions: Arc::clone(
                &archived_segment_header_subscriptions,
            ),
            shard_membership_connections: Arc::default(),
            shard_membership_updates_sender: config.shard_membership_updates_sender,
            erasure_coding: config.erasure_coding,
        };

        Ok(Self {
            server: Some(server),
            rpc: Some(rpc),
            new_slot_notification_receiver: config.new_slot_notification_receiver,
            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
            archived_segment_notification_receiver: config.archived_segment_notification_receiver,
            solution_response_senders,
            block_sealing_senders,
            cached_archived_segment,
            archived_segment_acknowledgement_senders,
            slot_info_subscriptions,
            block_sealing_subscriptions,
            archived_segment_header_subscriptions,
        })
    }

    /// Drive RPC server tasks
    pub async fn run(mut self) {
        let server = self.server.take().expect("Called only once from here; qed");
        let rpc = self.rpc.take().expect("Called only once from here; qed");
        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();

        loop {
            select! {
                _ = server_fut => {}
                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
                    let Some(new_slot_notification) = maybe_new_slot_notification else {
                        break;
                    };

                    self.handle_new_slot_notification(new_slot_notification).await;
                }
                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
                        break;
                    };

                    self.handle_block_sealing_notification(block_sealing_notification).await;
                }
                maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
                    let Some(archived_segment_notification) = maybe_archived_segment_notification else {
                        break;
                    };

                    self.handle_archived_segment_notification(archived_segment_notification).await;
                }
            }
        }
    }
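
    // A minimal usage sketch (assuming a tokio runtime and a fully populated
    // `FarmerRpcConfig`): `new()` binds the server, while `run()` consumes the worker and
    // completes once the notification channels are closed:
    //
    // ```rust,ignore
    // let worker = FarmerRpcWorker::new(farmer_rpc_config).await?;
    // tokio::spawn(worker.run());
    // ```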

    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
        let NewSlotNotification {
            new_slot_info,
            solution_sender,
        } = new_slot_notification;

        let NewSlotInfo {
            slot,
            proof_of_time,
            solution_range,
            shard_membership_entropy,
            num_shards,
        } = new_slot_info;

        // Store the solution sender so that we can retrieve it when a solution comes from
        // the farmer
        let mut solution_response_senders = self.solution_response_senders.lock();
        if solution_response_senders.peek(&slot).is_none() {
            solution_response_senders.insert(slot, solution_sender);
        }

        let global_challenge = proof_of_time.derive_global_challenge(slot);

        // This will be sent to the farmer
        let slot_info = SlotInfo {
            slot,
            global_challenge,
            solution_range: solution_range.to_leaf_shard(num_shards),
            shard_membership_entropy,
            num_shards,
        };
        let slot_info = serde_json::value::to_raw_value(&slot_info)
            .expect("Serialization of slot info never fails; qed");

        self.slot_info_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(slot_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        // Remove closed receivers
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Slot info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    async fn handle_block_sealing_notification(
        &mut self,
        block_sealing_notification: BlockSealNotification,
    ) {
        let BlockSealNotification {
            pre_seal_hash,
            public_key_hash,
            seal_sender,
        } = block_sealing_notification;

        // Store the signature sender so that we can retrieve it when a seal comes from the farmer
        {
            let mut block_sealing_senders = self.block_sealing_senders.lock();

            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
                block_sealing_senders.senders.clear();
            }

            block_sealing_senders.senders.push(seal_sender);
        }

        // This will be sent to the farmer
        let block_seal_info = BlockSealInfo {
            pre_seal_hash,
            public_key_hash,
        };
        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
            .expect("Serialization of block seal info never fails; qed");

        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(block_seal_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        // Remove closed receivers
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Block seal info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    async fn handle_archived_segment_notification(
        &mut self,
        archived_segment_notification: ArchivedSegmentNotification,
    ) {
        let ArchivedSegmentNotification {
            archived_segment,
            acknowledgement_sender,
        } = archived_segment_notification;

        let segment_index =
            SegmentIndex::from(archived_segment.segment_header.local_segment_index());

        // TODO: Combine `archived_segment_header_subscriptions` and
        //  `archived_segment_acknowledgement_senders` under the same lock to avoid potential
        //  accidental deadlock with future changes
        self.archived_segment_header_subscriptions
            .lock()
            .retain_mut(|sink| {
                let subscription_id = sink.subscription_id();

                // Store acknowledgment sender so that we can retrieve it when acknowledgment
                // comes from the farmer
                let mut archived_segment_acknowledgement_senders =
                    self.archived_segment_acknowledgement_senders.lock();

                if archived_segment_acknowledgement_senders.segment_index != segment_index {
                    archived_segment_acknowledgement_senders.segment_index = segment_index;
                    archived_segment_acknowledgement_senders.senders.clear();
                }

                let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
                    .senders
                    .entry(subscription_id.clone())
                {
                    Entry::Occupied(_) => {
                        // No need to do anything, a farmer is processing a request
                        None
                    }
                    Entry::Vacant(entry) => {
                        entry.insert(acknowledgement_sender.clone());

                        // This will be sent to the farmer
                        Some(archived_segment.segment_header)
                    }
                };

                self.cached_archived_segment
                    .lock()
                    .replace(CachedArchivedSegment::Weak(Arc::downgrade(
                        &archived_segment,
                    )));

                // This will be sent to the farmer
                let maybe_archived_segment_header =
                    serde_json::value::to_raw_value(&maybe_archived_segment_header)
                        .expect("Serialization of archived segment info never fails; qed");

                match sink.try_send(maybe_archived_segment_header) {
                    Ok(()) => true,
                    Err(error) => match error {
                        TrySendError::Closed(_) => {
                            // Remove closed receivers
                            archived_segment_acknowledgement_senders
                                .senders
                                .remove(&subscription_id);
                            false
                        }
                        TrySendError::Full(_) => {
                            warn!(
                                ?subscription_id,
                                "Archived segment header receiver is too slow, dropping \
                                notification"
                            );
                            true
                        }
                    },
                }
            });
    }
}

/// Implements the [`FarmerRpcApiServer`] trait for the farmer to connect to
#[derive(Debug)]
struct FarmerRpc<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    genesis_block: OwnedBeaconChainBlock,
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    chain_info: CI,
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    chain_sync_status: CSS,
    consensus_constants: ConsensusConstants,
    max_pieces_in_sector: u16,
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    erasure_coding: ErasureCoding,
}

#[async_trait]
impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
where
    CI: ChainInfo<OwnedBeaconChainBlock>,
    CSS: ChainSyncStatus,
{
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
        let last_segment_index = self
            .chain_info
            .max_local_segment_index()
            .unwrap_or(LocalSegmentIndex::ZERO);

        let consensus_constants = &self.consensus_constants;
        let protocol_info = FarmerProtocolInfo {
            history_size: HistorySize::from(SegmentIndex::from(last_segment_index)),
            max_pieces_in_sector: self.max_pieces_in_sector,
            recent_segments: consensus_constants.recent_segments,
            recent_history_fraction: consensus_constants.recent_history_fraction,
            min_sector_lifetime: consensus_constants.min_sector_lifetime,
        };

        let farmer_app_info = FarmerAppInfo {
            genesis_root: *self.genesis_block.header.header().root(),
            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
            syncing: self.chain_sync_status.is_syncing(),
            farming_timeout: consensus_constants
                .slot_duration
                .as_duration()
                .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
            protocol_info,
        };

        Ok(farmer_app_info)
    }

    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
        let slot = solution_response.slot_number;
        let public_key_hash = solution_response.solution.public_key_hash;
        let sector_index = solution_response.solution.sector_index;
        let mut solution_response_senders = self.solution_response_senders.lock();

        let success = solution_response_senders
            .peek_mut(&slot)
            .and_then(|sender| sender.try_send(solution_response.solution).ok())
            .is_some();

        if !success {
            warn!(
                %slot,
                %sector_index,
                %public_key_hash,
                "Solution was ignored, likely because farmer was too slow"
            );

            return Err(Error::SolutionWasIgnored { slot });
        }

        Ok(())
    }

    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.slot_info_subscriptions.lock().push(subscription);

        Ok(())
    }

    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.block_sealing_subscriptions.lock().push(subscription);

        Ok(())
    }

    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
        let block_sealing_senders = self.block_sealing_senders.clone();

        let mut block_sealing_senders = block_sealing_senders.lock();

        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
            && let Some(sender) = block_sealing_senders.senders.pop()
        {
            let _ = sender.send(block_seal.seal);
        }

        Ok(())
    }

    async fn subscribe_archived_segment_header(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        let subscription = pending.accept().await?;
        self.archived_segment_header_subscriptions
            .lock()
            .push(subscription);

        Ok(())
    }

    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error> {
        let archived_segment_acknowledgement_senders =
            self.archived_segment_acknowledgement_senders.clone();

        let maybe_sender = {
            let mut archived_segment_acknowledgement_senders_guard =
                archived_segment_acknowledgement_senders.lock();

            (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
                .then(|| {
                    // `HashMap` iteration order is unspecified, so this removes an arbitrary
                    // pending sender
                    let key = archived_segment_acknowledgement_senders_guard
                        .senders
                        .keys()
                        .next()
                        .cloned()?;

                    archived_segment_acknowledgement_senders_guard
                        .senders
                        .remove(&key)
                })
                .flatten()
        };

        if let Some(mut sender) = maybe_sender
            && let Err(error) = sender.try_send(())
            && !error.is_disconnected()
        {
            warn!(%error, "Failed to acknowledge archived segment");
        }

        debug!(%segment_index, "Acknowledged archived segment");

        Ok(())
    }
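
    // Farmer-side sketch of the acknowledgement protocol (client setup as in the earlier
    // sketch; method names match the trait above). Acknowledging lets the node pass the signal
    // back to the archiver via the `acknowledgement_sender` stored in
    // `handle_archived_segment_notification`:
    //
    // ```rust,ignore
    // let mut archived_segments: Subscription<SegmentHeader> = client
    //     .subscribe(
    //         "subscribeArchivedSegmentHeader",
    //         rpc_params![],
    //         "unsubscribeArchivedSegmentHeader",
    //     )
    //     .await?;
    // while let Some(segment_header) = archived_segments.next().await {
    //     let segment_header = segment_header?;
    //     // ... store pieces of the newly archived segment ...
    //     let segment_index = SegmentIndex::from(segment_header.local_segment_index());
    //     client
    //         .request::<(), _>("acknowledgeArchivedSegmentHeader", rpc_params![segment_index])
    //         .await?;
    // }
    // ```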

    // Note: this RPC uses the cached archived segment, which is only updated by archived
    // segment subscriptions
    fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
        let archived_segment = {
            let mut cached_archived_segment = self.cached_archived_segment.lock();

            match cached_archived_segment
                .as_ref()
                .and_then(CachedArchivedSegment::get)
            {
                Some(archived_segment) => archived_segment,
                None => {
                    if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
                        return Ok(None);
                    }

                    debug!(%requested_piece_index, "Re-creating the genesis segment on demand");

                    // Re-create the genesis segment on demand
                    let archived_segment = Arc::new(recreate_genesis_segment(
                        &self.genesis_block,
                        self.erasure_coding.clone(),
                    ));

                    cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
                        &archived_segment,
                    )));

                    archived_segment
                }
            }
        };

        if requested_piece_index.segment_index()
            == SegmentIndex::from(archived_segment.segment_header.local_segment_index())
        {
            return Ok(archived_segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        Ok(None)
    }

    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error> {
        if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "`segment_indices` length exceeds the limit: {}/{}",
                segment_indices.len(),
                MAX_SEGMENT_HEADERS_PER_REQUEST
            );

            return Err(Error::SegmentHeadersLengthExceeded {
                actual: segment_indices.len(),
            });
        }

        Ok(segment_indices
            .into_iter()
            .map(|segment_index| self.chain_info.get_segment_header(segment_index.into()))
            .collect())
    }

    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
        if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "Requested limit ({}) exceeds the server limit ({})",
                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
            );

            return Err(Error::SegmentHeadersLengthExceeded {
                actual: limit as usize,
            });
        }

        let last_segment_index = self
            .chain_info
            .max_local_segment_index()
            .unwrap_or(LocalSegmentIndex::ZERO);

        let mut last_segment_headers = (LocalSegmentIndex::ZERO..=last_segment_index)
            .rev()
            .take(limit as usize)
            .map(|segment_index| self.chain_info.get_segment_header(segment_index))
            .collect::<Vec<_>>();

        last_segment_headers.reverse();

        Ok(last_segment_headers)
    }

    async fn update_shard_membership_info(
        &self,
        extensions: &Extensions,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error> {
        let connection_id = extensions
            .get::<ConnectionId>()
            .expect("`ConnectionId` is always present; qed");

        let shard_membership = {
            let mut shard_membership_connections = self.shard_membership_connections.lock();

            // TODO: This is a workaround for https://github.com/paritytech/jsonrpsee/issues/1617
            //  and should be replaced with cleanup on disconnection once that issue is resolved
            // Drop stale entries, keeping only connections that were updated recently
            shard_membership_connections
                .connections
                .retain(|_connection_id, state| {
                    state.last_update.elapsed() < SHARD_MEMBERSHIP_EXPIRATION
                });

            shard_membership_connections.connections.insert(
                *connection_id,
                ShardMembershipConnectionsState {
                    last_update: Instant::now(),
                    info,
                },
            );

            shard_membership_connections
                .connections
                .values()
                .flat_map(|state| state.info.clone())
                .collect::<Vec<_>>()
        };

        if let Err(error) = self
            .shard_membership_updates_sender
            .clone()
            .send(shard_membership)
            .await
        {
            warn!(%error, "Failed to send shard membership update");
        }

        Ok(())
    }
}