1#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{ChainInfo, ChainSyncStatus};
7use ab_client_archiving::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{
9 BlockSealNotification, NewSlotInfo, NewSlotNotification,
10};
11use ab_client_consensus_common::ConsensusConstants;
12use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
13use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_core_primitives::pot::SlotNumber;
17use ab_core_primitives::segments::{HistorySize, SegmentHeader, SegmentIndex};
18use ab_core_primitives::solutions::Solution;
19use ab_erasure_coding::ErasureCoding;
20use ab_farmer_components::FarmerProtocolInfo;
21use ab_farmer_rpc_primitives::{
22 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
23 MAX_SEGMENT_HEADERS_PER_REQUEST, SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
24};
25use ab_networking::libp2p::Multiaddr;
26use futures::channel::{mpsc, oneshot};
27use futures::{FutureExt, SinkExt, StreamExt, select};
28use jsonrpsee::core::{SubscriptionResult, async_trait};
29use jsonrpsee::proc_macros::rpc;
30use jsonrpsee::server::{Server, ServerConfig};
31use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
32use jsonrpsee::{
33 ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
34};
35use parking_lot::Mutex;
36use schnellru::{ByLength, LruMap};
37use std::collections::HashMap;
38use std::collections::hash_map::Entry;
39use std::io;
40use std::net::SocketAddr;
41use std::sync::{Arc, Weak};
42use std::time::Instant;
43use tracing::{debug, error, info, warn};
44
45#[derive(Debug, thiserror::Error)]
47pub enum Error {
48 #[error("Solution was ignored for slot {slot}")]
50 SolutionWasIgnored {
51 slot: SlotNumber,
53 },
54 #[error(
56 "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
57 )]
58 SegmentHeadersLengthExceeded {
59 actual: usize,
61 },
62}
63
64impl From<Error> for ErrorObjectOwned {
65 fn from(error: Error) -> Self {
66 let code = match &error {
67 Error::SolutionWasIgnored { .. } => 0,
68 Error::SegmentHeadersLengthExceeded { .. } => 1,
69 };
70
71 ErrorObject::owned(code, error.to_string(), None::<()>)
72 }
73}
74
75#[rpc(server)]
77pub trait FarmerRpcApi {
78 #[method(name = "getFarmerAppInfo")]
80 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;
81
82 #[method(name = "submitSolutionResponse")]
83 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;
84
85 #[subscription(
87 name = "subscribeSlotInfo" => "slot_info",
88 unsubscribe = "unsubscribeSlotInfo",
89 item = SlotInfo,
90 )]
91 async fn subscribe_slot_info(&self) -> SubscriptionResult;
92
93 #[subscription(
95 name = "subscribeBlockSealing" => "block_seal",
96 unsubscribe = "unsubscribeBlockSealing",
97 item = BlockSealInfo,
98 )]
99 async fn subscribe_block_seal(&self) -> SubscriptionResult;
100
101 #[method(name = "submitBlockSeal")]
102 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;
103
104 #[subscription(
106 name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
107 unsubscribe = "unsubscribeArchivedSegmentHeader",
108 item = SegmentHeader,
109 )]
110 async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;
111
112 #[method(name = "segmentHeaders")]
113 async fn segment_headers(
114 &self,
115 segment_indices: Vec<SegmentIndex>,
116 ) -> Result<Vec<Option<SegmentHeader>>, Error>;
117
118 #[method(name = "piece", blocking)]
119 fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;
120
121 #[method(name = "acknowledgeArchivedSegmentHeader")]
122 async fn acknowledge_archived_segment_header(
123 &self,
124 segment_index: SegmentIndex,
125 ) -> Result<(), Error>;
126
127 #[method(name = "lastSegmentHeaders")]
128 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;
129
130 #[method(name = "updateShardMembershipInfo", with_extensions)]
131 async fn update_shard_membership_info(
132 &self,
133 info: Vec<FarmerShardMembershipInfo>,
134 ) -> Result<(), Error>;
135}
136
137#[derive(Debug, Default)]
138struct ArchivedSegmentHeaderAcknowledgementSenders {
139 segment_index: SegmentIndex,
140 senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
141}
142
143#[derive(Debug, Default)]
144struct BlockSignatureSenders {
145 current_pre_seal_hash: Blake3Hash,
146 senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
147}
148
149#[derive(Debug)]
155enum CachedArchivedSegment {
156 Genesis(Arc<NewArchivedSegment>),
158 Weak(Weak<NewArchivedSegment>),
159}
160
161impl CachedArchivedSegment {
162 fn get(&self) -> Option<Arc<NewArchivedSegment>> {
163 match self {
164 CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
165 CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
166 }
167 }
168}
169
170#[derive(Debug)]
171struct ShardMembershipConnectionsState {
172 last_update: Instant,
173 info: Vec<FarmerShardMembershipInfo>,
174}
175
176#[derive(Debug, Default)]
177struct ShardMembershipConnections {
178 connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
179}
180
181#[derive(Debug)]
183pub struct FarmerRpcConfig<CI, CSS> {
184 pub listen_on: SocketAddr,
186 pub genesis_block: OwnedBeaconChainBlock,
188 pub consensus_constants: ConsensusConstants,
190 pub max_pieces_in_sector: u16,
192 pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
194 pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
196 pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
198 pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
200 pub dsn_bootstrap_nodes: Vec<Multiaddr>,
202 pub chain_info: CI,
204 pub chain_sync_status: CSS,
206 pub erasure_coding: ErasureCoding,
208}
209
210#[derive(Debug)]
212pub struct FarmerRpcWorker<CI, CSS>
213where
214 CI: ChainInfo<OwnedBeaconChainBlock>,
215 CSS: ChainSyncStatus,
216{
217 server: Option<Server>,
218 rpc: Option<FarmerRpc<CI, CSS>>,
219 new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
220 block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
221 archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
222 solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
223 block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
224 cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
225 archived_segment_acknowledgement_senders:
226 Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
227 slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
228 block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
229 archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
230}
231
232impl<CI, CSS> FarmerRpcWorker<CI, CSS>
233where
234 CI: ChainInfo<OwnedBeaconChainBlock>,
235 CSS: ChainSyncStatus,
236{
237 pub async fn new(config: FarmerRpcConfig<CI, CSS>) -> io::Result<Self> {
239 let server = Server::builder()
240 .set_config(ServerConfig::builder().ws_only().build())
241 .build(config.listen_on)
242 .await?;
243
244 let address = server.local_addr()?;
245 info!(%address, "Started farmer RPC server");
246
247 let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
248 let block_authoring_delay = usize::try_from(block_authoring_delay)
249 .expect("Block authoring delay will never exceed usize on any platform; qed");
250 let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
251 .expect("Always a tiny constant in the protocol; qed");
252
253 let slot_info_subscriptions = Arc::default();
254 let block_sealing_subscriptions = Arc::default();
255
256 let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
257 solution_response_senders_capacity,
258 ))));
259 let block_sealing_senders = Arc::default();
260 let cached_archived_segment = Arc::default();
261 let archived_segment_header_subscriptions = Arc::default();
262
263 let rpc = FarmerRpc {
264 genesis_block: config.genesis_block,
265 solution_response_senders: Arc::clone(&solution_response_senders),
266 block_sealing_senders: Arc::clone(&block_sealing_senders),
267 dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
268 chain_info: config.chain_info,
269 cached_archived_segment: Arc::clone(&cached_archived_segment),
270 archived_segment_acknowledgement_senders: Arc::default(),
271 chain_sync_status: config.chain_sync_status,
272 consensus_constants: config.consensus_constants,
273 max_pieces_in_sector: config.max_pieces_in_sector,
274 slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
275 block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
276 archived_segment_header_subscriptions: Arc::clone(
277 &archived_segment_header_subscriptions,
278 ),
279 shard_membership_connections: Arc::default(),
280 shard_membership_updates_sender: config.shard_membership_updates_sender,
281 erasure_coding: config.erasure_coding,
282 };
283
284 Ok(Self {
285 server: Some(server),
286 rpc: Some(rpc),
287 new_slot_notification_receiver: config.new_slot_notification_receiver,
288 block_sealing_notification_receiver: config.block_sealing_notification_receiver,
289 archived_segment_notification_receiver: config.archived_segment_notification_receiver,
290 solution_response_senders,
291 block_sealing_senders,
292 cached_archived_segment,
293 archived_segment_acknowledgement_senders: Arc::new(Default::default()),
294 slot_info_subscriptions,
295 block_sealing_subscriptions,
296 archived_segment_header_subscriptions,
297 })
298 }
299
300 pub async fn run(mut self) {
302 let server = self.server.take().expect("Called only once from here; qed");
303 let rpc = self.rpc.take().expect("Called only once from here; qed");
304 let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();
305
306 loop {
307 select! {
308 _ = server_fut => {}
309 maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
310 let Some(new_slot_notification) = maybe_new_slot_notification else {
311 break;
312 };
313
314 self.handle_new_slot_notification(new_slot_notification).await;
315 }
316 maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
317 let Some(block_sealing_notification) = maybe_block_sealing_notification else {
318 break;
319 };
320
321 self.handle_block_sealing_notification(block_sealing_notification).await;
322 }
323 maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
324 let Some(archived_segment_notification) = maybe_archived_segment_notification else {
325 break;
326 };
327
328 self.handle_archived_segment_notification(archived_segment_notification).await;
329 }
330 }
331 }
332 }
333
334 async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
335 let NewSlotNotification {
336 new_slot_info,
337 solution_sender,
338 } = new_slot_notification;
339
340 let NewSlotInfo {
341 slot,
342 proof_of_time,
343 solution_range,
344 shard_membership_entropy,
345 num_shards,
346 } = new_slot_info;
347
348 let mut solution_response_senders = self.solution_response_senders.lock();
351 if solution_response_senders.peek(&slot).is_none() {
352 solution_response_senders.insert(slot, solution_sender);
353 }
354
355 let global_challenge = proof_of_time.derive_global_challenge(slot);
356
357 let slot_info = SlotInfo {
359 slot,
360 global_challenge,
361 solution_range: solution_range.to_leaf_shard(num_shards),
362 shard_membership_entropy,
363 num_shards,
364 };
365 let slot_info = serde_json::value::to_raw_value(&slot_info)
366 .expect("Serialization of slot info never fails; qed");
367
368 self.slot_info_subscriptions.lock().retain_mut(|sink| {
369 match sink.try_send(slot_info.clone()) {
370 Ok(()) => true,
371 Err(error) => match error {
372 TrySendError::Closed(_) => {
373 false
375 }
376 TrySendError::Full(_) => {
377 warn!(
378 subscription_id = ?sink.subscription_id(),
379 "Slot info receiver is too slow, dropping notification"
380 );
381 true
382 }
383 },
384 }
385 });
386 }
387
388 async fn handle_block_sealing_notification(
389 &mut self,
390 block_sealing_notification: BlockSealNotification,
391 ) {
392 let BlockSealNotification {
393 pre_seal_hash,
394 public_key_hash,
395 seal_sender,
396 } = block_sealing_notification;
397
398 {
400 let mut block_sealing_senders = self.block_sealing_senders.lock();
401
402 if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
403 block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
404 block_sealing_senders.senders.clear();
405 }
406
407 block_sealing_senders.senders.push(seal_sender);
408 }
409
410 let block_seal_info = BlockSealInfo {
412 pre_seal_hash,
413 public_key_hash,
414 };
415 let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
416 .expect("Serialization of block seal info never fails; qed");
417
418 self.block_sealing_subscriptions.lock().retain_mut(|sink| {
419 match sink.try_send(block_seal_info.clone()) {
420 Ok(()) => true,
421 Err(error) => match error {
422 TrySendError::Closed(_) => {
423 false
425 }
426 TrySendError::Full(_) => {
427 warn!(
428 subscription_id = ?sink.subscription_id(),
429 "Block seal info receiver is too slow, dropping notification"
430 );
431 true
432 }
433 },
434 }
435 });
436 }
437
438 async fn handle_archived_segment_notification(
439 &mut self,
440 archived_segment_notification: ArchivedSegmentNotification,
441 ) {
442 let ArchivedSegmentNotification {
443 archived_segment,
444 acknowledgement_sender,
445 } = archived_segment_notification;
446
447 let segment_index = archived_segment.segment_header.segment_index();
448
449 self.archived_segment_header_subscriptions
453 .lock()
454 .retain_mut(|sink| {
455 let subscription_id = sink.subscription_id();
456
457 let mut archived_segment_acknowledgement_senders =
460 self.archived_segment_acknowledgement_senders.lock();
461
462 if archived_segment_acknowledgement_senders.segment_index != segment_index {
463 archived_segment_acknowledgement_senders.segment_index = segment_index;
464 archived_segment_acknowledgement_senders.senders.clear();
465 }
466
467 let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
468 .senders
469 .entry(subscription_id.clone())
470 {
471 Entry::Occupied(_) => {
472 None
474 }
475 Entry::Vacant(entry) => {
476 entry.insert(acknowledgement_sender.clone());
477
478 Some(archived_segment.segment_header)
480 }
481 };
482
483 self.cached_archived_segment
484 .lock()
485 .replace(CachedArchivedSegment::Weak(Arc::downgrade(
486 &archived_segment,
487 )));
488
489 let maybe_archived_segment_header =
491 serde_json::value::to_raw_value(&maybe_archived_segment_header)
492 .expect("Serialization of archived segment info never fails; qed");
493
494 match sink.try_send(maybe_archived_segment_header) {
495 Ok(()) => true,
496 Err(error) => match error {
497 TrySendError::Closed(_) => {
498 archived_segment_acknowledgement_senders
500 .senders
501 .remove(&subscription_id);
502 false
503 }
504 TrySendError::Full(_) => {
505 warn!(
506 ?subscription_id,
507 "Block seal info receiver is too slow, dropping notification"
508 );
509 true
510 }
511 },
512 }
513 });
514 }
515}
516
517#[derive(Debug)]
519struct FarmerRpc<CI, CSS>
520where
521 CI: ChainInfo<OwnedBeaconChainBlock>,
522 CSS: ChainSyncStatus,
523{
524 genesis_block: OwnedBeaconChainBlock,
525 solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
526 block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
527 dsn_bootstrap_nodes: Vec<Multiaddr>,
528 chain_info: CI,
529 cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
530 archived_segment_acknowledgement_senders:
531 Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
532 chain_sync_status: CSS,
533 consensus_constants: ConsensusConstants,
534 max_pieces_in_sector: u16,
535 slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
536 block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
537 archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
538 shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
539 shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
540 erasure_coding: ErasureCoding,
541}
542
543#[async_trait]
544impl<CI, CSS> FarmerRpcApiServer for FarmerRpc<CI, CSS>
545where
546 CI: ChainInfo<OwnedBeaconChainBlock>,
547 CSS: ChainSyncStatus,
548{
549 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
550 let last_segment_index = self
551 .chain_info
552 .max_segment_index()
553 .unwrap_or(SegmentIndex::ZERO);
554
555 let consensus_constants = &self.consensus_constants;
556 let protocol_info = FarmerProtocolInfo {
557 history_size: HistorySize::from(last_segment_index),
558 max_pieces_in_sector: self.max_pieces_in_sector,
559 recent_segments: consensus_constants.recent_segments,
560 recent_history_fraction: consensus_constants.recent_history_fraction,
561 min_sector_lifetime: consensus_constants.min_sector_lifetime,
562 };
563
564 let farmer_app_info = FarmerAppInfo {
565 genesis_root: *self.genesis_block.header.header().root(),
566 dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
567 syncing: self.chain_sync_status.is_syncing(),
568 farming_timeout: consensus_constants
569 .slot_duration
570 .as_duration()
571 .mul_f64(consensus_constants.block_authoring_delay.as_u64() as f64),
572 protocol_info,
573 };
574
575 Ok(farmer_app_info)
576 }
577
578 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
579 let slot = solution_response.slot_number;
580 let public_key_hash = solution_response.solution.public_key_hash;
581 let sector_index = solution_response.solution.sector_index;
582 let mut solution_response_senders = self.solution_response_senders.lock();
583
584 let success = solution_response_senders
585 .peek_mut(&slot)
586 .and_then(|sender| sender.try_send(solution_response.solution).ok())
587 .is_some();
588
589 if !success {
590 warn!(
591 %slot,
592 %sector_index,
593 %public_key_hash,
594 "Solution was ignored, likely because farmer was too slow"
595 );
596
597 return Err(Error::SolutionWasIgnored { slot });
598 }
599
600 Ok(())
601 }
602
603 async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
604 let subscription = pending.accept().await?;
605 self.slot_info_subscriptions.lock().push(subscription);
606
607 Ok(())
608 }
609
610 async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
611 let subscription = pending.accept().await?;
612 self.block_sealing_subscriptions.lock().push(subscription);
613
614 Ok(())
615 }
616
617 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
618 let block_sealing_senders = self.block_sealing_senders.clone();
619
620 let mut block_sealing_senders = block_sealing_senders.lock();
621
622 if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
623 && let Some(sender) = block_sealing_senders.senders.pop()
624 {
625 let _ = sender.send(block_seal.seal);
626 }
627
628 Ok(())
629 }
630
631 async fn subscribe_archived_segment_header(
632 &self,
633 pending: PendingSubscriptionSink,
634 ) -> SubscriptionResult {
635 let subscription = pending.accept().await?;
636 self.archived_segment_header_subscriptions
637 .lock()
638 .push(subscription);
639
640 Ok(())
641 }
642
643 async fn acknowledge_archived_segment_header(
644 &self,
645 segment_index: SegmentIndex,
646 ) -> Result<(), Error> {
647 let archived_segment_acknowledgement_senders =
648 self.archived_segment_acknowledgement_senders.clone();
649
650 let maybe_sender = {
651 let mut archived_segment_acknowledgement_senders_guard =
652 archived_segment_acknowledgement_senders.lock();
653
654 (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
655 .then(|| {
656 let last_key = archived_segment_acknowledgement_senders_guard
657 .senders
658 .keys()
659 .next()
660 .cloned()?;
661
662 archived_segment_acknowledgement_senders_guard
663 .senders
664 .remove(&last_key)
665 })
666 .flatten()
667 };
668
669 if let Some(mut sender) = maybe_sender
670 && let Err(error) = sender.try_send(())
671 && !error.is_disconnected()
672 {
673 warn!(%error, "Failed to acknowledge archived segment");
674 }
675
676 debug!(%segment_index, "Acknowledged archived segment.");
677
678 Ok(())
679 }
680
681 fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
684 let archived_segment = {
685 let mut cached_archived_segment = self.cached_archived_segment.lock();
686
687 match cached_archived_segment
688 .as_ref()
689 .and_then(CachedArchivedSegment::get)
690 {
691 Some(archived_segment) => archived_segment,
692 None => {
693 if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
694 return Ok(None);
695 }
696
697 debug!(%requested_piece_index, "Re-creating the genesis segment on demand");
698
699 let archived_segment = Arc::new(recreate_genesis_segment(
701 &self.genesis_block,
702 self.erasure_coding.clone(),
703 ));
704
705 cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
706 &archived_segment,
707 )));
708
709 archived_segment
710 }
711 }
712 };
713
714 if requested_piece_index.segment_index() == archived_segment.segment_header.segment_index()
715 {
716 return Ok(archived_segment
717 .pieces
718 .pieces()
719 .nth(requested_piece_index.position() as usize));
720 }
721
722 Ok(None)
723 }
724
725 async fn segment_headers(
726 &self,
727 segment_indices: Vec<SegmentIndex>,
728 ) -> Result<Vec<Option<SegmentHeader>>, Error> {
729 if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
730 error!(
731 "`segment_indices` length exceed the limit: {} ",
732 segment_indices.len()
733 );
734
735 return Err(Error::SegmentHeadersLengthExceeded {
736 actual: segment_indices.len(),
737 });
738 };
739
740 Ok(segment_indices
741 .into_iter()
742 .map(|segment_index| self.chain_info.get_segment_header(segment_index))
743 .collect())
744 }
745
746 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
747 if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
748 error!(
749 "Request limit ({}) exceed the server limit: {} ",
750 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
751 );
752
753 return Err(Error::SegmentHeadersLengthExceeded {
754 actual: limit as usize,
755 });
756 };
757
758 let last_segment_index = self
759 .chain_info
760 .max_segment_index()
761 .unwrap_or(SegmentIndex::ZERO);
762
763 let mut last_segment_headers = (SegmentIndex::ZERO..=last_segment_index)
764 .rev()
765 .take(limit as usize)
766 .map(|segment_index| self.chain_info.get_segment_header(segment_index))
767 .collect::<Vec<_>>();
768
769 last_segment_headers.reverse();
770
771 Ok(last_segment_headers)
772 }
773
774 async fn update_shard_membership_info(
775 &self,
776 extensions: &Extensions,
777 info: Vec<FarmerShardMembershipInfo>,
778 ) -> Result<(), Error> {
779 let connection_id = extensions
780 .get::<ConnectionId>()
781 .expect("`ConnectionId` is always present; qed");
782
783 let shard_membership = {
784 let mut shard_membership_connections = self.shard_membership_connections.lock();
785
786 shard_membership_connections
789 .connections
790 .retain(|_connection_id, state| {
791 state.last_update.elapsed() >= SHARD_MEMBERSHIP_EXPIRATION
792 });
793
794 shard_membership_connections.connections.insert(
795 *connection_id,
796 ShardMembershipConnectionsState {
797 last_update: Instant::now(),
798 info,
799 },
800 );
801
802 shard_membership_connections
803 .connections
804 .values()
805 .flat_map(|state| state.info.clone())
806 .collect::<Vec<_>>()
807 };
808
809 if let Err(error) = self
810 .shard_membership_updates_sender
811 .clone()
812 .send(shard_membership)
813 .await
814 {
815 warn!(%error, "Failed to send shard membership update");
816 }
817
818 Ok(())
819 }
820}