1use ab_archiving::archiver::NewArchivedSegment;
4use ab_client_api::{BeaconChainInfo, ChainSyncStatus};
5use ab_client_archiving::recreate::{
6 RecreateSegmentError, RecreateSegmentSuperSegmentDetails, recreate_genesis_segment,
7 recreate_segment,
8};
9use ab_client_block_authoring::slot_worker::{
10 BlockSealNotification, NewSlotInfo, NewSlotNotification,
11};
12use ab_client_consensus_common::ConsensusConstants;
13use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
14use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
15use ab_core_primitives::hashes::Blake3Hash;
16use ab_core_primitives::pieces::{Piece, PieceIndex};
17use ab_core_primitives::pot::SlotNumber;
18use ab_core_primitives::segments::{
19 HistorySize, LocalSegmentIndex, SegmentIndex, SuperSegment, SuperSegmentHeader,
20 SuperSegmentIndex, SuperSegmentRoot,
21};
22use ab_core_primitives::shard::ShardIndex;
23use ab_core_primitives::solutions::Solution;
24use ab_erasure_coding::ErasureCoding;
25use ab_farmer_components::FarmerProtocolInfo;
26use ab_farmer_rpc_primitives::{
27 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
28 MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
29};
30use ab_networking::libp2p::Multiaddr;
31use async_lock::Mutex as AsyncMutex;
32use futures::channel::{mpsc, oneshot};
33use futures::{FutureExt, SinkExt, StreamExt, select};
34use jsonrpsee::core::{SubscriptionResult, async_trait};
35use jsonrpsee::proc_macros::rpc;
36use jsonrpsee::server::{Server, ServerConfig};
37use jsonrpsee::tokio::task::{JoinError, spawn_blocking};
38use jsonrpsee::tokio::time::MissedTickBehavior;
39use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
40use jsonrpsee::{
41 ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
42};
43use parking_lot::Mutex;
44use schnellru::{ByLength, LruMap};
45use std::collections::{HashMap, VecDeque};
46use std::io;
47use std::net::SocketAddr;
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50use tracing::{error, info, warn};
51
/// Number of recently archived super segments kept in memory for serving piece requests
const CACHED_SUPER_SEGMENTS_CAPACITY: usize = 5;
/// How long a recreated archived segment is retained in the cache after its last use
const CACHED_ARCHIVED_SEGMENT_TIMEOUT: Duration = Duration::from_mins(1);
54
/// Errors that farmer RPC methods can return to clients
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Submitted solution arrived too late (or for a slot that is no longer tracked)
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot the solution was submitted for
        slot: SlotNumber,
    },
    /// Request asked for more super segment headers than allowed per request
    #[error(
        "Super segment headers length exceeded the limit: \
        {actual}/{MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SuperSegmentHeadersLengthExceeded {
        /// Number of headers that was actually requested
        actual: usize,
    },
    /// Recreating an archived segment for piece retrieval failed
    #[error("Failed to recreate segment: {0}")]
    FailedToRecreateSegment(#[from] RecreateSegmentError),
    /// The blocking task used for segment recreation panicked or was cancelled
    #[error("Blocking task join error: {0}")]
    BlockingTaskJoinError(#[from] JoinError),
}
80
81impl From<Error> for ErrorObjectOwned {
82 fn from(error: Error) -> Self {
83 let code = match &error {
84 Error::SolutionWasIgnored { .. } => 0,
85 Error::SuperSegmentHeadersLengthExceeded { .. } => 1,
86 Error::FailedToRecreateSegment(_) => 2,
87 Error::BlockingTaskJoinError(_) => 3,
88 };
89
90 ErrorObject::owned(code, error.to_string(), None::<()>)
91 }
92}
93
/// Farmer RPC API exposed to farmers (WebSocket only)
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get metadata the farmer needs to operate (genesis root, protocol info, etc.)
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution for a previously received slot info notification
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Subscribe to information about new slots for farming
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Subscribe to block sealing requests that the farmer may sign
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submit a seal for a previously received block sealing request
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Subscribe to headers of newly archived super segments
    #[subscription(
        name = "subscribeNewSuperSegmentHeader" => "new_super_segment_header",
        unsubscribe = "unsubscribeNewSuperSegmentHeader",
        item = SuperSegmentHeader,
    )]
    async fn subscribe_new_super_segment_header(&self) -> SubscriptionResult;

    /// Get super segment headers for the given indices (`None` for unknown ones);
    /// request size is capped at [`MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST`]
    #[method(name = "superSegmentHeaders")]
    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Get up to `limit` most recent super segment headers
    #[method(name = "lastSuperSegmentHeaders")]
    async fn last_super_segment_headers(
        &self,
        limit: u32,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Get the super segment root covering the given segment index, if known
    #[method(name = "superSegmentRootForSegmentIndex")]
    async fn super_segment_root_for_segment_index(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<Option<SuperSegmentRoot>, Error>;

    /// Get a piece of archived history by index, if it can be recreated here
    #[method(name = "piece")]
    async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Update shard membership information for farms behind this RPC connection
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
158
/// Pending seal senders for the block currently being sealed.
///
/// Senders are only valid for `current_pre_seal_hash`; they are cleared whenever a
/// sealing request for a different pre-seal hash arrives.
#[derive(Debug, Default)]
struct BlockSignatureSenders {
    // Pre-seal hash the collected senders correspond to
    current_pre_seal_hash: Blake3Hash,
    // One sender per sealing notification received for the current pre-seal hash
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}
164
/// Bounded FIFO cache of recently archived super segments
#[derive(Debug)]
struct CachedSuperSegments {
    // Oldest super segment at the front, newest at the back
    super_segments: VecDeque<SuperSegment>,
}
169
170impl Default for CachedSuperSegments {
171 fn default() -> Self {
172 Self {
173 super_segments: VecDeque::with_capacity(CACHED_SUPER_SEGMENTS_CAPACITY),
174 }
175 }
176}
177
impl CachedSuperSegments {
    /// Finds a cached super segment that covers `segment_index`, if any.
    fn get_for_segment_index(&self, segment_index: SegmentIndex) -> Option<&SuperSegment> {
        self.super_segments.iter().find(|super_segment| {
            let max_segment_index = super_segment.header.max_segment_index.as_inner();
            // A super segment covers `num_segments` consecutive segments ending at
            // `max_segment_index` (inclusive)
            let first_segment_index = max_segment_index
                - SegmentIndex::from(u64::from(super_segment.header.num_segments))
                + SegmentIndex::ONE;

            (first_segment_index..=max_segment_index).contains(&segment_index)
        })
    }

    /// Appends a super segment, evicting the oldest entry once the cache is full.
    fn add(&mut self, super_segment: SuperSegment) {
        if self.super_segments.len() == CACHED_SUPER_SEGMENTS_CAPACITY {
            self.super_segments.pop_front();
        }

        self.super_segments.push_back(super_segment);
    }
}
198
/// Most recently recreated archived segment, cached so multiple piece requests for
/// the same segment don't trigger repeated (expensive) recreation
#[derive(Debug)]
struct CachedArchivedSegment {
    // Index of the segment stored in `segment`
    segment_index: SegmentIndex,
    // Recreated segment contents, including its pieces
    segment: NewArchivedSegment,
    // Updated on every cache hit; entry is expired after
    // `CACHED_ARCHIVED_SEGMENT_TIMEOUT` of inactivity
    last_used_at: Instant,
}
206
/// Shard membership info last reported by a single RPC connection
#[derive(Debug)]
struct ShardMembershipConnectionsState {
    // When this connection last sent an update; used to expire stale entries
    last_update: Instant,
    // Shard membership info of farms behind this connection
    info: Vec<FarmerShardMembershipInfo>,
}
212
/// Shard membership info tracked per active RPC connection, aggregated when
/// forwarding updates
#[derive(Debug, Default)]
struct ShardMembershipConnections {
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}
217
/// Configuration for the farmer RPC server
#[derive(Debug)]
pub struct FarmerRpcConfig<BCI, CSS> {
    /// Address for the RPC server to listen on
    pub listen_on: SocketAddr,
    /// Genesis block of the beacon chain
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants
    pub consensus_constants: ConsensusConstants,
    /// Max number of pieces in a sector
    pub max_pieces_in_sector: u16,
    /// Receiver of new slot notifications to forward to farmers
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Receiver of block sealing notifications to forward to farmers
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Receiver of newly archived super segments
    pub new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
    /// Sender of aggregated shard membership updates
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes advertised to farmers
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Source of beacon chain information
    pub beacon_chain_info: BCI,
    /// Source of chain sync status
    pub chain_sync_status: CSS,
    /// Erasure coding instance used for segment recreation
    pub erasure_coding: ErasureCoding,
}
246
/// Background worker that owns the RPC server and feeds notifications to the shared
/// state used by the [`FarmerRpc`] request handler.
///
/// Create with [`Self::new()`], then drive with [`Self::run()`].
#[derive(Debug)]
pub struct FarmerRpcWorker<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    // `Option`s are taken exactly once by `run()`
    server: Option<Server>,
    rpc: Option<FarmerRpc<BCI, CSS>>,
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    new_super_segment_notification_receiver: mpsc::Receiver<SuperSegment>,
    // State below is shared with the `FarmerRpc` handler
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
}
267
impl<BCI, CSS> FarmerRpcWorker<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    /// Creates a new RPC worker and binds the server socket at `config.listen_on`.
    ///
    /// Request handling and notification forwarding only start once
    /// [`Self::run()`] is called.
    pub async fn new(config: FarmerRpcConfig<BCI, CSS>) -> io::Result<Self> {
        // WebSocket-only server, matching what farmers connect with
        let server = Server::builder()
            .set_config(ServerConfig::builder().ws_only().build())
            .build(config.listen_on)
            .await?;

        let address = server.local_addr()?;
        info!(%address, "Started farmer RPC server");

        // Solutions are only relevant for `block_authoring_delay` slots, so the LRU
        // of per-slot solution senders is capped at exactly that many entries
        let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
        let block_authoring_delay = usize::try_from(block_authoring_delay)
            .expect("Block authoring delay will never exceed usize on any platform; qed");
        let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
            .expect("Always a tiny constant in the protocol; qed");

        let slot_info_subscriptions = Arc::default();
        let block_sealing_subscriptions = Arc::default();

        let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
            solution_response_senders_capacity,
        ))));
        let block_sealing_senders = Arc::default();
        let new_super_segment_header_subscriptions = Arc::default();
        let cached_archived_segment = Arc::default();
        let cached_super_segments = Arc::default();

        // The RPC handler shares all mutable state with this worker via `Arc`s
        let rpc = FarmerRpc {
            genesis_block: config.genesis_block,
            solution_response_senders: Arc::clone(&solution_response_senders),
            block_sealing_senders: Arc::clone(&block_sealing_senders),
            dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
            beacon_chain_info: config.beacon_chain_info,
            chain_sync_status: config.chain_sync_status,
            consensus_constants: config.consensus_constants,
            max_pieces_in_sector: config.max_pieces_in_sector,
            slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
            block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
            new_super_segment_header_subscriptions: Arc::clone(
                &new_super_segment_header_subscriptions,
            ),
            cached_archived_segment: Arc::clone(&cached_archived_segment),
            cached_super_segments: Arc::clone(&cached_super_segments),
            shard_membership_connections: Arc::default(),
            shard_membership_updates_sender: config.shard_membership_updates_sender,
            erasure_coding: config.erasure_coding,
        };

        Ok(Self {
            server: Some(server),
            rpc: Some(rpc),
            new_slot_notification_receiver: config.new_slot_notification_receiver,
            block_sealing_notification_receiver: config.block_sealing_notification_receiver,
            new_super_segment_notification_receiver: config.new_super_segment_notification_receiver,
            solution_response_senders,
            block_sealing_senders,
            slot_info_subscriptions,
            block_sealing_subscriptions,
            new_super_segment_header_subscriptions,
            cached_archived_segment,
            cached_super_segments,
        })
    }

    /// Runs the RPC server and forwards incoming notifications to subscribers.
    ///
    /// Returns when any of the notification channels is closed.
    pub async fn run(mut self) {
        let server = self.server.take().expect("Called only once from here; qed");
        let rpc = self.rpc.take().expect("Called only once from here; qed");
        let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();

        // Periodically expire the cached archived segment once it goes unused
        let mut archived_segment_cache_cleanup_interval =
            tokio::time::interval(CACHED_ARCHIVED_SEGMENT_TIMEOUT);
        archived_segment_cache_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            select! {
                _ = server_fut => {}
                maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
                    let Some(new_slot_notification) = maybe_new_slot_notification else {
                        // Channel closed, worker is done
                        break;
                    };

                    self.handle_new_slot_notification(new_slot_notification).await;
                }
                maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
                    let Some(block_sealing_notification) = maybe_block_sealing_notification else {
                        // Channel closed, worker is done
                        break;
                    };

                    self.handle_block_sealing_notification(block_sealing_notification).await;
                }
                maybe_new_super_segment = self.new_super_segment_notification_receiver.next() => {
                    let Some(new_super_segment) = maybe_new_super_segment else {
                        // Channel closed, worker is done
                        break;
                    };

                    self.handle_new_super_segment(new_super_segment).await;
                }
                _ = archived_segment_cache_cleanup_interval.tick().fuse() => {
                    // `try_lock` so cleanup never blocks behind an in-flight piece
                    // request that holds the cache lock
                    if let Some(mut maybe_cached_archived_segment) = self.cached_archived_segment.try_lock()
                        && let Some(cached_archived_segment) = maybe_cached_archived_segment.as_ref()
                        && cached_archived_segment.last_used_at.elapsed() >= CACHED_ARCHIVED_SEGMENT_TIMEOUT
                    {
                        maybe_cached_archived_segment.take();
                    }
                }
            }
        }
    }

    /// Stores the solution sender for this slot and broadcasts slot info to
    /// `subscribeSlotInfo` subscribers.
    async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
        let NewSlotNotification {
            new_slot_info,
            solution_sender,
        } = new_slot_notification;

        let NewSlotInfo {
            slot,
            proof_of_time,
            solution_range,
            shard_membership_entropy,
            num_shards,
        } = new_slot_info;

        // Remember the sender so `submit_solution_response` can route solutions back;
        // the first sender seen for a slot wins
        let mut solution_response_senders = self.solution_response_senders.lock();
        if solution_response_senders.peek(&slot).is_none() {
            solution_response_senders.insert(slot, solution_sender);
        }

        let global_challenge = proof_of_time.derive_global_challenge(slot);

        let slot_info = SlotInfo {
            slot,
            global_challenge,
            solution_range: solution_range.to_leaf_shard(num_shards),
            shard_membership_entropy,
            num_shards,
        };
        // Serialize once, clone the raw value per subscriber
        let slot_info = serde_json::value::to_raw_value(&slot_info)
            .expect("Serialization of slot info never fails; qed");

        // Drop subscribers whose connection closed; keep (but skip) slow ones
        self.slot_info_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(slot_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Slot info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    /// Stores the seal sender and broadcasts the sealing request to
    /// `subscribeBlockSealing` subscribers.
    async fn handle_block_sealing_notification(
        &mut self,
        block_sealing_notification: BlockSealNotification,
    ) {
        let BlockSealNotification {
            pre_seal_hash,
            public_key_hash,
            seal_sender,
        } = block_sealing_notification;

        {
            // A request for a different pre-seal hash invalidates previously
            // collected senders
            let mut block_sealing_senders = self.block_sealing_senders.lock();

            if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
                block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
                block_sealing_senders.senders.clear();
            }

            block_sealing_senders.senders.push(seal_sender);
        }

        let block_seal_info = BlockSealInfo {
            pre_seal_hash,
            public_key_hash,
        };
        // Serialize once, clone the raw value per subscriber
        let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
            .expect("Serialization of block seal info never fails; qed");

        // Drop subscribers whose connection closed; keep (but skip) slow ones
        self.block_sealing_subscriptions.lock().retain_mut(|sink| {
            match sink.try_send(block_seal_info.clone()) {
                Ok(()) => true,
                Err(error) => match error {
                    TrySendError::Closed(_) => {
                        false
                    }
                    TrySendError::Full(_) => {
                        warn!(
                            subscription_id = ?sink.subscription_id(),
                            "Block seal info receiver is too slow, dropping notification"
                        );
                        true
                    }
                },
            }
        });
    }

    /// Caches the super segment for later piece recreation and broadcasts its
    /// header to `subscribeNewSuperSegmentHeader` subscribers.
    async fn handle_new_super_segment(&mut self, super_segment: SuperSegment) {
        // Serialize the header before the super segment is moved into the cache
        let super_segment_header = serde_json::value::to_raw_value(&super_segment.header)
            .expect("Serialization of super segment info never fails; qed");

        self.cached_super_segments.lock().add(super_segment);

        // Drop subscribers whose connection closed; keep (but skip) slow ones
        self.new_super_segment_header_subscriptions
            .lock()
            .retain_mut(|sink| {
                let subscription_id = sink.subscription_id();

                match sink.try_send(super_segment_header.clone()) {
                    Ok(()) => true,
                    Err(error) => match error {
                        TrySendError::Closed(_) => false,
                        TrySendError::Full(_) => {
                            warn!(
                                ?subscription_id,
                                "Super segment receiver is too slow, dropping notification"
                            );
                            true
                        }
                    },
                }
            });
    }
}
516
/// RPC request handler implementing [`FarmerRpcApiServer`].
///
/// Most state is shared (via `Arc`) with [`FarmerRpcWorker`], which populates the
/// notification-driven parts (solution senders, seal senders, super segment cache).
#[derive(Debug)]
struct FarmerRpc<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    genesis_block: OwnedBeaconChainBlock,
    // Per-slot senders used to route submitted solutions back to the slot worker
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    // Seal senders for the block currently being sealed
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    beacon_chain_info: BCI,
    chain_sync_status: CSS,
    consensus_constants: ConsensusConstants,
    max_pieces_in_sector: u16,
    // Active subscriptions, appended by the `subscribe_*` methods and pruned by the
    // worker when broadcasting
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    new_super_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    // Most recently recreated archived segment, used to serve piece requests
    cached_archived_segment: Arc<AsyncMutex<Option<CachedArchivedSegment>>>,
    // Recently archived super segments, needed to recreate their segments
    cached_super_segments: Arc<Mutex<CachedSuperSegments>>,
    // Last shard membership info reported per RPC connection
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    erasure_coding: ErasureCoding,
}
541
#[async_trait]
impl<BCI, CSS> FarmerRpcApiServer for FarmerRpc<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
        // History size is derived from the last known super segment header
        // (defaults to zero when nothing has been archived yet)
        let max_segment_index = self
            .beacon_chain_info
            .last_super_segment_header()
            .map(|super_segment_header| super_segment_header.max_segment_index.as_inner())
            .unwrap_or(SegmentIndex::ZERO);

        let consensus_constants = &self.consensus_constants;
        let protocol_info = FarmerProtocolInfo {
            history_size: HistorySize::from(max_segment_index),
            max_pieces_in_sector: self.max_pieces_in_sector,
            recent_segments: consensus_constants.recent_segments,
            recent_history_fraction: consensus_constants.recent_history_fraction,
            min_sector_lifetime: consensus_constants.min_sector_lifetime,
        };

        let farmer_app_info = FarmerAppInfo {
            genesis_root: *self.genesis_block.header.header().root(),
            dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
            syncing: self.chain_sync_status.is_syncing(),
            // Farming must finish within `block_authoring_delay` slots' worth of time
            farming_timeout: consensus_constants
                .slot_duration
                .as_duration()
                .mul_f64(u64::from(consensus_constants.block_authoring_delay) as f64),
            protocol_info,
        };

        Ok(farmer_app_info)
    }

    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
        let slot = solution_response.slot_number;
        let public_key_hash = solution_response.solution.public_key_hash;
        let sector_index = solution_response.solution.sector_index;
        let mut solution_response_senders = self.solution_response_senders.lock();

        // Succeeds only while a sender for this slot is still cached (the slot is
        // recent enough) and its channel still has capacity
        let success = solution_response_senders
            .peek_mut(&slot)
            .and_then(|sender| sender.try_send(solution_response.solution).ok())
            .is_some();

        if !success {
            warn!(
                %slot,
                %sector_index,
                %public_key_hash,
                "Solution was ignored, likely because farmer was too slow"
            );

            return Err(Error::SolutionWasIgnored { slot });
        }

        Ok(())
    }

    async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        // Accept and register; the worker broadcasts into this sink
        let subscription = pending.accept().await?;
        self.slot_info_subscriptions.lock().push(subscription);

        Ok(())
    }

    async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
        // Accept and register; the worker broadcasts into this sink
        let subscription = pending.accept().await?;
        self.block_sealing_subscriptions.lock().push(subscription);

        Ok(())
    }

    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
        let block_sealing_senders = self.block_sealing_senders.clone();

        let mut block_sealing_senders = block_sealing_senders.lock();

        // Only accept a seal for the block currently being sealed; one sender is
        // consumed per submitted seal
        if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
            && let Some(sender) = block_sealing_senders.senders.pop()
        {
            // Receiver may already be gone, which is fine
            let _ = sender.send(block_seal.seal);
        }

        Ok(())
    }

    async fn subscribe_new_super_segment_header(
        &self,
        pending: PendingSubscriptionSink,
    ) -> SubscriptionResult {
        // Accept and register; the worker broadcasts into this sink
        let subscription = pending.accept().await?;
        self.new_super_segment_header_subscriptions
            .lock()
            .push(subscription);

        Ok(())
    }

    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
        // Enforce the per-request cap before doing any lookups
        if super_segment_indices.len() > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "`super_segment_indices` length exceed the limit: {} ",
                super_segment_indices.len()
            );

            return Err(Error::SuperSegmentHeadersLengthExceeded {
                actual: super_segment_indices.len(),
            });
        };

        // `None` for indices the beacon chain info doesn't know about
        Ok(super_segment_indices
            .into_iter()
            .map(|super_segment_index| {
                self.beacon_chain_info
                    .get_super_segment_header(super_segment_index)
            })
            .collect())
    }

    async fn last_super_segment_headers(
        &self,
        limit: u32,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
        // The per-request cap applies to `limit` as well
        if limit as usize > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
            error!(
                "Request limit ({}) exceed the server limit: {} ",
                limit, MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST
            );

            return Err(Error::SuperSegmentHeadersLengthExceeded {
                actual: limit as usize,
            });
        };

        let last_super_segment_index = self
            .beacon_chain_info
            .last_super_segment_header()
            .map(|super_segment_header| super_segment_header.index.as_inner())
            .unwrap_or(SuperSegmentIndex::ZERO);

        // Walk backwards from the newest index, then reverse so the response is in
        // ascending index order
        let mut last_super_segment_headers = (SuperSegmentIndex::ZERO..=last_super_segment_index)
            .rev()
            .take(limit as usize)
            .map(|super_segment_index| {
                self.beacon_chain_info
                    .get_super_segment_header(super_segment_index)
            })
            .collect::<Vec<_>>();

        last_super_segment_headers.reverse();

        Ok(last_super_segment_headers)
    }

    async fn super_segment_root_for_segment_index(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<Option<SuperSegmentRoot>, Error> {
        Ok(self
            .beacon_chain_info
            .get_super_segment_header_for_segment_index(segment_index)
            .map(|super_segment_header| super_segment_header.root))
    }

    async fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
        let segment_index = requested_piece_index.segment_index();
        // Async mutex: held across the (potentially long) segment recreation below,
        // which also serializes concurrent piece requests
        let cached_archived_segment = &mut *self.cached_archived_segment.lock().await;

        // Fast path: the piece belongs to the most recently recreated segment
        if let Some(cached_archived_segment) = cached_archived_segment
            && cached_archived_segment.segment_index == segment_index
        {
            cached_archived_segment.last_used_at = Instant::now();

            return Ok(cached_archived_segment
                .segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        // The genesis segment can always be recreated from the genesis block alone;
        // recreation is CPU-heavy, so it runs on a blocking thread
        if segment_index == SegmentIndex::ZERO {
            let segment = spawn_blocking({
                let genesis_block = self.genesis_block.clone();
                let erasure_coding = self.erasure_coding.clone();

                move || recreate_genesis_segment(&genesis_block, erasure_coding)
            })
            .await?;
            let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
                segment_index: SegmentIndex::ZERO,
                segment,
                last_used_at: Instant::now(),
            });

            return Ok(cached_archived_segment
                .segment
                .pieces
                .pieces()
                .nth(usize::from(requested_piece_index.position())));
        }

        // For any other segment the covering super segment must still be cached,
        // otherwise the piece can't be recreated here
        let (super_segment_index, shard_segment_root_with_position, segment_proof) = {
            let cached_super_segments = self.cached_super_segments.lock();
            let Some(super_segment) = cached_super_segments.get_for_segment_index(segment_index)
            else {
                return Ok(None);
            };

            // Locate the segment root by its offset from the super segment's last
            // (max) segment index
            let Some(shard_segment_root_with_position) = super_segment
                .segment_roots
                .iter()
                .nth_back(u64::from(
                    super_segment.header.max_segment_index.as_inner() - segment_index,
                ) as usize)
                .copied()
            else {
                error!(
                    %requested_piece_index,
                    %segment_index,
                    super_segment_header = ?super_segment.header,
                    "Failed to find segment index inside super segment, this should never happen"
                );
                return Ok(None);
            };

            let segment_position = shard_segment_root_with_position.segment_position;

            let Some(segment_proof) = super_segment.proof_for_segment(segment_position) else {
                error!(
                    %requested_piece_index,
                    %segment_index,
                    %segment_position,
                    super_segment_header = ?super_segment.header,
                    "Failed to get segment proof for segment position, this should never happen"
                );

                return Ok(None);
            };

            // Extract owned values so the cache lock can be released before the
            // (slow) recreation below
            (
                super_segment.header.index.as_inner(),
                shard_segment_root_with_position,
                segment_proof,
            )
        };

        let recreate_segment_super_segment_details = RecreateSegmentSuperSegmentDetails {
            super_segment_index,
            segment_position: shard_segment_root_with_position.segment_position,
            segment_proof,
        };

        if shard_segment_root_with_position.shard_index != ShardIndex::BEACON_CHAIN {
            unimplemented!("Shard segments for non-beacon chain shards are not supported yet");
        }

        // Header of the immediately preceding segment (if any) is needed as the
        // starting point for recreation
        let last_archived_segment = shard_segment_root_with_position
            .local_segment_index
            .checked_sub(LocalSegmentIndex::ONE)
            .and_then(|last_segment_index| {
                self.beacon_chain_info
                    .get_segment_header(last_segment_index)
            });

        let maybe_segment = recreate_segment(
            last_archived_segment,
            &self.beacon_chain_info,
            self.erasure_coding.clone(),
            &recreate_segment_super_segment_details,
            |_| Vec::new(),
        )
        .await?;

        let Some(segment) = maybe_segment else {
            return Ok(None);
        };

        // Cache the freshly recreated segment for subsequent piece requests
        let cached_archived_segment = cached_archived_segment.insert(CachedArchivedSegment {
            segment_index,
            segment,
            last_used_at: Instant::now(),
        });

        Ok(cached_archived_segment
            .segment
            .pieces
            .pieces()
            .nth(usize::from(requested_piece_index.position())))
    }

    async fn update_shard_membership_info(
        &self,
        extensions: &Extensions,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error> {
        let connection_id = extensions
            .get::<ConnectionId>()
            .expect("`ConnectionId` is always present; qed");

        let shard_membership = {
            let mut shard_membership_connections = self.shard_membership_connections.lock();

            // Expire connections that haven't refreshed their info recently
            shard_membership_connections
                .connections
                .retain(|_connection_id, state| {
                    state.last_update.elapsed() < SHARD_MEMBERSHIP_EXPIRATION
                });

            shard_membership_connections.connections.insert(
                *connection_id,
                ShardMembershipConnectionsState {
                    last_update: Instant::now(),
                    info,
                },
            );

            // Aggregate membership info across all live connections
            shard_membership_connections
                .connections
                .values()
                .flat_map(|state| state.info.clone())
                .collect::<Vec<_>>()
        };

        // Best-effort forwarding; a closed receiver is logged, not fatal
        if let Err(error) = self
            .shard_membership_updates_sender
            .clone()
            .send(shard_membership)
            .await
        {
            warn!(%error, "Failed to send shard membership update");
        }

        Ok(())
    }
}