ab_farmer/node_client/
rpc_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{
6 SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
7};
8use ab_farmer_rpc_primitives::{
9 BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
10 SolutionResponse,
11};
12use async_lock::Semaphore;
13use async_trait::async_trait;
14use futures::{Stream, StreamExt};
15use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
16use jsonrpsee::rpc_params;
17use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
18use std::pin::Pin;
19use std::sync::Arc;
20
21const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
27
28#[derive(Debug, Clone)]
33pub struct RpcNodeClient {
34 client: Arc<WsClient>,
35 piece_request_semaphore: Arc<Semaphore>,
36}
37
38impl RpcNodeClient {
39 pub async fn new(url: &str) -> Result<Self, JsonError> {
41 let client = Arc::new(
42 WsClientBuilder::default()
43 .max_request_size(20 * 1024 * 1024)
44 .build(url)
45 .await?,
46 );
47 let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
48 Ok(Self {
49 client,
50 piece_request_semaphore,
51 })
52 }
53}
54
55#[async_trait]
56impl NodeClient for RpcNodeClient {
57 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
58 Ok(self
59 .client
60 .request("getFarmerAppInfo", rpc_params![])
61 .await?)
62 }
63
64 async fn subscribe_slot_info(
65 &self,
66 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
67 let subscription = self
68 .client
69 .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
70 .await?;
71
72 Ok(Box::pin(subscription.filter_map(
73 |slot_info_result| async move { slot_info_result.ok() },
74 )))
75 }
76
77 async fn submit_solution_response(
78 &self,
79 solution_response: SolutionResponse,
80 ) -> anyhow::Result<()> {
81 Ok(self
82 .client
83 .request("submitSolutionResponse", rpc_params![&solution_response])
84 .await?)
85 }
86
87 async fn subscribe_block_sealing(
88 &self,
89 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
90 let subscription = self
91 .client
92 .subscribe(
93 "subscribeBlockSealing",
94 rpc_params![],
95 "unsubscribeBlockSealing",
96 )
97 .await?;
98
99 Ok(Box::pin(subscription.filter_map(
100 |block_sealing_info_result| async move { block_sealing_info_result.ok() },
101 )))
102 }
103
104 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
105 Ok(self
106 .client
107 .request("submitBlockSeal", rpc_params![&block_seal])
108 .await?)
109 }
110
111 async fn subscribe_new_super_segment_headers(
112 &self,
113 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
114 let subscription = self
115 .client
116 .subscribe(
117 "subscribeNewSuperSegmentHeader",
118 rpc_params![],
119 "unsubscribeNewSuperSegmentHeader",
120 )
121 .await?;
122
123 Ok(Box::pin(subscription.filter_map(
124 |new_super_segment_header_result| async move { new_super_segment_header_result.ok() },
125 )))
126 }
127
128 async fn super_segment_headers(
129 &self,
130 super_segment_indices: Vec<SuperSegmentIndex>,
131 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
132 Ok(self
133 .client
134 .request("superSegmentHeaders", rpc_params![&super_segment_indices])
135 .await?)
136 }
137
138 async fn super_segment_root_for_segment_index(
139 &self,
140 segment_index: SegmentIndex,
141 ) -> anyhow::Result<Option<SuperSegmentRoot>> {
142 Ok(self
143 .client
144 .request(
145 "superSegmentRootForSegmentIndex",
146 rpc_params![&segment_index],
147 )
148 .await?)
149 }
150
151 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
152 let _permit = self.piece_request_semaphore.acquire().await;
153 let client = Arc::clone(&self.client);
154 let piece_fut =
157 tokio::task::spawn(
158 async move { client.request("piece", rpc_params![&piece_index]).await },
159 );
160 Ok(piece_fut.await??)
161 }
162
163 async fn update_shard_membership_info(
164 &self,
165 info: FarmerShardMembershipInfo,
166 ) -> anyhow::Result<()> {
167 Ok(self
168 .client
169 .request("updateShardMembershipInfo", rpc_params![&info])
170 .await?)
171 }
172}
173
174#[async_trait]
175impl NodeClientExt for RpcNodeClient {
176 async fn cached_super_segment_headers(
177 &self,
178 super_segment_indices: Vec<SuperSegmentIndex>,
179 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
180 self.super_segment_headers(super_segment_indices).await
181 }
182
183 async fn last_super_segment_headers(
184 &self,
185 limit: u32,
186 ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
187 Ok(self
188 .client
189 .request("lastSuperSegmentHeaders", rpc_params![limit])
190 .await?)
191 }
192}