ab_farmer/node_client/
rpc_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
6use ab_farmer_rpc_primitives::{
7 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
8 SolutionResponse,
9};
10use async_lock::Semaphore;
11use async_trait::async_trait;
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
14use jsonrpsee::rpc_params;
15use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
16use std::pin::Pin;
17use std::sync::Arc;
18
/// Number of permits in the semaphore that gates `piece` requests made through
/// [`RpcNodeClient`].
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
25
26#[derive(Debug, Clone)]
31pub struct RpcNodeClient {
32 client: Arc<WsClient>,
33 piece_request_semaphore: Arc<Semaphore>,
34}
35
36impl RpcNodeClient {
37 pub async fn new(url: &str) -> Result<Self, JsonError> {
39 let client = Arc::new(
40 WsClientBuilder::default()
41 .max_request_size(20 * 1024 * 1024)
42 .build(url)
43 .await?,
44 );
45 let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
46 Ok(Self {
47 client,
48 piece_request_semaphore,
49 })
50 }
51}
52
53#[async_trait]
54impl NodeClient for RpcNodeClient {
55 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
56 Ok(self
57 .client
58 .request("getFarmerAppInfo", rpc_params![])
59 .await?)
60 }
61
62 async fn subscribe_slot_info(
63 &self,
64 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
65 let subscription = self
66 .client
67 .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
68 .await?;
69
70 Ok(Box::pin(subscription.filter_map(
71 |slot_info_result| async move { slot_info_result.ok() },
72 )))
73 }
74
75 async fn submit_solution_response(
76 &self,
77 solution_response: SolutionResponse,
78 ) -> anyhow::Result<()> {
79 Ok(self
80 .client
81 .request("submitSolutionResponse", rpc_params![&solution_response])
82 .await?)
83 }
84
85 async fn subscribe_block_sealing(
86 &self,
87 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
88 let subscription = self
89 .client
90 .subscribe(
91 "subscribeBlockSealing",
92 rpc_params![],
93 "unsubscribeBlockSealing",
94 )
95 .await?;
96
97 Ok(Box::pin(subscription.filter_map(
98 |block_sealing_info_result| async move { block_sealing_info_result.ok() },
99 )))
100 }
101
102 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
104 Ok(self
105 .client
106 .request("submitBlockSeal", rpc_params![&block_seal])
107 .await?)
108 }
109
110 async fn subscribe_archived_segment_headers(
111 &self,
112 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
113 let subscription = self
114 .client
115 .subscribe(
116 "subscribeArchivedSegmentHeader",
117 rpc_params![],
118 "unsubscribeArchivedSegmentHeader",
119 )
120 .await?;
121
122 Ok(Box::pin(subscription.filter_map(
123 |archived_segment_header_result| async move { archived_segment_header_result.ok() },
124 )))
125 }
126
127 async fn segment_headers(
128 &self,
129 segment_indices: Vec<SegmentIndex>,
130 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
131 Ok(self
132 .client
133 .request("segmentHeaders", rpc_params![&segment_indices])
134 .await?)
135 }
136
137 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
138 let _permit = self.piece_request_semaphore.acquire().await;
139 let client = Arc::clone(&self.client);
140 let piece_fut =
143 tokio::task::spawn(
144 async move { client.request("piece", rpc_params![&piece_index]).await },
145 );
146 Ok(piece_fut.await??)
147 }
148
149 async fn acknowledge_archived_segment_header(
150 &self,
151 segment_index: SegmentIndex,
152 ) -> anyhow::Result<()> {
153 Ok(self
154 .client
155 .request(
156 "acknowledgeArchivedSegmentHeader",
157 rpc_params![&segment_index],
158 )
159 .await?)
160 }
161
162 async fn update_shard_membership_info(
163 &self,
164 info: FarmerShardMembershipInfo,
165 ) -> anyhow::Result<()> {
166 Ok(self
167 .client
168 .request("updateShardMembershipInfo", rpc_params![&info])
169 .await?)
170 }
171}
172
173#[async_trait]
174impl NodeClientExt for RpcNodeClient {
175 async fn cached_segment_headers(
176 &self,
177 segment_indices: Vec<SegmentIndex>,
178 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
179 self.segment_headers(segment_indices).await
180 }
181
182 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
183 Ok(self
184 .client
185 .request("lastSegmentHeaders", rpc_params![limit])
186 .await?)
187 }
188}