ab_node_rpc_server/
lib.rs

1//! RPC API for the farmer
2
3#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{ChainInfo, ChainSyncStatus};
7use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{BlockSealNotification, NewSlotNotification};
9use ab_client_consensus_common::ConsensusConstants;
10use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
11use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
12use ab_core_primitives::hashes::Blake3Hash;
13use ab_core_primitives::pieces::{Piece, PieceIndex};
14use ab_core_primitives::pot::SlotNumber;
15use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
16use ab_core_primitives::solutions::Solution;
17use ab_erasure_coding::ErasureCoding;
18use ab_farmer_components::FarmerProtocolInfo;
19use ab_farmer_rpc_primitives::{
20    BlockSealInfo, BlockSealResponse, FarmerAppInfo, MAX_SEGMENT_HEADERS_PER_REQUEST, SlotInfo,
21    SolutionResponse,
22};
23use ab_networking::libp2p::Multiaddr;
24use futures::channel::{mpsc, oneshot};
25use futures::{StreamExt, select};
26use jsonrpsee::core::{SubscriptionResult, async_trait};
27use jsonrpsee::proc_macros::rpc;
28use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
29use jsonrpsee::{PendingSubscriptionSink, SubscriptionSink, TrySendError};
30use parking_lot::Mutex;
31use schnellru::{ByLength, LruMap};
32use std::collections::HashMap;
33use std::collections::hash_map::Entry;
34use std::sync::{Arc, Weak};
35use tracing::{debug, error, warn};
36
37/// Top-level error type for the RPC handler.
38#[derive(Debug, thiserror::Error)]
39pub enum Error {
40    /// Solution was ignored
41    #[error("Solution was ignored for slot {slot}")]
42    SolutionWasIgnored {
43        /// Slot number
44        slot: SlotNumber,
45    },
46    /// Segment headers length exceeded the limit
47    #[error(
48        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
49    )]
50    SegmentHeadersLengthExceeded {
51        /// Requested number of segment headers/indices
52        actual: usize,
53    },
54}
55
56impl From<Error> for ErrorObjectOwned {
57    fn from(error: Error) -> Self {
58        match &error {
59            Error::SolutionWasIgnored { .. } => {
60                ErrorObject::owned(0, error.to_string(), None::<()>)
61            }
62            Error::SegmentHeadersLengthExceeded { .. } => {
63                ErrorObject::owned(1, error.to_string(), None::<()>)
64            }
65        }
66    }
67}
68
69/// Provides rpc methods for interacting with the farmer
70#[rpc(server)]
71pub trait FarmerRpcApi {
72    /// Get metadata necessary for farmer operation
73    #[method(name = "getFarmerAppInfo")]
74    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;
75
76    #[method(name = "submitSolutionResponse")]
77    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;
78
79    /// Slot info subscription
80    #[subscription(
81        name = "subscribeSlotInfo" => "slot_info",
82        unsubscribe = "unsubscribeSlotInfo",
83        item = SlotInfo,
84    )]
85    async fn subscribe_slot_info(&self) -> SubscriptionResult;
86
87    /// Sign block subscription
88    #[subscription(
89        name = "subscribeBlockSealing" => "block_seal",
90        unsubscribe = "unsubscribeBlockSealing",
91        item = BlockSealInfo,
92    )]
93    async fn subscribe_block_seal(&self) -> SubscriptionResult;
94
95    #[method(name = "submitBlockSeal")]
96    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;
97
98    /// Archived segment header subscription
99    #[subscription(
100        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
101        unsubscribe = "unsubscribeArchivedSegmentHeader",
102        item = SegmentHeader,
103    )]
104    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;
105
106    #[method(name = "segmentHeaders")]
107    async fn segment_headers(
108        &self,
109        segment_indices: Vec<SegmentIndex>,
110    ) -> Result<Vec<Option<SegmentHeader>>, Error>;
111
112    #[method(name = "piece", blocking)]
113    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;
114
115    #[method(name = "acknowledgeArchivedSegmentHeader")]
116    async fn acknowledge_archived_segment_header(
117        &self,
118        segment_index: SegmentIndex,
119    ) -> Result<(), Error>;
120
121    #[method(name = "lastSegmentHeaders")]
122    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;
123}
124
125#[derive(Debug, Default)]
126struct ArchivedSegmentHeaderAcknowledgementSenders {
127    segment_index: SegmentIndex,
128    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
129}
130
131#[derive(Debug, Default)]
132struct BlockSignatureSenders {
133    current_pre_seal_hash: Blake3Hash,
134    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
135}
136
137/// In-memory cache of last archived segment, such that when request comes back right after
138/// archived segment notification, RPC server is able to answer quickly.
139///
140/// We store weak reference, such that archived segment is not persisted for longer than
141/// necessary occupying RAM.
142#[derive(Debug)]
143enum CachedArchivedSegment {
144    /// Special case for genesis segment when requested over RPC
145    Genesis(Arc<NewArchivedSegment>),
146    Weak(Weak<NewArchivedSegment>),
147}
148
149impl CachedArchivedSegment {
150    fn get(&self) -> Option<Arc<NewArchivedSegment>> {
151        match self {
152            CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
153            CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
154        }
155    }
156}
157
158/// Farmer RPC configuration
159#[derive(Debug)]
160pub struct FarmerRpcConfig<CI, CSS> {
161    /// Genesis beacon beacon chain block
162    pub genesis_block: OwnedBeaconChainBlock,
163    /// Consensus constants
164    pub consensus_constants: ConsensusConstants,
165    /// Max pieces in sector
166    pub max_pieces_in_sector: u16,
167    /// New slot notifications
168    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
169    /// Block sealing notifications
170    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
171    /// Archived segment notifications
172    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
173    /// DSN bootstrap nodes
174    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
175    /// Beacon chain info
176    pub chain_info: CI,
177    /// Chain sync status
178    pub chain_sync_status: CSS,
179    /// Erasure coding instance
180    pub erasure_coding: ErasureCoding,
181}
182
183/// Worker that drives RPC server tasks
184#[derive(Debug)]
185pub struct FarmerRpcWorker {
186    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
187    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
188    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
189    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
190    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
191    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
192    archived_segment_acknowledgement_senders:
193        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
194    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
195    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
196    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
197}
198
199impl FarmerRpcWorker {
200    /// Drive RPC server tasks
201    pub async fn run(mut self) {
202        loop {
203            select! {
204                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
205                    let Some(new_slot_notification) = maybe_new_slot_notification else {
206                        break;
207                    };
208
209                    self.handle_new_slot_notification(new_slot_notification).await;
210                }
211                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
212                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
213                        break;
214                    };
215
216                    self.handle_block_sealing_notification(block_sealing_notification).await;
217                }
218                maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
219                    let Some(archived_segment_notification) = maybe_archived_segment_notification else {
220                        break;
221                    };
222
223                    self.handle_archived_segment_notification(archived_segment_notification).await;
224                }
225            }
226        }
227    }
228
229    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
230        let NewSlotNotification {
231            new_slot_info,
232            solution_sender,
233        } = new_slot_notification;
234
235        let slot_number = new_slot_info.slot;
236
237        // Store solution sender so that we can retrieve it when solution comes from
238        // the farmer
239        let mut solution_response_senders = self.solution_response_senders.lock();
240        if solution_response_senders.peek(&slot_number).is_none() {
241            solution_response_senders.insert(slot_number, solution_sender);
242        }
243
244        let global_challenge = new_slot_info
245            .proof_of_time
246            .derive_global_challenge(slot_number);
247
248        // This will be sent to the farmer
249        let slot_info = SlotInfo {
250            slot_number,
251            global_challenge,
252            solution_range: new_slot_info.solution_range,
253        };
254        let slot_info = serde_json::value::to_raw_value(&slot_info)
255            .expect("Serialization of slot info never fails; qed");
256
257        self.slot_info_subscriptions.lock().retain_mut(|sink| {
258            match sink.try_send(slot_info.clone()) {
259                Ok(()) => true,
260                Err(error) => match error {
261                    TrySendError::Closed(_) => {
262                        // Remove closed receivers
263                        false
264                    }
265                    TrySendError::Full(_) => {
266                        warn!(
267                            subscription_id = ?sink.subscription_id(),
268                            "Slot info receiver is too slow, dropping notification"
269                        );
270                        true
271                    }
272                },
273            }
274        });
275    }
276
277    async fn handle_block_sealing_notification(
278        &mut self,
279        block_sealing_notification: BlockSealNotification,
280    ) {
281        let BlockSealNotification {
282            pre_seal_hash,
283            public_key_hash,
284            seal_sender,
285        } = block_sealing_notification;
286
287        // Store signature sender so that we can retrieve it when a solution comes from the farmer
288        {
289            let mut block_sealing_senders = self.block_sealing_senders.lock();
290
291            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
292                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
293                block_sealing_senders.senders.clear();
294            }
295
296            block_sealing_senders.senders.push(seal_sender);
297        }
298
299        // This will be sent to the farmer
300        let block_seal_info = BlockSealInfo {
301            pre_seal_hash,
302            public_key_hash,
303        };
304        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
305            .expect("Serialization of block seal info never fails; qed");
306
307        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
308            match sink.try_send(block_seal_info.clone()) {
309                Ok(()) => true,
310                Err(error) => match error {
311                    TrySendError::Closed(_) => {
312                        // Remove closed receivers
313                        false
314                    }
315                    TrySendError::Full(_) => {
316                        warn!(
317                            subscription_id = ?sink.subscription_id(),
318                            "Block seal info receiver is too slow, dropping notification"
319                        );
320                        true
321                    }
322                },
323            }
324        });
325    }
326
327    async fn handle_archived_segment_notification(
328        &mut self,
329        archived_segment_notification: ArchivedSegmentNotification,
330    ) {
331        let ArchivedSegmentNotification {
332            archived_segment,
333            acknowledgement_sender,
334        } = archived_segment_notification;
335
336        let segment_index = archived_segment.segment_header.segment_index();
337
338        self.archived_segment_header_subscriptions
339            .lock()
340            .retain_mut(|sink| {
341                let subscription_id = sink.subscription_id();
342
343                // Store acknowledgment sender so that we can retrieve it when acknowledgment
344                // comes from the farmer, but only if unsafe APIs are allowed
345                let mut archived_segment_acknowledgement_senders =
346                    self.archived_segment_acknowledgement_senders.lock();
347
348                if archived_segment_acknowledgement_senders.segment_index != segment_index {
349                    archived_segment_acknowledgement_senders.segment_index = segment_index;
350                    archived_segment_acknowledgement_senders.senders.clear();
351                }
352
353                let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
354                    .senders
355                    .entry(subscription_id.clone())
356                {
357                    Entry::Occupied(_) => {
358                        // No need to do anything, a farmer is processing a request
359                        None
360                    }
361                    Entry::Vacant(entry) => {
362                        entry.insert(acknowledgement_sender.clone());
363
364                        // This will be sent to the farmer
365                        Some(archived_segment.segment_header)
366                    }
367                };
368
369                self.cached_archived_segment
370                    .lock()
371                    .replace(CachedArchivedSegment::Weak(Arc::downgrade(
372                        &archived_segment,
373                    )));
374
375                // This will be sent to the farmer
376                let maybe_archived_segment_header =
377                    serde_json::value::to_raw_value(&maybe_archived_segment_header)
378                        .expect("Serialization of archived segment info never fails; qed");
379
380                match sink.try_send(maybe_archived_segment_header) {
381                    Ok(()) => true,
382                    Err(error) => match error {
383                        TrySendError::Closed(_) => {
384                            // Remove closed receivers
385                            archived_segment_acknowledgement_senders
386                                .senders
387                                .remove(&subscription_id);
388                            false
389                        }
390                        TrySendError::Full(_) => {
391                            warn!(
392                                ?subscription_id,
393                                "Block seal info receiver is too slow, dropping notification"
394                            );
395                            true
396                        }
397                    },
398                }
399            });
400    }
401}
402
403/// Implements the [`FarmerRpcApiServer`] trait for farmer to connect to
404#[derive(Debug)]
405pub struct FarmerRpc<CI, CSS>
406where
407    CI: ChainInfo<OwnedBeaconChainBlock>,
408    CSS: ChainSyncStatus,
409{
410    genesis_block: OwnedBeaconChainBlock,
411    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
412    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
413    dsn_bootstrap_nodes: Vec<Multiaddr>,
414    chain_info: CI,
415    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
416    archived_segment_acknowledgement_senders:
417        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
418    chain_sync_status: CSS,
419    consensus_constants: ConsensusConstants,
420    max_pieces_in_sector: u16,
421    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
422    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
423    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
424    erasure_coding: ErasureCoding,
425}
426
427/// [`FarmerRpc`] is used for notifying subscribers about the arrival of new slots and for
428/// submission of solutions (or lack thereof).
429///
430/// Internally every time slot notifier emits information about a new slot, a notification is sent
431/// to every subscriber, after which the RPC server waits for the same number of
432/// `submitSolutionResponse` requests with `SolutionResponse` in them or until
433/// timeout is exceeded. The first valid solution for a particular slot wins, others are ignored.
434impl<CI, CSS> FarmerRpc<CI, CSS>
435where
436    CI: ChainInfo<OwnedBeaconChainBlock>,
437    CSS: ChainSyncStatus,
438{
439    /// Creates a new instance of the `FarmerRpc` handler.
440    pub fn new(config: FarmerRpcConfig<CI, CSS>) -> (Self, FarmerRpcWorker) {
441        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
442        let block_authoring_delay = usize::try_from(block_authoring_delay)
443            .expect("Block authoring delay will never exceed usize on any platform; qed");
444        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
445            .expect("Always a tiny constant in the protocol; qed");
446
447        let slot_info_subscriptions = Arc::default();
448        let block_sealing_subscriptions = Arc::default();
449
450        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
451            solution_response_senders_capacity,
452        ))));
453        let block_sealing_senders = Arc::default();
454        let archived_segment_header_subscriptions = Arc::default();
455        let cached_archived_segment = Arc::default();
456
457        let rpc = Self {
458            genesis_block: config.genesis_block,
459            solution_response_senders: Arc::clone(&solution_response_senders),
460            block_sealing_senders: Arc::clone(&block_sealing_senders),
461            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
462            chain_info: config.chain_info,
463            cached_archived_segment: Arc::clone(&cached_archived_segment),
464            archived_segment_acknowledgement_senders: Arc::default(),
465            chain_sync_status: config.chain_sync_status,
466            consensus_constants: config.consensus_constants,
467            max_pieces_in_sector: config.max_pieces_in_sector,
468            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
469            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
470            archived_segment_header_subscriptions: Arc::clone(
471                &archived_segment_header_subscriptions,
472            ),
473            erasure_coding: config.erasure_coding,
474        };
475
476        let worker = FarmerRpcWorker {
477            new_slot_notification_receiver: config.new_slot_notification_receiver,
478            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
479            archived_segment_notification_receiver: config.archived_segment_notification_receiver,
480            solution_response_senders,
481            block_sealing_senders,
482            cached_archived_segment,
483            archived_segment_acknowledgement_senders: Arc::new(Default::default()),
484            slot_info_subscriptions,
485            block_sealing_subscriptions,
486            archived_segment_header_subscriptions,
487        };
488
489        (rpc, worker)
490    }
491}
492
493#[async_trait]
494impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
495where
496    CI: ChainInfo<OwnedBeaconChainBlock>,
497    CSS: ChainSyncStatus,
498{
499    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
500        let last_segment_index = self
501            .chain_info
502            .max_segment_index()
503            .unwrap_or(SegmentIndex::ZERO);
504
505        let consensus_constants = &self.consensus_constants;
506        let protocol_info = FarmerProtocolInfo {
507            history_size: HistorySize::from(last_segment_index),
508            max_pieces_in_sector: self.max_pieces_in_sector,
509            recent_segments: consensus_constants.recent_segments,
510            recent_history_fraction: consensus_constants.recent_history_fraction,
511            min_sector_lifetime: consensus_constants.min_sector_lifetime,
512        };
513
514        let farmer_app_info = FarmerAppInfo {
515            genesis_root: *self.genesis_block.header.header().root(),
516            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
517            syncing: self.chain_sync_status.is_syncing(),
518            farming_timeout: consensus_constants
519                .slot_duration
520                .as_duration()
521                .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
522            protocol_info,
523        };
524
525        Ok(farmer_app_info)
526    }
527
528    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
529        let slot = solution_response.slot_number;
530        let public_key_hash = solution_response.solution.public_key_hash;
531        let sector_index = solution_response.solution.sector_index;
532        let mut solution_response_senders = self.solution_response_senders.lock();
533
534        let success = solution_response_senders
535            .peek_mut(&slot)
536            .and_then(|sender| sender.try_send(solution_response.solution).ok())
537            .is_some();
538
539        if !success {
540            warn!(
541                %slot,
542                %sector_index,
543                %public_key_hash,
544                "Solution was ignored, likely because farmer was too slow"
545            );
546
547            return Err(Error::SolutionWasIgnored { slot });
548        }
549
550        Ok(())
551    }
552
553    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
554        let subscription = pending.accept().await?;
555        self.slot_info_subscriptions.lock().push(subscription);
556
557        Ok(())
558    }
559
560    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
561        let subscription = pending.accept().await?;
562        self.block_sealing_subscriptions.lock().push(subscription);
563
564        Ok(())
565    }
566
567    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
568        let block_sealing_senders = self.block_sealing_senders.clone();
569
570        let mut block_sealing_senders = block_sealing_senders.lock();
571
572        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
573            && let Some(sender) = block_sealing_senders.senders.pop()
574        {
575            let _ = sender.send(block_seal.seal);
576        }
577
578        Ok(())
579    }
580
581    async fn subscribe_archived_segment_header(
582        &self,
583        pending: PendingSubscriptionSink,
584    ) -> SubscriptionResult {
585        let subscription = pending.accept().await?;
586        self.archived_segment_header_subscriptions
587            .lock()
588            .push(subscription);
589
590        Ok(())
591    }
592
593    async fn acknowledge_archived_segment_header(
594        &self,
595        segment_index: SegmentIndex,
596    ) -> Result<(), Error> {
597        let archived_segment_acknowledgement_senders =
598            self.archived_segment_acknowledgement_senders.clone();
599
600        let maybe_sender = {
601            let mut archived_segment_acknowledgement_senders_guard =
602                archived_segment_acknowledgement_senders.lock();
603
604            (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
605                .then(|| {
606                    let last_key = archived_segment_acknowledgement_senders_guard
607                        .senders
608                        .keys()
609                        .next()
610                        .cloned()?;
611
612                    archived_segment_acknowledgement_senders_guard
613                        .senders
614                        .remove(&last_key)
615                })
616                .flatten()
617        };
618
619        if let Some(mut sender) = maybe_sender
620            && let Err(error) = sender.try_send(())
621            && !error.is_disconnected()
622        {
623            warn!("Failed to acknowledge archived segment: {error}");
624        }
625
626        debug!(%segment_index, "Acknowledged archived segment.");
627
628        Ok(())
629    }
630
631    // Note: this RPC uses the cached archived segment, which is only updated by archived segments
632    // subscriptions
633    fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
634        let archived_segment = {
635            let mut cached_archived_segment = self.cached_archived_segment.lock();
636
637            match cached_archived_segment
638                .as_ref()
639                .and_then(CachedArchivedSegment::get)
640            {
641                Some(archived_segment) => archived_segment,
642                None => {
643                    if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
644                        return Ok(None);
645                    }
646
647                    debug!(%requested_piece_index, "Re-creating the genesis segment on demand");
648
649                    // Re-create the genesis segment on demand
650                    let archived_segment = Arc::new(recreate_genesis_segment(
651                        &self.genesis_block,
652                        self.erasure_coding.clone(),
653                    ));
654
655                    cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
656                        &archived_segment,
657                    )));
658
659                    archived_segment
660                }
661            }
662        };
663
664        if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
665        {
666            return Ok(archived_segment
667                .pieces
668                .pieces()
669                .nth(requested_piece_index.position() as usize));
670        }
671
672        Ok(None)
673    }
674
675    async fn segment_headers(
676        &self,
677        segment_indices: Vec<SegmentIndex>,
678    ) -> Result<Vec<Option<SegmentHeader>>, Error> {
679        if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
680            error!(
681                "`segment_indices` length exceed the limit: {} ",
682                segment_indices.len()
683            );
684
685            return Err(Error::SegmentHeadersLengthExceeded {
686                actual: segment_indices.len(),
687            });
688        };
689
690        Ok(segment_indices
691            .into_iter()
692            .map(|segment_index| self.chain_info.get_segment_header(segment_index))
693            .collect())
694    }
695
696    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
697        if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
698            error!(
699                "Request limit ({}) exceed the server limit: {} ",
700                limit, MAX_SEGMENT_HEADERS_PER_REQUEST
701            );
702
703            return Err(Error::SegmentHeadersLengthExceeded {
704                actual: limit as usize,
705            });
706        };
707
708        let last_segment_index = self
709            .chain_info
710            .max_segment_index()
711            .unwrap_or(SegmentIndex::ZERO);
712
713        let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
714            .rev()
715            .take(limit as usize)
716            .map(|segment_index| self.chain_info.get_segment_header(segment_index))
717            .collect::<Vec<_>>();
718
719        last_segment_headers.reverse();
720
721        Ok(last_segment_headers)
722    }
723}