ab_farmer/node_client/
rpc_node_client.rs1use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
6use ab_farmer_rpc_primitives::{
7 BlockSealInfo, BlockSealResponse, FarmerAppInfo, SlotInfo, SolutionResponse,
8};
9use async_lock::Semaphore;
10use async_trait::async_trait;
11use futures::{Stream, StreamExt};
12use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
13use jsonrpsee::rpc_params;
14use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
15use std::pin::Pin;
16use std::sync::Arc;
17
/// Upper bound on the number of `piece` RPC requests a single [`RpcNodeClient`] keeps in flight at
/// the same time; used as the initial permit count of its piece request semaphore.
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
24
25#[derive(Debug, Clone)]
30pub struct RpcNodeClient {
31 client: Arc<WsClient>,
32 piece_request_semaphore: Arc<Semaphore>,
33}
34
35impl RpcNodeClient {
36 pub async fn new(url: &str) -> Result<Self, JsonError> {
38 let client = Arc::new(
39 WsClientBuilder::default()
40 .max_request_size(20 * 1024 * 1024)
41 .build(url)
42 .await?,
43 );
44 let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
45 Ok(Self {
46 client,
47 piece_request_semaphore,
48 })
49 }
50}
51
52#[async_trait]
53impl NodeClient for RpcNodeClient {
54 async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
55 Ok(self
56 .client
57 .request("getFarmerAppInfo", rpc_params![])
58 .await?)
59 }
60
61 async fn subscribe_slot_info(
62 &self,
63 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
64 let subscription = self
65 .client
66 .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
67 .await?;
68
69 Ok(Box::pin(subscription.filter_map(
70 |slot_info_result| async move { slot_info_result.ok() },
71 )))
72 }
73
74 async fn submit_solution_response(
75 &self,
76 solution_response: SolutionResponse,
77 ) -> anyhow::Result<()> {
78 Ok(self
79 .client
80 .request("submitSolutionResponse", rpc_params![&solution_response])
81 .await?)
82 }
83
84 async fn subscribe_block_sealing(
85 &self,
86 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
87 let subscription = self
88 .client
89 .subscribe(
90 "subscribeBlockSealing",
91 rpc_params![],
92 "unsubscribeBlockSealing",
93 )
94 .await?;
95
96 Ok(Box::pin(subscription.filter_map(
97 |block_sealing_info_result| async move { block_sealing_info_result.ok() },
98 )))
99 }
100
101 async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
103 Ok(self
104 .client
105 .request("submitBlockSeal", rpc_params![&block_seal])
106 .await?)
107 }
108
109 async fn subscribe_archived_segment_headers(
110 &self,
111 ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
112 let subscription = self
113 .client
114 .subscribe(
115 "subscribeArchivedSegmentHeader",
116 rpc_params![],
117 "unsubscribeArchivedSegmentHeader",
118 )
119 .await?;
120
121 Ok(Box::pin(subscription.filter_map(
122 |archived_segment_header_result| async move { archived_segment_header_result.ok() },
123 )))
124 }
125
126 async fn segment_headers(
127 &self,
128 segment_indices: Vec<SegmentIndex>,
129 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
130 Ok(self
131 .client
132 .request("segmentHeaders", rpc_params![&segment_indices])
133 .await?)
134 }
135
136 async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
137 let _permit = self.piece_request_semaphore.acquire().await;
138 let client = Arc::clone(&self.client);
139 let piece_fut =
142 tokio::task::spawn(
143 async move { client.request("piece", rpc_params![&piece_index]).await },
144 );
145 Ok(piece_fut.await??)
146 }
147
148 async fn acknowledge_archived_segment_header(
149 &self,
150 segment_index: SegmentIndex,
151 ) -> anyhow::Result<()> {
152 Ok(self
153 .client
154 .request(
155 "acknowledgeArchivedSegmentHeader",
156 rpc_params![&segment_index],
157 )
158 .await?)
159 }
160}
161
162#[async_trait]
163impl NodeClientExt for RpcNodeClient {
164 async fn cached_segment_headers(
165 &self,
166 segment_indices: Vec<SegmentIndex>,
167 ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
168 self.segment_headers(segment_indices).await
169 }
170
171 async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
172 Ok(self
173 .client
174 .request("lastSegmentHeaders", rpc_params![limit])
175 .await?)
176 }
177}