ab_farmer/node_client/
rpc_node_client.rs

1//! Node client implementation that connects to node via RPC (WebSockets)
2
3use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{SegmentHeader, SegmentIndex};
6use ab_farmer_rpc_primitives::{
7    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
8    SolutionResponse,
9};
10use async_lock::Semaphore;
11use async_trait::async_trait;
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
14use jsonrpsee::rpc_params;
15use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
16use std::pin::Pin;
17use std::sync::Arc;
18
/// Maximum number of piece requests that are allowed to be in flight at the same time.
///
/// TODO: The node has a hard time responding to many concurrent piece requests; specifically,
///  this results in subscriptions becoming broken on the node:
///  https://github.com/paritytech/jsonrpsee/issues/1409
///  This limit needs to be removed after the Substrate upgrade, when we can take advantage of
///  the new Substrate API that will prevent subscription breakage:
///  https://github.com/paritytech/jsonrpsee/issues/1409#issuecomment-2303914643
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
25
/// Node client implementation that connects to node via RPC (WebSockets).
///
/// This implementation is supposed to be used on a local network and not via the public Internet
/// due to sensitive contents.
///
/// Both fields are behind [`Arc`], so cloning the client is cheap and all clones share the same
/// connection and the same piece request limit.
#[derive(Debug, Clone)]
pub struct RpcNodeClient {
    // WebSocket RPC client used for all requests and subscriptions
    client: Arc<WsClient>,
    // Bounds the number of concurrent piece requests, see `MAX_CONCURRENT_PIECE_REQUESTS`
    piece_request_semaphore: Arc<Semaphore>,
}
35
36impl RpcNodeClient {
37    /// Create a new instance of [`NodeClient`].
38    pub async fn new(url: &str) -> Result<Self, JsonError> {
39        let client = Arc::new(
40            WsClientBuilder::default()
41                .max_request_size(20 * 1024 * 1024)
42                .build(url)
43                .await?,
44        );
45        let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
46        Ok(Self {
47            client,
48            piece_request_semaphore,
49        })
50    }
51}
52
53#[async_trait]
54impl NodeClient for RpcNodeClient {
55    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
56        Ok(self
57            .client
58            .request("getFarmerAppInfo", rpc_params![])
59            .await?)
60    }
61
62    async fn subscribe_slot_info(
63        &self,
64    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
65        let subscription = self
66            .client
67            .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
68            .await?;
69
70        Ok(Box::pin(subscription.filter_map(
71            |slot_info_result| async move { slot_info_result.ok() },
72        )))
73    }
74
75    async fn submit_solution_response(
76        &self,
77        solution_response: SolutionResponse,
78    ) -> anyhow::Result<()> {
79        Ok(self
80            .client
81            .request("submitSolutionResponse", rpc_params![&solution_response])
82            .await?)
83    }
84
85    async fn subscribe_block_sealing(
86        &self,
87    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
88        let subscription = self
89            .client
90            .subscribe(
91                "subscribeBlockSealing",
92                rpc_params![],
93                "unsubscribeBlockSealing",
94            )
95            .await?;
96
97        Ok(Box::pin(subscription.filter_map(
98            |block_sealing_info_result| async move { block_sealing_info_result.ok() },
99        )))
100    }
101
102    /// Submit a block seal
103    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
104        Ok(self
105            .client
106            .request("submitBlockSeal", rpc_params![&block_seal])
107            .await?)
108    }
109
110    async fn subscribe_archived_segment_headers(
111        &self,
112    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
113        let subscription = self
114            .client
115            .subscribe(
116                "subscribeArchivedSegmentHeader",
117                rpc_params![],
118                "unsubscribeArchivedSegmentHeader",
119            )
120            .await?;
121
122        Ok(Box::pin(subscription.filter_map(
123            |archived_segment_header_result| async move { archived_segment_header_result.ok() },
124        )))
125    }
126
127    async fn segment_headers(
128        &self,
129        segment_indices: Vec<SegmentIndex>,
130    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
131        Ok(self
132            .client
133            .request("segmentHeaders", rpc_params![&segment_indices])
134            .await?)
135    }
136
137    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
138        let _permit = self.piece_request_semaphore.acquire().await;
139        let client = Arc::clone(&self.client);
140        // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
141        // issues for jsonrpsee
142        let piece_fut =
143            tokio::task::spawn(
144                async move { client.request("piece", rpc_params![&piece_index]).await },
145            );
146        Ok(piece_fut.await??)
147    }
148
149    async fn acknowledge_archived_segment_header(
150        &self,
151        segment_index: SegmentIndex,
152    ) -> anyhow::Result<()> {
153        Ok(self
154            .client
155            .request(
156                "acknowledgeArchivedSegmentHeader",
157                rpc_params![&segment_index],
158            )
159            .await?)
160    }
161
162    async fn update_shard_membership_info(
163        &self,
164        info: FarmerShardMembershipInfo,
165    ) -> anyhow::Result<()> {
166        Ok(self
167            .client
168            .request("updateShardMembershipInfo", rpc_params![&info])
169            .await?)
170    }
171}
172
173#[async_trait]
174impl NodeClientExt for RpcNodeClient {
175    async fn cached_segment_headers(
176        &self,
177        segment_indices: Vec<SegmentIndex>,
178    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
179        self.segment_headers(segment_indices).await
180    }
181
182    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
183        Ok(self
184            .client
185            .request("lastSegmentHeaders", rpc_params![limit])
186            .await?)
187    }
188}