Skip to main content

ab_farmer/node_client/
rpc_node_client.rs

1//! Node client implementation that connects to node via RPC (WebSockets)
2
3use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{
6    SegmentHeader, SegmentIndex, SuperSegmentHeader, SuperSegmentIndex,
7};
8use ab_farmer_rpc_primitives::{
9    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
10    SolutionResponse,
11};
12use async_lock::Semaphore;
13use async_trait::async_trait;
14use futures::{Stream, StreamExt};
15use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
16use jsonrpsee::rpc_params;
17use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
18use std::pin::Pin;
19use std::sync::Arc;
20
21/// TODO: Node is having a hard time responding for many piece requests, specifically this results
22///  in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
23///  This needs to be removed after Substrate upgrade when we can take advantage of new Substrate
24///  API that will prevent subscription breakage:
25///  https://github.com/paritytech/jsonrpsee/issues/1409#issuecomment-2303914643
26const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
27
28/// Node client implementation that connects to node via RPC (WebSockets).
29///
30/// This implementation is supposed to be used on local network and not via public Internet due to
31/// sensitive contents.
32#[derive(Debug, Clone)]
33pub struct RpcNodeClient {
34    client: Arc<WsClient>,
35    piece_request_semaphore: Arc<Semaphore>,
36}
37
38impl RpcNodeClient {
39    /// Create a new instance of [`NodeClient`].
40    pub async fn new(url: &str) -> Result<Self, JsonError> {
41        let client = Arc::new(
42            WsClientBuilder::default()
43                .max_request_size(20 * 1024 * 1024)
44                .build(url)
45                .await?,
46        );
47        let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
48        Ok(Self {
49            client,
50            piece_request_semaphore,
51        })
52    }
53}
54
55#[async_trait]
56impl NodeClient for RpcNodeClient {
57    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
58        Ok(self
59            .client
60            .request("getFarmerAppInfo", rpc_params![])
61            .await?)
62    }
63
64    async fn subscribe_slot_info(
65        &self,
66    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
67        let subscription = self
68            .client
69            .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
70            .await?;
71
72        Ok(Box::pin(subscription.filter_map(
73            |slot_info_result| async move { slot_info_result.ok() },
74        )))
75    }
76
77    async fn submit_solution_response(
78        &self,
79        solution_response: SolutionResponse,
80    ) -> anyhow::Result<()> {
81        Ok(self
82            .client
83            .request("submitSolutionResponse", rpc_params![&solution_response])
84            .await?)
85    }
86
87    async fn subscribe_block_sealing(
88        &self,
89    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
90        let subscription = self
91            .client
92            .subscribe(
93                "subscribeBlockSealing",
94                rpc_params![],
95                "unsubscribeBlockSealing",
96            )
97            .await?;
98
99        Ok(Box::pin(subscription.filter_map(
100            |block_sealing_info_result| async move { block_sealing_info_result.ok() },
101        )))
102    }
103
104    /// Submit a block seal
105    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
106        Ok(self
107            .client
108            .request("submitBlockSeal", rpc_params![&block_seal])
109            .await?)
110    }
111
112    async fn subscribe_archived_segment_headers(
113        &self,
114    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
115        let subscription = self
116            .client
117            .subscribe(
118                "subscribeArchivedSegmentHeader",
119                rpc_params![],
120                "unsubscribeArchivedSegmentHeader",
121            )
122            .await?;
123
124        Ok(Box::pin(subscription.filter_map(
125            |archived_segment_header_result| async move { archived_segment_header_result.ok() },
126        )))
127    }
128
129    async fn super_segment_headers(
130        &self,
131        super_segment_indices: Vec<SuperSegmentIndex>,
132    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
133        Ok(self
134            .client
135            .request("superSegmentHeaders", rpc_params![&super_segment_indices])
136            .await?)
137    }
138
139    async fn segment_headers(
140        &self,
141        segment_indices: Vec<SegmentIndex>,
142    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
143        Ok(self
144            .client
145            .request("segmentHeaders", rpc_params![&segment_indices])
146            .await?)
147    }
148
149    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
150        let _permit = self.piece_request_semaphore.acquire().await;
151        let client = Arc::clone(&self.client);
152        // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
153        // issues for jsonrpsee
154        let piece_fut =
155            tokio::task::spawn(
156                async move { client.request("piece", rpc_params![&piece_index]).await },
157            );
158        Ok(piece_fut.await??)
159    }
160
161    async fn acknowledge_archived_segment_header(
162        &self,
163        segment_index: SegmentIndex,
164    ) -> anyhow::Result<()> {
165        Ok(self
166            .client
167            .request(
168                "acknowledgeArchivedSegmentHeader",
169                rpc_params![&segment_index],
170            )
171            .await?)
172    }
173
174    async fn update_shard_membership_info(
175        &self,
176        info: FarmerShardMembershipInfo,
177    ) -> anyhow::Result<()> {
178        Ok(self
179            .client
180            .request("updateShardMembershipInfo", rpc_params![&info])
181            .await?)
182    }
183}
184
185#[async_trait]
186impl NodeClientExt for RpcNodeClient {
187    async fn cached_segment_headers(
188        &self,
189        segment_indices: Vec<SegmentIndex>,
190    ) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
191        self.segment_headers(segment_indices).await
192    }
193
194    async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
195        Ok(self
196            .client
197            .request("lastSegmentHeaders", rpc_params![limit])
198            .await?)
199    }
200}