1#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{ChainInfo, ChainSyncStatus};
7use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{
9 BlockSealNotification, NewSlotInfo, NewSlotNotification,
10};
11use ab_client_consensus_common::ConsensusConstants;
12use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
13use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_core_primitives::pot::SlotNumber;
17use ab_core_primitives::segments::{HistorySize, LocalSegmentIndex, SegmentHeader, SegmentIndex};
18use ab_core_primitives::solutions::Solution;
19use ab_erasure_coding::ErasureCoding;
20use ab_farmer_components::FarmerProtocolInfo;
21use ab_farmer_rpc_primitives::{
22 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
23 MAX_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
24};
25use ab_networking::libp2p::Multiaddr;
26use futures::channel::{mpsc, oneshot};
27use futures::{FutureExt, SinkExt, StreamExt, select};
28use jsonrpsee::core::{SubscriptionResult, async_trait};
29use jsonrpsee::proc_macros::rpc;
30use jsonrpsee::server::{Server, ServerConfig};
31use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
32use jsonrpsee::{
33 ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
34};
35use parking_lot::Mutex;
36use schnellru::{ByLength, LruMap};
37use std::collections::HashMap;
38use std::collections::hash_map::Entry;
39use std::io;
40use std::net::SocketAddr;
41use std::sync::{Arc, Weak};
42use std::time::Instant;
43use tracing::{debug, error, info, warn};
44
45#[derive(Debug, thiserror::Error)]
47pub enum Error {
48 #[error("Solution was ignored for slot {slot}")]
50 SolutionWasIgnored {
51 slot: SlotNumber,
53 },
54 #[error(
56 "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
57 )]
58 SegmentHeadersLengthExceeded {
59 actual: usize,
61 },
62}
63
64impl From<Error> for ErrorObjectOwned {
65 fn from(error: Error) -> Self {
66 let code = match &error {
67 Error::SolutionWasIgnored { .. } => 0,
68 Error::SegmentHeadersLengthExceeded { .. } => 1,
69 };
70
71 ErrorObject::owned(code, error.to_string(), None::<()>)
72 }
73}
74
75#[rpc(server)]
77pub trait FarmerRpcApi {
78 #[method(name = "getFarmerAppInfo")]
80 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;
81
82 #[method(name = "submitSolutionResponse")]
83 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;
84
85 #[subscription(
87 name = "subscribeSlotInfo" => "slot_info",
88 unsubscribe = "unsubscribeSlotInfo",
89 item = SlotInfo,
90 )]
91 async fn subscribe_slot_info(&self) -> SubscriptionResult;
92
93 #[subscription(
95 name = "subscribeBlockSealing" => "block_seal",
96 unsubscribe = "unsubscribeBlockSealing",
97 item = BlockSealInfo,
98 )]
99 async fn subscribe_block_seal(&self) -> SubscriptionResult;
100
101 #[method(name = "submitBlockSeal")]
102 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;
103
104 #[subscription(
106 name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
107 unsubscribe = "unsubscribeArchivedSegmentHeader",
108 item = SegmentHeader,
109 )]
110 async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;
111
112 #[method(name = "segmentHeaders")]
113 async fn segment_headers(
114 &self,
115 segment_indices: Vec<SegmentIndex>,
116 ) -> Result<Vec<Option<SegmentHeader>>, Error>;
117
118 #[method(name = "piece", blocking)]
119 fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;
120
121 #[method(name = "acknowledgeArchivedSegmentHeader")]
122 async fn acknowledge_archived_segment_header(
123 &self,
124 segment_index: SegmentIndex,
125 ) -> Result<(), Error>;
126
127 #[method(name = "lastSegmentHeaders")]
128 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;
129
130 #[method(name = "updateShardMembershipInfo", with_extensions)]
131 async fn update_shard_membership_info(
132 &self,
133 info: Vec<FarmerShardMembershipInfo>,
134 ) -> Result<(), Error>;
135}
136
137#[derive(Debug, Default)]
138struct ArchivedSegmentHeaderAcknowledgementSenders {
139 segment_index: SegmentIndex,
140 senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
141}
142
143#[derive(Debug, Default)]
144struct BlockSignatureSenders {
145 current_pre_seal_hash: Blake3Hash,
146 senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
147}
148
149#[derive(Debug)]
155enum CachedArchivedSegment {
156 Genesis(Arc<NewArchivedSegment>),
158 Weak(Weak<NewArchivedSegment>),
159}
160
161impl CachedArchivedSegment {
162 fn get(&self) -> Option<Arc<NewArchivedSegment>> {
163 match self {
164 CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
165 CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
166 }
167 }
168}
169
170#[derive(Debug)]
171struct ShardMembershipConnectionsState {
172 last_update: Instant,
173 info: Vec<FarmerShardMembershipInfo>,
174}
175
176#[derive(Debug, Default)]
177struct ShardMembershipConnections {
178 connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
179}
180
181#[derive(Debug)]
183pub struct FarmerRpcConfig<CI, CSS> {
184 pub listen_on: SocketAddr,
186 pub genesis_block: OwnedBeaconChainBlock,
188 pub consensus_constants: ConsensusConstants,
190 pub max_pieces_in_sector: u16,
192 pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
194 pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
196 pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
198 pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
200 pub dsn_bootstrap_nodes: Vec<Multiaddr>,
202 pub chain_info: CI,
204 pub chain_sync_status: CSS,
206 pub erasure_coding: ErasureCoding,
208}
209
210#[derive(Debug)]
212pub struct FarmerRpcWorker<CI, CSS>
213where
214 CI: ChainInfo<OwnedBeaconChainBlock>,
215 CSS: ChainSyncStatus,
216{
217 server: Option<Server>,
218 rpc: Option<FarmerRpc<CI, CSS>>,
219 new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
220 block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
221 archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
222 solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
223 block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
224 cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
225 archived_segment_acknowledgement_senders:
226 Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
227 slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
228 block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
229 archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
230}
231
232impl<CI, CSS> FarmerRpcWorker<CI, CSS>
233where
234 CI: ChainInfo<OwnedBeaconChainBlock>,
235 CSS: ChainSyncStatus,
236{
237 pub async fn new(config: FarmerRpcConfig<CI, CSS>) -> io::Result<Self> {
239 let server = Server::builder()
240 .set_config(ServerConfig::builder().ws_only().build())
241 .build(config.listen_on)
242 .await?;
243
244 let address = server.local_addr()?;
245 info!(%address, "Started farmer RPC server");
246
247 let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
248 let block_authoring_delay = usize::try_from(block_authoring_delay)
249 .expect("Block authoring delay will never exceed usize on any platform; qed");
250 let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
251 .expect("Always a tiny constant in the protocol; qed");
252
253 let slot_info_subscriptions = Arc::default();
254 let block_sealing_subscriptions = Arc::default();
255
256 let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
257 solution_response_senders_capacity,
258 ))));
259 let block_sealing_senders = Arc::default();
260 let cached_archived_segment = Arc::default();
261 let archived_segment_header_subscriptions = Arc::default();
262
263 let rpc = FarmerRpc {
264 genesis_block: config.genesis_block,
265 solution_response_senders: Arc::clone(&solution_response_senders),
266 block_sealing_senders: Arc::clone(&block_sealing_senders),
267 dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
268 chain_info: config.chain_info,
269 cached_archived_segment: Arc::clone(&cached_archived_segment),
270 archived_segment_acknowledgement_senders: Arc::default(),
271 chain_sync_status: config.chain_sync_status,
272 consensus_constants: config.consensus_constants,
273 max_pieces_in_sector: config.max_pieces_in_sector,
274 slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
275 block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
276 archived_segment_header_subscriptions: Arc::clone(
277 &archived_segment_header_subscriptions,
278 ),
279 shard_membership_connections: Arc::default(),
280 shard_membership_updates_sender: config.shard_membership_updates_sender,
281 erasure_coding: config.erasure_coding,
282 };
283
284 Ok(Self {
285 server: Some(server),
286 rpc: Some(rpc),
287 new_slot_notification_receiver: config.new_slot_notification_receiver,
288 block_sealing_notification_receiver: config.block_sealing_notification_receiver,
289 archived_segment_notification_receiver: config.archived_segment_notification_receiver,
290 solution_response_senders,
291 block_sealing_senders,
292 cached_archived_segment,
293 archived_segment_acknowledgement_senders: Arc::new(Default::default()),
294 slot_info_subscriptions,
295 block_sealing_subscriptions,
296 archived_segment_header_subscriptions,
297 })
298 }
299
300 pub async fn run(mut self) {
302 let server = self.server.take().expect("Called only once from here; qed");
303 let rpc = self.rpc.take().expect("Called only once from here; qed");
304 let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();
305
306 loop {
307 select! {
308 _ = server_fut => {}
309 maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
310 let Some(new_slot_notification) = maybe_new_slot_notification else {
311 break;
312 };
313
314 self.handle_new_slot_notification(new_slot_notification).await;
315 }
316 maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
317 let Some(block_sealing_notification) = maybe_block_sealing_notification else {
318 break;
319 };
320
321 self.handle_block_sealing_notification(block_sealing_notification).await;
322 }
323 maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
324 let Some(archived_segment_notification) = maybe_archived_segment_notification else {
325 break;
326 };
327
328 self.handle_archived_segment_notification(archived_segment_notification).await;
329 }
330 }
331 }
332 }
333
334 async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
335 let NewSlotNotification {
336 new_slot_info,
337 solution_sender,
338 } = new_slot_notification;
339
340 let NewSlotInfo {
341 slot,
342 proof_of_time,
343 solution_range,
344 shard_membership_entropy,
345 num_shards,
346 } = new_slot_info;
347
348 let mut solution_response_senders = self.solution_response_senders.lock();
351 if solution_response_senders.peek(&slot).is_none() {
352 solution_response_senders.insert(slot, solution_sender);
353 }
354
355 let global_challenge = proof_of_time.derive_global_challenge(slot);
356
357 let slot_info = SlotInfo {
359 slot,
360 global_challenge,
361 solution_range: solution_range.to_leaf_shard(num_shards),
362 shard_membership_entropy,
363 num_shards,
364 };
365 let slot_info = serde_json::value::to_raw_value(&slot_info)
366 .expect("Serialization of slot info never fails; qed");
367
368 self.slot_info_subscriptions.lock().retain_mut(|sink| {
369 match sink.try_send(slot_info.clone()) {
370 Ok(()) => true,
371 Err(error) => match error {
372 TrySendError::Closed(_) => {
373 false
375 }
376 TrySendError::Full(_) => {
377 warn!(
378 subscription_id = ?sink.subscription_id(),
379 "Slot info receiver is too slow, dropping notification"
380 );
381 true
382 }
383 },
384 }
385 });
386 }
387
388 async fn handle_block_sealing_notification(
389 &mut self,
390 block_sealing_notification: BlockSealNotification,
391 ) {
392 let BlockSealNotification {
393 pre_seal_hash,
394 public_key_hash,
395 seal_sender,
396 } = block_sealing_notification;
397
398 {
400 let mut block_sealing_senders = self.block_sealing_senders.lock();
401
402 if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
403 block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
404 block_sealing_senders.senders.clear();
405 }
406
407 block_sealing_senders.senders.push(seal_sender);
408 }
409
410 let block_seal_info = BlockSealInfo {
412 pre_seal_hash,
413 public_key_hash,
414 };
415 let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
416 .expect("Serialization of block seal info never fails; qed");
417
418 self.block_sealing_subscriptions.lock().retain_mut(|sink| {
419 match sink.try_send(block_seal_info.clone()) {
420 Ok(()) => true,
421 Err(error) => match error {
422 TrySendError::Closed(_) => {
423 false
425 }
426 TrySendError::Full(_) => {
427 warn!(
428 subscription_id = ?sink.subscription_id(),
429 "Block seal info receiver is too slow, dropping notification"
430 );
431 true
432 }
433 },
434 }
435 });
436 }
437
438 async fn handle_archived_segment_notification(
439 &mut self,
440 archived_segment_notification: ArchivedSegmentNotification,
441 ) {
442 let ArchivedSegmentNotification {
443 archived_segment,
444 acknowledgement_sender,
445 } = archived_segment_notification;
446
447 let segment_index =
448 SegmentIndex::from(archived_segment.segment_header.local_segment_index());
449
450 self.archived_segment_header_subscriptions
454 .lock()
455 .retain_mut(|sink| {
456 let subscription_id = sink.subscription_id();
457
458 let mut archived_segment_acknowledgement_senders =
461 self.archived_segment_acknowledgement_senders.lock();
462
463 if archived_segment_acknowledgement_senders.segment_index != segment_index {
464 archived_segment_acknowledgement_senders.segment_index = segment_index;
465 archived_segment_acknowledgement_senders.senders.clear();
466 }
467
468 let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
469 .senders
470 .entry(subscription_id.clone())
471 {
472 Entry::Occupied(_) => {
473 None
475 }
476 Entry::Vacant(entry) => {
477 entry.insert(acknowledgement_sender.clone());
478
479 Some(archived_segment.segment_header)
481 }
482 };
483
484 self.cached_archived_segment
485 .lock()
486 .replace(CachedArchivedSegment::Weak(Arc::downgrade(
487 &archived_segment,
488 )));
489
490 let maybe_archived_segment_header =
492 serde_json::value::to_raw_value(&maybe_archived_segment_header)
493 .expect("Serialization of archived segment info never fails; qed");
494
495 match sink.try_send(maybe_archived_segment_header) {
496 Ok(()) => true,
497 Err(error) => match error {
498 TrySendError::Closed(_) => {
499 archived_segment_acknowledgement_senders
501 .senders
502 .remove(&subscription_id);
503 false
504 }
505 TrySendError::Full(_) => {
506 warn!(
507 ?subscription_id,
508 "Block seal info receiver is too slow, dropping notification"
509 );
510 true
511 }
512 },
513 }
514 });
515 }
516}
517
518#[derive(Debug)]
520struct FarmerRpc<CI, CSS>
521where
522 CI: ChainInfo<OwnedBeaconChainBlock>,
523 CSS: ChainSyncStatus,
524{
525 genesis_block: OwnedBeaconChainBlock,
526 solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
527 block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
528 dsn_bootstrap_nodes: Vec<Multiaddr>,
529 chain_info: CI,
530 cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
531 archived_segment_acknowledgement_senders:
532 Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
533 chain_sync_status: CSS,
534 consensus_constants: ConsensusConstants,
535 max_pieces_in_sector: u16,
536 slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
537 block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
538 archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
539 shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
540 shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
541 erasure_coding: ErasureCoding,
542}
543
544#[async_trait]
545impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
546where
547 CI: ChainInfo<OwnedBeaconChainBlock>,
548 CSS: ChainSyncStatus,
549{
550 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
551 let last_segment_index = self
552 .chain_info
553 .max_local_segment_index()
554 .unwrap_or(LocalSegmentIndex::ZERO);
555
556 let consensus_constants = &self.consensus_constants;
557 let protocol_info = FarmerProtocolInfo {
558 history_size: HistorySize::from(SegmentIndex::from(last_segment_index)),
559 max_pieces_in_sector: self.max_pieces_in_sector,
560 recent_segments: consensus_constants.recent_segments,
561 recent_history_fraction: consensus_constants.recent_history_fraction,
562 min_sector_lifetime: consensus_constants.min_sector_lifetime,
563 };
564
565 let farmer_app_info = FarmerAppInfo {
566 genesis_root: *self.genesis_block.header.header().root(),
567 dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
568 syncing: self.chain_sync_status.is_syncing(),
569 farming_timeout: consensus_constants
570 .slot_duration
571 .as_duration()
572 .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
573 protocol_info,
574 };
575
576 Ok(farmer_app_info)
577 }
578
579 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
580 let slot = solution_response.slot_number;
581 let public_key_hash = solution_response.solution.public_key_hash;
582 let sector_index = solution_response.solution.sector_index;
583 let mut solution_response_senders = self.solution_response_senders.lock();
584
585 let success = solution_response_senders
586 .peek_mut(&slot)
587 .and_then(|sender| sender.try_send(solution_response.solution).ok())
588 .is_some();
589
590 if !success {
591 warn!(
592 %slot,
593 %sector_index,
594 %public_key_hash,
595 "Solution was ignored, likely because farmer was too slow"
596 );
597
598 return Err(Error::SolutionWasIgnored { slot });
599 }
600
601 Ok(())
602 }
603
604 async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
605 let subscription = pending.accept().await?;
606 self.slot_info_subscriptions.lock().push(subscription);
607
608 Ok(())
609 }
610
611 async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
612 let subscription = pending.accept().await?;
613 self.block_sealing_subscriptions.lock().push(subscription);
614
615 Ok(())
616 }
617
618 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
619 let block_sealing_senders = self.block_sealing_senders.clone();
620
621 let mut block_sealing_senders = block_sealing_senders.lock();
622
623 if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
624 && let Some(sender) = block_sealing_senders.senders.pop()
625 {
626 let _ = sender.send(block_seal.seal);
627 }
628
629 Ok(())
630 }
631
632 async fn subscribe_archived_segment_header(
633 &self,
634 pending: PendingSubscriptionSink,
635 ) -> SubscriptionResult {
636 let subscription = pending.accept().await?;
637 self.archived_segment_header_subscriptions
638 .lock()
639 .push(subscription);
640
641 Ok(())
642 }
643
644 async fn acknowledge_archived_segment_header(
645 &self,
646 segment_index: SegmentIndex,
647 ) -> Result<(), Error> {
648 let archived_segment_acknowledgement_senders =
649 self.archived_segment_acknowledgement_senders.clone();
650
651 let maybe_sender = {
652 let mut archived_segment_acknowledgement_senders_guard =
653 archived_segment_acknowledgement_senders.lock();
654
655 (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
656 .then(|| {
657 let last_key = archived_segment_acknowledgement_senders_guard
658 .senders
659 .keys()
660 .next()
661 .cloned()?;
662
663 archived_segment_acknowledgement_senders_guard
664 .senders
665 .remove(&last_key)
666 })
667 .flatten()
668 };
669
670 if let Some(mut sender) = maybe_sender
671 && let Err(error) = sender.try_send(())
672 && !error.is_disconnected()
673 {
674 warn!(%error, "Failed to acknowledge archived segment");
675 }
676
677 debug!(%segment_index, "Acknowledged archived segment.");
678
679 Ok(())
680 }
681
682 fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
685 let archived_segment = {
686 let mut cached_archived_segment = self.cached_archived_segment.lock();
687
688 match cached_archived_segment
689 .as_ref()
690 .and_then(CachedArchivedSegment::get)
691 {
692 Some(archived_segment) => archived_segment,
693 None => {
694 if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
695 return Ok(None);
696 }
697
698 debug!(%requested_piece_index, "Re-creating the genesis segment on demand");
699
700 let archived_segment = Arc::new(recreate_genesis_segment(
702 &self.genesis_block,
703 self.erasure_coding.clone(),
704 ));
705
706 cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
707 &archived_segment,
708 )));
709
710 archived_segment
711 }
712 }
713 };
714
715 if requested_piece_index.segment_index()
716 == SegmentIndex::from(archived_segment.segment_header.local_segment_index())
717 {
718 return Ok(archived_segment
719 .pieces
720 .pieces()
721 .nth(usize::from(requested_piece_index.position())));
722 }
723
724 Ok(None)
725 }
726
727 async fn segment_headers(
728 &self,
729 segment_indices: Vec<SegmentIndex>,
730 ) -> Result<Vec<Option<SegmentHeader>>, Error> {
731 if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
732 error!(
733 "`segment_indices` length exceed the limit: {} ",
734 segment_indices.len()
735 );
736
737 return Err(Error::SegmentHeadersLengthExceeded {
738 actual: segment_indices.len(),
739 });
740 };
741
742 Ok(segment_indices
743 .into_iter()
744 .map(|segment_index| self.chain_info.get_segment_header(segment_index.into()))
745 .collect())
746 }
747
748 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
749 if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
750 error!(
751 "Request limit ({}) exceed the server limit: {} ",
752 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
753 );
754
755 return Err(Error::SegmentHeadersLengthExceeded {
756 actual: limit as usize,
757 });
758 };
759
760 let last_segment_index = self
761 .chain_info
762 .max_local_segment_index()
763 .unwrap_or(LocalSegmentIndex::ZERO);
764
765 let mut last_segment_headers = (LocalSegmentIndex::ZERO..=last_segment_index)
766 .rev()
767 .take(limit as usize)
768 .map(|segment_index| self.chain_info.get_segment_header(segment_index))
769 .collect::<Vec<_>>();
770
771 last_segment_headers.reverse();
772
773 Ok(last_segment_headers)
774 }
775
776 async fn update_shard_membership_info(
777 &self,
778 extensions: &Extensions,
779 info: Vec<FarmerShardMembershipInfo>,
780 ) -> Result<(), Error> {
781 let connection_id = extensions
782 .get::<ConnectionId>()
783 .expect("`ConnectionId` is always present; qed");
784
785 let shard_membership = {
786 let mut shard_membership_connections = self.shard_membership_connections.lock();
787
788 shard_membership_connections
791 .connections
792 .retain(|_connection_id, state| {
793 state.last_update.elapsed() >= SHARD_MEMBERSHIP_EXPIRATION
794 });
795
796 shard_membership_connections.connections.insert(
797 *connection_id,
798 ShardMembershipConnectionsState {
799 last_update: Instant::now(),
800 info,
801 },
802 );
803
804 shard_membership_connections
805 .connections
806 .values()
807 .flat_map(|state| state.info.clone())
808 .collect::<Vec<_>>()
809 };
810
811 if let Err(error) = self
812 .shard_membership_updates_sender
813 .clone()
814 .send(shard_membership)
815 .await
816 {
817 warn!(%error, "Failed to send shard membership update");
818 }
819
820 Ok(())
821 }
822}