ab_farmer/node_client/
rpc_node_client.rs

1//! Node client implementation that connects to node via RPC (WebSockets)
2
3use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
6use ab_farmer_rpc_primitives::{
7    BlockSealInfo, BlockSealResponse, FarmerAppInfo, SlotInfo, SolutionResponse,
8};
9use async_lock::Semaphore;
10use async_trait::async_trait;
11use futures::{Stream, StreamExt};
12use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
13use jsonrpsee::rpc_params;
14use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
15use std::pin::Pin;
16use std::sync::Arc;
17
18/// TODO: Node is having a hard time responding for many piece requests, specifically this results
19///  in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
20///  This needs to be removed after Substrate upgrade when we can take advantage of new Substrate
21///  API that will prevent subscription breakage:
22///  https://github.com/paritytech/jsonrpsee/issues/1409#issuecomment-2303914643
23const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
24
25/// Node client implementation that connects to node via RPC (WebSockets).
26///
27/// This implementation is supposed to be used on local network and not via public Internet due to
28/// sensitive contents.
29#[derive(Debug, Clone)]
30pub struct RpcNodeClient {
31    client: Arc<WsClient>,
32    piece_request_semaphore: Arc<Semaphore>,
33}
34
35impl RpcNodeClient {
36    /// Create a new instance of [`NodeClient`].
37    pub async fn new(url: &str) -> Result<Self, JsonError> {
38        let client = Arc::new(
39            WsClientBuilder::default()
40                .max_request_size(20 * 1024 * 1024)
41                .build(url)
42                .await?,
43        );
44        let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
45        Ok(Self {
46            client,
47            piece_request_semaphore,
48        })
49    }
50}
51
52#[async_trait]
53impl NodeClient for RpcNodeClient {
54    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
55        Ok(self
56            .client
57            .request("getFarmerAppInfo", rpc_params![])
58            .await?)
59    }
60
61    async fn subscribe_slot_info(
62        &self,
63    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
64        let subscription = self
65            .client
66            .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
67            .await?;
68
69        Ok(Box::pin(subscription.filter_map(
70            |slot_info_result| async move { slot_info_result.ok() },
71        )))
72    }
73
74    async fn submit_solution_response(
75        &self,
76        solution_response: SolutionResponse,
77    ) -> anyhow::Result<()> {
78        Ok(self
79            .client
80            .request("submitSolutionResponse", rpc_params![&solution_response])
81            .await?)
82    }
83
84    async fn subscribe_block_sealing(
85        &self,
86    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
87        let subscription = self
88            .client
89            .subscribe(
90                "subscribeBlockSealing",
91                rpc_params![],
92                "unsubscribeBlockSealing",
93            )
94            .await?;
95
96        Ok(Box::pin(subscription.filter_map(
97            |block_sealing_info_result| async move { block_sealing_info_result.ok() },
98        )))
99    }
100
101    /// Submit a block seal
102    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
103        Ok(self
104            .client
105            .request("submitBlockSeal", rpc_params![&block_seal])
106            .await?)
107    }
108
109    async fn subscribe_archived_segment_headers(
110        &self,
111    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
112        let subscription = self
113            .client
114            .subscribe(
115                "subscribeArchivedSegmentHeader",
116                rpc_params![],
117                "unsubscribeArchivedSegmentHeader",
118            )
119            .await?;
120
121        Ok(Box::pin(subscription.filter_map(
122            |archived_segment_header_result| async move { archived_segment_header_result.ok() },
123        )))
124    }
125
126    async fn segment_headers(
127        &self,
128        segment_indices: Vec<SegmentIndex>,
129    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
130        Ok(self
131            .client
132            .request("segmentHeaders", rpc_params![&segment_indices])
133            .await?)
134    }
135
136    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
137        let _permit = self.piece_request_semaphore.acquire().await;
138        let client = Arc::clone(&self.client);
139        // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
140        // issues for jsonrpsee
141        let piece_fut =
142            tokio::task::spawn(
143                async move { client.request("piece", rpc_params![&piece_index]).await },
144            );
145        Ok(piece_fut.await??)
146    }
147
148    async fn acknowledge_archived_segment_header(
149        &self,
150        segment_index: SegmentIndex,
151    ) -> anyhow::Result<()> {
152        Ok(self
153            .client
154            .request(
155                "acknowledgeArchivedSegmentHeader",
156                rpc_params![&segment_index],
157            )
158            .await?)
159    }
160}
161
162#[async_trait]
163impl NodeClientExt for RpcNodeClient {
164    async fn cached_segment_headers(
165        &self,
166        segment_indices: Vec<SegmentIndex>,
167    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
168        self.segment_headers(segment_indices).await
169    }
170
171    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
172        Ok(self
173            .client
174            .request("lastSegmentHeaders", rpc_params![limit])
175            .await?)
176    }
177}