1#![feature(try_blocks)]
4
5use ab_archiving::archiver::NewArchivedSegment;
6use ab_client_api::{BeaconChainInfo, ChainSyncStatus};
7use ab_client_archiving::segment::{ArchivedSegmentNotification, recreate_genesis_segment};
8use ab_client_block_authoring::slot_worker::{
9 BlockSealNotification, NewSlotInfo, NewSlotNotification,
10};
11use ab_client_consensus_common::ConsensusConstants;
12use ab_core_primitives::block::header::OwnedBlockHeaderSeal;
13use ab_core_primitives::block::owned::OwnedBeaconChainBlock;
14use ab_core_primitives::hashes::Blake3Hash;
15use ab_core_primitives::pieces::{Piece, PieceIndex};
16use ab_core_primitives::pot::SlotNumber;
17use ab_core_primitives::segments::{
18 HistorySize, LocalSegmentIndex, SegmentHeader, SegmentIndex, SuperSegmentHeader,
19 SuperSegmentIndex,
20};
21use ab_core_primitives::solutions::Solution;
22use ab_erasure_coding::ErasureCoding;
23use ab_farmer_components::FarmerProtocolInfo;
24use ab_farmer_rpc_primitives::{
25 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo,
26 MAX_SEGMENT_HEADERS_PER_REQUEST, MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST,
27 SHARD_MEMBERSHIP_EXPIRATION, SlotInfo, SolutionResponse,
28};
29use ab_networking::libp2p::Multiaddr;
30use futures::channel::{mpsc, oneshot};
31use futures::{FutureExt, SinkExt, StreamExt, select};
32use jsonrpsee::core::{SubscriptionResult, async_trait};
33use jsonrpsee::proc_macros::rpc;
34use jsonrpsee::server::{Server, ServerConfig};
35use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, SubscriptionId};
36use jsonrpsee::{
37 ConnectionId, Extensions, PendingSubscriptionSink, SubscriptionSink, TrySendError,
38};
39use parking_lot::Mutex;
40use replace_with::replace_with_or_abort;
41use schnellru::{ByLength, LruMap};
42use std::collections::HashMap;
43use std::collections::hash_map::Entry;
44use std::io;
45use std::net::SocketAddr;
46use std::sync::{Arc, Weak};
47use std::time::Instant;
48use tracing::{debug, error, info, warn};
49
/// Errors returned by the farmer RPC methods
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A submitted solution could not be handed over to a solution sender for its slot
    #[error("Solution was ignored for slot {slot}")]
    SolutionWasIgnored {
        /// Slot number the ignored solution was submitted for
        slot: SlotNumber,
    },
    /// A request asked for more headers than the per-request limit allows
    #[error(
        "Segment headers length exceeded the limit: {actual}/{MAX_SEGMENT_HEADERS_PER_REQUEST}"
    )]
    SegmentHeadersLengthExceeded {
        /// Number of headers that was requested
        actual: usize,
    },
}
68
69impl From<Error> for ErrorObjectOwned {
70 fn from(error: Error) -> Self {
71 let code = match &error {
72 Error::SolutionWasIgnored { .. } => 0,
73 Error::SegmentHeadersLengthExceeded { .. } => 1,
74 };
75
76 ErrorObject::owned(code, error.to_string(), None::<()>)
77 }
78}
79
/// RPC API exposed by the farmer RPC server
#[rpc(server)]
pub trait FarmerRpcApi {
    /// Get information the farmer application needs (genesis root, bootstrap nodes, sync status,
    /// farming timeout and protocol info)
    #[method(name = "getFarmerAppInfo")]
    fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error>;

    /// Submit a solution for a slot
    #[method(name = "submitSolutionResponse")]
    fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error>;

    /// Subscribe to [`SlotInfo`] notifications
    #[subscription(
        name = "subscribeSlotInfo" => "slot_info",
        unsubscribe = "unsubscribeSlotInfo",
        item = SlotInfo,
    )]
    async fn subscribe_slot_info(&self) -> SubscriptionResult;

    /// Subscribe to [`BlockSealInfo`] notifications
    #[subscription(
        name = "subscribeBlockSealing" => "block_seal",
        unsubscribe = "unsubscribeBlockSealing",
        item = BlockSealInfo,
    )]
    async fn subscribe_block_seal(&self) -> SubscriptionResult;

    /// Submit a block seal for a previously announced pre-seal hash
    #[method(name = "submitBlockSeal")]
    fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error>;

    /// Subscribe to archived [`SegmentHeader`] notifications
    #[subscription(
        name = "subscribeArchivedSegmentHeader" => "archived_segment_header",
        unsubscribe = "unsubscribeArchivedSegmentHeader",
        item = SegmentHeader,
    )]
    async fn subscribe_archived_segment_header(&self) -> SubscriptionResult;

    /// Look up super segment headers by index, one (optional) result per requested index
    #[method(name = "superSegmentHeaders")]
    async fn super_segment_headers(
        &self,
        super_segment_indices: Vec<SuperSegmentIndex>,
    ) -> Result<Vec<Option<SuperSegmentHeader>>, Error>;

    /// Look up segment headers by index, one (optional) result per requested index
    #[method(name = "segmentHeaders")]
    async fn segment_headers(
        &self,
        segment_indices: Vec<SegmentIndex>,
    ) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Get a piece by index, `None` if the server does not have it at hand
    #[method(name = "piece", blocking)]
    fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>;

    /// Acknowledge receipt of the archived segment header with the given segment index
    #[method(name = "acknowledgeArchivedSegmentHeader")]
    async fn acknowledge_archived_segment_header(
        &self,
        segment_index: SegmentIndex,
    ) -> Result<(), Error>;

    /// Get up to `limit` most recent segment headers, ordered from oldest to newest
    #[method(name = "lastSegmentHeaders")]
    async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

    /// Replace the shard membership info associated with the calling connection
    #[method(name = "updateShardMembershipInfo", with_extensions)]
    async fn update_shard_membership_info(
        &self,
        info: Vec<FarmerShardMembershipInfo>,
    ) -> Result<(), Error>;
}
147
/// Acknowledgement senders collected for the archived segment that is currently being announced
#[derive(Debug, Default)]
struct ArchivedSegmentHeaderAcknowledgementSenders {
    /// Segment index the senders below belong to; senders are cleared whenever it changes
    segment_index: SegmentIndex,
    /// One acknowledgement sender per subscription that was notified about the segment
    senders: HashMap<SubscriptionId<'static>, mpsc::Sender<()>>,
}
153
/// Seal senders collected for the block that is currently waiting to be sealed
#[derive(Debug, Default)]
struct BlockSignatureSenders {
    /// Pre-seal hash the senders below belong to; senders are cleared whenever it changes
    current_pre_seal_hash: Blake3Hash,
    /// Senders through which a submitted seal is forwarded
    senders: Vec<oneshot::Sender<OwnedBlockHeaderSeal>>,
}
159
/// Archived segment cached for serving `piece` requests
#[derive(Debug)]
enum CachedArchivedSegment {
    /// Genesis segment re-created on demand, kept alive by the cache itself
    Genesis(Arc<NewArchivedSegment>),
    /// Segment received through an archived segment notification, held only weakly so the cache
    /// does not keep it alive on its own
    Weak(Weak<NewArchivedSegment>),
}
171
172impl CachedArchivedSegment {
173 fn get(&self) -> Option<Arc<NewArchivedSegment>> {
174 match self {
175 CachedArchivedSegment::Genesis(archived_segment) => Some(Arc::clone(archived_segment)),
176 CachedArchivedSegment::Weak(weak_archived_segment) => weak_archived_segment.upgrade(),
177 }
178 }
179}
180
/// Shard membership info last reported over a single RPC connection
#[derive(Debug)]
struct ShardMembershipConnectionsState {
    /// When the info was last updated, compared against `SHARD_MEMBERSHIP_EXPIRATION`
    last_update: Instant,
    /// Shard membership info as submitted by the connection
    info: Vec<FarmerShardMembershipInfo>,
}
186
/// Shard membership state of all RPC connections that reported it
#[derive(Debug, Default)]
struct ShardMembershipConnections {
    /// Latest reported state keyed by the connection it was received on
    connections: HashMap<ConnectionId, ShardMembershipConnectionsState>,
}
191
/// Configuration consumed by [`FarmerRpcWorker::new`]
#[derive(Debug)]
pub struct FarmerRpcConfig<BCI, CSS> {
    /// Address the RPC server is bound to
    pub listen_on: SocketAddr,
    /// Genesis block, used for the reported genesis root and for re-creating the genesis segment
    pub genesis_block: OwnedBeaconChainBlock,
    /// Consensus constants reported to farmers as part of the app info
    pub consensus_constants: ConsensusConstants,
    /// Max pieces in sector reported to farmers as part of the protocol info
    pub max_pieces_in_sector: u16,
    /// Source of new slot notifications that are forwarded to slot info subscribers
    pub new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    /// Source of block sealing notifications that are forwarded to block seal subscribers
    pub block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    /// Source of archived segment notifications that are forwarded to segment header subscribers
    pub archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    /// Sink that receives the combined shard membership info of all connections on every update
    pub shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    /// DSN bootstrap nodes reported to farmers as part of the app info
    pub dsn_bootstrap_nodes: Vec<Multiaddr>,
    /// Beacon chain info used for segment and super segment header lookups
    pub beacon_chain_info: BCI,
    /// Chain sync status reported to farmers as part of the app info
    pub chain_sync_status: CSS,
    /// Erasure coding instance used when re-creating the genesis segment
    pub erasure_coding: ErasureCoding,
}
220
/// Worker that runs the farmer RPC server and forwards slot, block sealing and archived segment
/// notifications to the corresponding subscribers
#[derive(Debug)]
pub struct FarmerRpcWorker<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    // Taken (set to `None`) when `run()` starts the server
    server: Option<Server>,
    // Taken (set to `None`) when `run()` turns it into RPC methods for the server
    rpc: Option<FarmerRpc<BCI, CSS>>,
    new_slot_notification_receiver: mpsc::Receiver<NewSlotNotification>,
    block_sealing_notification_receiver: mpsc::Receiver<BlockSealNotification>,
    archived_segment_notification_receiver: mpsc::Receiver<ArchivedSegmentNotification>,
    // Solution senders keyed by slot, looked up when a farmer submits a solution
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    // Seal senders for the current pre-seal hash, consumed when a farmer submits a seal
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    // Most recently announced archived segment, used to serve `piece` requests
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    // Acknowledgement senders for the archived segment that is currently being announced
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    // Active subscription sinks, one list per subscription kind
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
}
242
243impl<BCI, CSS> FarmerRpcWorker<BCI, CSS>
244where
245 BCI: BeaconChainInfo,
246 CSS: ChainSyncStatus,
247{
248 pub async fn new(config: FarmerRpcConfig<BCI, CSS>) -> io::Result<Self> {
250 let server = Server::builder()
251 .set_config(ServerConfig::builder().ws_only().build())
252 .build(config.listen_on)
253 .await?;
254
255 let address = server.local_addr()?;
256 info!(%address, "Started farmer RPC server");
257
258 let block_authoring_delay = u64::from(config.consensus_constants.block_authoring_delay);
259 let block_authoring_delay = usize::try_from(block_authoring_delay)
260 .expect("Block authoring delay will never exceed usize on any platform; qed");
261 let solution_response_senders_capacity = u32::try_from(block_authoring_delay)
262 .expect("Always a tiny constant in the protocol; qed");
263
264 let slot_info_subscriptions = Arc::default();
265 let block_sealing_subscriptions = Arc::default();
266
267 let solution_response_senders = Arc::new(Mutex::new(LruMap::new(ByLength::new(
268 solution_response_senders_capacity,
269 ))));
270 let block_sealing_senders = Arc::default();
271 let cached_archived_segment = Arc::default();
272 let archived_segment_header_subscriptions = Arc::default();
273
274 let rpc = FarmerRpc {
275 genesis_block: config.genesis_block,
276 solution_response_senders: Arc::clone(&solution_response_senders),
277 block_sealing_senders: Arc::clone(&block_sealing_senders),
278 dsn_bootstrap_nodes: config.dsn_bootstrap_nodes,
279 beacon_chain_info: config.beacon_chain_info,
280 cached_archived_segment: Arc::clone(&cached_archived_segment),
281 archived_segment_acknowledgement_senders: Arc::default(),
282 chain_sync_status: config.chain_sync_status,
283 consensus_constants: config.consensus_constants,
284 max_pieces_in_sector: config.max_pieces_in_sector,
285 slot_info_subscriptions: Arc::clone(&slot_info_subscriptions),
286 block_sealing_subscriptions: Arc::clone(&block_sealing_subscriptions),
287 archived_segment_header_subscriptions: Arc::clone(
288 &archived_segment_header_subscriptions,
289 ),
290 shard_membership_connections: Arc::default(),
291 shard_membership_updates_sender: config.shard_membership_updates_sender,
292 erasure_coding: config.erasure_coding,
293 };
294
295 Ok(Self {
296 server: Some(server),
297 rpc: Some(rpc),
298 new_slot_notification_receiver: config.new_slot_notification_receiver,
299 block_sealing_notification_receiver: config.block_sealing_notification_receiver,
300 archived_segment_notification_receiver: config.archived_segment_notification_receiver,
301 solution_response_senders,
302 block_sealing_senders,
303 cached_archived_segment,
304 archived_segment_acknowledgement_senders: Arc::new(Default::default()),
305 slot_info_subscriptions,
306 block_sealing_subscriptions,
307 archived_segment_header_subscriptions,
308 })
309 }
310
311 pub async fn run(mut self) {
313 let server = self.server.take().expect("Called only once from here; qed");
314 let rpc = self.rpc.take().expect("Called only once from here; qed");
315 let mut server_fut = server.start(rpc.into_rpc()).stopped().boxed().fuse();
316
317 loop {
318 select! {
319 _ = server_fut => {}
320 maybe_new_slot_notification = self.new_slot_notification_receiver.next() => {
321 let Some(new_slot_notification) = maybe_new_slot_notification else {
322 break;
323 };
324
325 self.handle_new_slot_notification(new_slot_notification).await;
326 }
327 maybe_block_sealing_notification = self.block_sealing_notification_receiver.next() => {
328 let Some(block_sealing_notification) = maybe_block_sealing_notification else {
329 break;
330 };
331
332 self.handle_block_sealing_notification(block_sealing_notification).await;
333 }
334 maybe_archived_segment_notification = self.archived_segment_notification_receiver.next() => {
335 let Some(archived_segment_notification) = maybe_archived_segment_notification else {
336 break;
337 };
338
339 self.handle_archived_segment_notification(archived_segment_notification).await;
340 }
341 }
342 }
343 }
344
345 async fn handle_new_slot_notification(&mut self, new_slot_notification: NewSlotNotification) {
346 let NewSlotNotification {
347 new_slot_info,
348 solution_sender,
349 } = new_slot_notification;
350
351 let NewSlotInfo {
352 slot,
353 proof_of_time,
354 solution_range,
355 shard_membership_entropy,
356 num_shards,
357 } = new_slot_info;
358
359 let mut solution_response_senders = self.solution_response_senders.lock();
362 if solution_response_senders.peek(&slot).is_none() {
363 solution_response_senders.insert(slot, solution_sender);
364 }
365
366 let global_challenge = proof_of_time.derive_global_challenge(slot);
367
368 let slot_info = SlotInfo {
370 slot,
371 global_challenge,
372 solution_range: solution_range.to_leaf_shard(num_shards),
373 shard_membership_entropy,
374 num_shards,
375 };
376 let slot_info = serde_json::value::to_raw_value(&slot_info)
377 .expect("Serialization of slot info never fails; qed");
378
379 self.slot_info_subscriptions.lock().retain_mut(|sink| {
380 match sink.try_send(slot_info.clone()) {
381 Ok(()) => true,
382 Err(error) => match error {
383 TrySendError::Closed(_) => {
384 false
386 }
387 TrySendError::Full(_) => {
388 warn!(
389 subscription_id = ?sink.subscription_id(),
390 "Slot info receiver is too slow, dropping notification"
391 );
392 true
393 }
394 },
395 }
396 });
397 }
398
399 async fn handle_block_sealing_notification(
400 &mut self,
401 block_sealing_notification: BlockSealNotification,
402 ) {
403 let BlockSealNotification {
404 pre_seal_hash,
405 public_key_hash,
406 seal_sender,
407 } = block_sealing_notification;
408
409 {
411 let mut block_sealing_senders = self.block_sealing_senders.lock();
412
413 if block_sealing_senders.current_pre_seal_hash != pre_seal_hash {
414 block_sealing_senders.current_pre_seal_hash = pre_seal_hash;
415 block_sealing_senders.senders.clear();
416 }
417
418 block_sealing_senders.senders.push(seal_sender);
419 }
420
421 let block_seal_info = BlockSealInfo {
423 pre_seal_hash,
424 public_key_hash,
425 };
426 let block_seal_info = serde_json::value::to_raw_value(&block_seal_info)
427 .expect("Serialization of block seal info never fails; qed");
428
429 self.block_sealing_subscriptions.lock().retain_mut(|sink| {
430 match sink.try_send(block_seal_info.clone()) {
431 Ok(()) => true,
432 Err(error) => match error {
433 TrySendError::Closed(_) => {
434 false
436 }
437 TrySendError::Full(_) => {
438 warn!(
439 subscription_id = ?sink.subscription_id(),
440 "Block seal info receiver is too slow, dropping notification"
441 );
442 true
443 }
444 },
445 }
446 });
447 }
448
449 async fn handle_archived_segment_notification(
450 &mut self,
451 archived_segment_notification: ArchivedSegmentNotification,
452 ) {
453 let ArchivedSegmentNotification {
454 mut archived_segment,
455 acknowledgement_sender,
456 } = archived_segment_notification;
457
458 let segment_index =
459 SegmentIndex::from(archived_segment.segment_header.local_segment_index());
460
461 self.archived_segment_header_subscriptions
465 .lock()
466 .retain_mut(|sink| {
467 let subscription_id = sink.subscription_id();
468
469 let mut archived_segment_acknowledgement_senders =
472 self.archived_segment_acknowledgement_senders.lock();
473
474 if archived_segment_acknowledgement_senders.segment_index != segment_index {
475 archived_segment_acknowledgement_senders.segment_index = segment_index;
476 archived_segment_acknowledgement_senders.senders.clear();
477 }
478
479 {
483 let archived_segment = Arc::make_mut(&mut archived_segment);
484
485 for piece in archived_segment.pieces.iter_mut() {
486 piece.header.super_segment_index = SuperSegmentIndex::from(u64::from(
487 archived_segment.segment_header.segment_index.as_inner(),
488 ))
489 .into();
490 }
492
493 replace_with_or_abort(&mut archived_segment.pieces, |pieces| {
494 pieces.to_shared()
495 });
496 }
497
498 self.cached_archived_segment
499 .lock()
500 .replace(CachedArchivedSegment::Weak(Arc::downgrade(
501 &archived_segment,
502 )));
503
504 let maybe_archived_segment_header = match archived_segment_acknowledgement_senders
505 .senders
506 .entry(subscription_id.clone())
507 {
508 Entry::Occupied(_) => {
509 None
511 }
512 Entry::Vacant(entry) => {
513 entry.insert(acknowledgement_sender.clone());
514
515 Some(archived_segment.segment_header)
517 }
518 };
519
520 let maybe_archived_segment_header =
522 serde_json::value::to_raw_value(&maybe_archived_segment_header)
523 .expect("Serialization of archived segment info never fails; qed");
524
525 match sink.try_send(maybe_archived_segment_header) {
526 Ok(()) => true,
527 Err(error) => match error {
528 TrySendError::Closed(_) => {
529 archived_segment_acknowledgement_senders
531 .senders
532 .remove(&subscription_id);
533 false
534 }
535 TrySendError::Full(_) => {
536 warn!(
537 ?subscription_id,
538 "Block seal info receiver is too slow, dropping notification"
539 );
540 true
541 }
542 },
543 }
544 });
545 }
546}
547
/// Implementation of the farmer RPC API, handed over to the RPC server by the worker
#[derive(Debug)]
struct FarmerRpc<BCI, CSS>
where
    BCI: BeaconChainInfo,
    CSS: ChainSyncStatus,
{
    // Source of the genesis root and of the genesis segment that is re-created on demand
    genesis_block: OwnedBeaconChainBlock,
    // Solution senders keyed by slot, looked up when a farmer submits a solution
    solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution>>>>,
    // Seal senders for the current pre-seal hash, consumed when a farmer submits a seal
    block_sealing_senders: Arc<Mutex<BlockSignatureSenders>>,
    dsn_bootstrap_nodes: Vec<Multiaddr>,
    beacon_chain_info: BCI,
    // Archived segment used to serve `piece` requests
    cached_archived_segment: Arc<Mutex<Option<CachedArchivedSegment>>>,
    // Acknowledgement senders consumed when a farmer acknowledges an archived segment header
    archived_segment_acknowledgement_senders:
        Arc<Mutex<ArchivedSegmentHeaderAcknowledgementSenders>>,
    chain_sync_status: CSS,
    consensus_constants: ConsensusConstants,
    max_pieces_in_sector: u16,
    // Subscription sinks are pushed here on subscribe, one list per subscription kind
    slot_info_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    block_sealing_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    archived_segment_header_subscriptions: Arc<Mutex<Vec<SubscriptionSink>>>,
    // Latest shard membership info per connection
    shard_membership_connections: Arc<Mutex<ShardMembershipConnections>>,
    // Receives the combined shard membership info of all connections on every update
    shard_membership_updates_sender: mpsc::Sender<Vec<FarmerShardMembershipInfo>>,
    // Used when re-creating the genesis segment
    erasure_coding: ErasureCoding,
}
573
574#[async_trait]
575impl<BCI, CSS> FarmerRpcApiServer for FarmerRpc<BCI, CSS>
576where
577 BCI: BeaconChainInfo,
578 CSS: ChainSyncStatus,
579{
580 fn get_farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
581 let last_segment_index = self
582 .beacon_chain_info
583 .last_segment_header()
584 .map(|segment_header| segment_header.segment_index.as_inner())
585 .unwrap_or(LocalSegmentIndex::ZERO);
586
587 let consensus_constants = &self.consensus_constants;
588 let protocol_info = FarmerProtocolInfo {
589 history_size: HistorySize::from(SegmentIndex::from(last_segment_index)),
590 max_pieces_in_sector: self.max_pieces_in_sector,
591 recent_segments: consensus_constants.recent_segments,
592 recent_history_fraction: consensus_constants.recent_history_fraction,
593 min_sector_lifetime: consensus_constants.min_sector_lifetime,
594 };
595
596 let farmer_app_info = FarmerAppInfo {
597 genesis_root: *self.genesis_block.header.header().root(),
598 dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(),
599 syncing: self.chain_sync_status.is_syncing(),
600 farming_timeout: consensus_constants
601 .slot_duration
602 .as_duration()
603 .mul_f64(u64::from(consensus_constants.block_authoring_delay) as f64),
604 protocol_info,
605 };
606
607 Ok(farmer_app_info)
608 }
609
610 fn submit_solution_response(&self, solution_response: SolutionResponse) -> Result<(), Error> {
611 let slot = solution_response.slot_number;
612 let public_key_hash = solution_response.solution.public_key_hash;
613 let sector_index = solution_response.solution.sector_index;
614 let mut solution_response_senders = self.solution_response_senders.lock();
615
616 let success = solution_response_senders
617 .peek_mut(&slot)
618 .and_then(|sender| sender.try_send(solution_response.solution).ok())
619 .is_some();
620
621 if !success {
622 warn!(
623 %slot,
624 %sector_index,
625 %public_key_hash,
626 "Solution was ignored, likely because farmer was too slow"
627 );
628
629 return Err(Error::SolutionWasIgnored { slot });
630 }
631
632 Ok(())
633 }
634
635 async fn subscribe_slot_info(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
636 let subscription = pending.accept().await?;
637 self.slot_info_subscriptions.lock().push(subscription);
638
639 Ok(())
640 }
641
642 async fn subscribe_block_seal(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
643 let subscription = pending.accept().await?;
644 self.block_sealing_subscriptions.lock().push(subscription);
645
646 Ok(())
647 }
648
649 fn submit_block_seal(&self, block_seal: BlockSealResponse) -> Result<(), Error> {
650 let block_sealing_senders = self.block_sealing_senders.clone();
651
652 let mut block_sealing_senders = block_sealing_senders.lock();
653
654 if block_sealing_senders.current_pre_seal_hash == block_seal.pre_seal_hash
655 && let Some(sender) = block_sealing_senders.senders.pop()
656 {
657 let _ = sender.send(block_seal.seal);
658 }
659
660 Ok(())
661 }
662
663 async fn subscribe_archived_segment_header(
664 &self,
665 pending: PendingSubscriptionSink,
666 ) -> SubscriptionResult {
667 let subscription = pending.accept().await?;
668 self.archived_segment_header_subscriptions
669 .lock()
670 .push(subscription);
671
672 Ok(())
673 }
674
675 async fn acknowledge_archived_segment_header(
676 &self,
677 segment_index: SegmentIndex,
678 ) -> Result<(), Error> {
679 let archived_segment_acknowledgement_senders =
680 self.archived_segment_acknowledgement_senders.clone();
681
682 let maybe_sender = {
683 let mut archived_segment_acknowledgement_senders_guard =
684 archived_segment_acknowledgement_senders.lock();
685
686 (archived_segment_acknowledgement_senders_guard.segment_index == segment_index)
687 .then(|| {
688 let last_key = archived_segment_acknowledgement_senders_guard
689 .senders
690 .keys()
691 .next()
692 .cloned()?;
693
694 archived_segment_acknowledgement_senders_guard
695 .senders
696 .remove(&last_key)
697 })
698 .flatten()
699 };
700
701 if let Some(mut sender) = maybe_sender
702 && let Err(error) = sender.try_send(())
703 && !error.is_disconnected()
704 {
705 warn!(%error, "Failed to acknowledge archived segment");
706 }
707
708 debug!(%segment_index, "Acknowledged archived segment.");
709
710 Ok(())
711 }
712
713 fn piece(&self, requested_piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
716 let archived_segment = {
717 let mut cached_archived_segment = self.cached_archived_segment.lock();
718
719 match cached_archived_segment
720 .as_ref()
721 .and_then(CachedArchivedSegment::get)
722 {
723 Some(archived_segment) => archived_segment,
724 None => {
725 if requested_piece_index > SegmentIndex::ZERO.last_piece_index() {
726 return Ok(None);
727 }
728
729 debug!(%requested_piece_index, "Re-creating the genesis segment on demand");
730
731 let archived_segment = Arc::new(recreate_genesis_segment(
733 &self.genesis_block,
734 self.erasure_coding.clone(),
735 ));
736
737 cached_archived_segment.replace(CachedArchivedSegment::Genesis(Arc::clone(
738 &archived_segment,
739 )));
740
741 archived_segment
742 }
743 }
744 };
745
746 if requested_piece_index.segment_index()
747 == SegmentIndex::from(archived_segment.segment_header.local_segment_index())
748 {
749 return Ok(archived_segment
750 .pieces
751 .pieces()
752 .nth(usize::from(requested_piece_index.position())));
753 }
754
755 Ok(None)
756 }
757
758 async fn super_segment_headers(
759 &self,
760 super_segment_indices: Vec<SuperSegmentIndex>,
761 ) -> Result<Vec<Option<SuperSegmentHeader>>, Error> {
762 if super_segment_indices.len() > MAX_SUPER_SEGMENT_HEADERS_PER_REQUEST {
763 error!(
764 "`super_segment_indices` length exceed the limit: {} ",
765 super_segment_indices.len()
766 );
767
768 return Err(Error::SegmentHeadersLengthExceeded {
769 actual: super_segment_indices.len(),
770 });
771 };
772
773 Ok(super_segment_indices
774 .into_iter()
775 .map(|super_segment_index| {
776 self.beacon_chain_info
777 .get_super_segment_header(super_segment_index)
778 })
779 .collect())
780 }
781
782 async fn segment_headers(
783 &self,
784 segment_indices: Vec<SegmentIndex>,
785 ) -> Result<Vec<Option<SegmentHeader>>, Error> {
786 if segment_indices.len() > MAX_SEGMENT_HEADERS_PER_REQUEST {
787 error!(
788 "`segment_indices` length exceed the limit: {} ",
789 segment_indices.len()
790 );
791
792 return Err(Error::SegmentHeadersLengthExceeded {
793 actual: segment_indices.len(),
794 });
795 };
796
797 Ok(segment_indices
798 .into_iter()
799 .map(|segment_index| {
800 self.beacon_chain_info
801 .get_segment_header(segment_index.into())
802 })
803 .collect())
804 }
805
806 async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> {
807 if limit as usize > MAX_SEGMENT_HEADERS_PER_REQUEST {
808 error!(
809 "Request limit ({}) exceed the server limit: {} ",
810 limit, MAX_SEGMENT_HEADERS_PER_REQUEST
811 );
812
813 return Err(Error::SegmentHeadersLengthExceeded {
814 actual: limit as usize,
815 });
816 };
817
818 let last_segment_index = self
819 .beacon_chain_info
820 .last_segment_header()
821 .map(|segment_header| segment_header.segment_index.as_inner())
822 .unwrap_or(LocalSegmentIndex::ZERO);
823
824 let mut last_segment_headers = (LocalSegmentIndex::ZERO..=last_segment_index)
825 .rev()
826 .take(limit as usize)
827 .map(|segment_index| self.beacon_chain_info.get_segment_header(segment_index))
828 .collect::<Vec<_>>();
829
830 last_segment_headers.reverse();
831
832 Ok(last_segment_headers)
833 }
834
835 async fn update_shard_membership_info(
836 &self,
837 extensions: &Extensions,
838 info: Vec<FarmerShardMembershipInfo>,
839 ) -> Result<(), Error> {
840 let connection_id = extensions
841 .get::<ConnectionId>()
842 .expect("`ConnectionId` is always present; qed");
843
844 let shard_membership = {
845 let mut shard_membership_connections = self.shard_membership_connections.lock();
846
847 shard_membership_connections
850 .connections
851 .retain(|_connection_id, state| {
852 state.last_update.elapsed() >= SHARD_MEMBERSHIP_EXPIRATION
853 });
854
855 shard_membership_connections.connections.insert(
856 *connection_id,
857 ShardMembershipConnectionsState {
858 last_update: Instant::now(),
859 info,
860 },
861 );
862
863 shard_membership_connections
864 .connections
865 .values()
866 .flat_map(|state| state.info.clone())
867 .collect::<Vec<_>>()
868 };
869
870 if let Err(error) = self
871 .shard_membership_updates_sender
872 .clone()
873 .send(shard_membership)
874 .await
875 {
876 warn!(%error, "Failed to send shard membership update");
877 }
878
879 Ok(())
880 }
881}