ab_farmer/node_client/
rpc_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{
6 SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
7};
8use ab_farmer_rpc_primitives::{
9 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
10 SolutionResponse,
11};
12use async_lock::Semaphore;
13use async_trait::async_trait;
14use futures::{Stream, StreamExt};
15use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
16use jsonrpsee::rpc_params;
17use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
18use std::pin::Pin;
19use std::sync::Arc;
20
21const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
27
28#[derive(Debug, Clone)]
33pub struct RpcNodeClient {
34 client: Arc<WsClient>,
35 piece_request_semaphore: Arc<Semaphore>,
36}
37
38impl RpcNodeClient {
39 pub async fn new(url: &str) -> Result<Self, JsonError> {
41 let client = Arc::new(
42 WsClientBuilder::default()
43 .max_request_size(20 * 1024 * 1024)
44 .build(url)
45 .await?,
46 );
47 let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
48 Ok(Self {
49 client,
50 piece_request_semaphore,
51 })
52 }
53}
54
55#[async_trait]
56impl NodeClient for RpcNodeClient {
57 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
58 Ok(self
59 .client
60 .request("getFarmerAppInfo", rpc_params![])
61 .await?)
62 }
63
64 async fn subscribe_slot_info(
65 &self,
66 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
67 let subscription = self
68 .client
69 .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
70 .await?;
71
72 Ok(Box::pin(subscription.filter_map(
73 |slot_info_result| async move { slot_info_result.ok() },
74 )))
75 }
76
77 async fn submit_solution_response(
78 &self,
79 solution_response: SolutionResponse,
80 ) -> anyhow::Result<()> {
81 Ok(self
82 .client
83 .request("submitSolutionResponse", rpc_params![&solution_response])
84 .await?)
85 }
86
87 async fn subscribe_block_sealing(
88 &self,
89 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
90 let subscription = self
91 .client
92 .subscribe(
93 "subscribeBlockSealing",
94 rpc_params![],
95 "unsubscribeBlockSealing",
96 )
97 .await?;
98
99 Ok(Box::pin(subscription.filter_map(
100 |block_sealing_info_result| async move { block_sealing_info_result.ok() },
101 )))
102 }
103
104 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
106 Ok(self
107 .client
108 .request("submitBlockSeal", rpc_params![&block_seal])
109 .await?)
110 }
111
112 async fn subscribe_archived_segment_headers(
113 &self,
114 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
115 let subscription = self
116 .client
117 .subscribe(
118 "subscribeArchivedSegmentHeader",
119 rpc_params![],
120 "unsubscribeArchivedSegmentHeader",
121 )
122 .await?;
123
124 Ok(Box::pin(subscription.filter_map(
125 |archived_segment_header_result| async move { archived_segment_header_result.ok() },
126 )))
127 }
128
129 async fn super_segment_headers(
130 &self,
131 super_segment_indices: Vec<SuperSegmentIndex>,
132 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
133 Ok(self
134 .client
135 .request("superSegmentHeaders", rpc_params![&super_segment_indices])
136 .await?)
137 }
138
139 async fn segment_headers(
140 &self,
141 segment_indices: Vec<SegmentIndex>,
142 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
143 Ok(self
144 .client
145 .request("segmentHeaders", rpc_params![&segment_indices])
146 .await?)
147 }
148
149 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
150 let _permit = self.piece_request_semaphore.acquire().await;
151 let client = Arc::clone(&self.client);
152 let piece_fut =
155 tokio::task::spawn(
156 async move { client.request("piece", rpc_params![&piece_index]).await },
157 );
158 Ok(piece_fut.await??)
159 }
160
161 async fn acknowledge_archived_segment_header(
162 &self,
163 segment_index: SegmentIndex,
164 ) -> anyhow::Result<()> {
165 Ok(self
166 .client
167 .request(
168 "acknowledgeArchivedSegmentHeader",
169 rpc_params![&segment_index],
170 )
171 .await?)
172 }
173
174 async fn update_shard_membership_info(
175 &self,
176 info: FarmerShardMembershipInfo,
177 ) -> anyhow::Result<()> {
178 Ok(self
179 .client
180 .request("updateShardMembershipInfo", rpc_params![&info])
181 .await?)
182 }
183}
184
185#[async_trait]
186impl NodeClientExt for RpcNodeClient {
187 async fn cached_segment_headers(
188 &self,
189 segment_indices: Vec<SegmentIndex>,
190 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
191 self.segment_headers(segment_indices).await
192 }
193
194 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
195 Ok(self
196 .client
197 .request("lastSegmentHeaders", rpc_params![limit])
198 .await?)
199 }
200}