Skip to main content

ab_farmer/node_client/
rpc_node_client.rs

1//! Node client implementation that connects to node via RPC (WebSockets)
2
3use crate::node_client::{NodeClient, NodeClientExt};
4use ab_core_primitives::pieces::{Piece, PieceIndex};
5use ab_core_primitives::segments::{
6    SegmentIndex, SuperSegmentHeader, SuperSegmentIndex, SuperSegmentRoot,
7};
8use ab_farmer_rpc_primitives::{
9    BlockSealInfo, BlockSealResponse, FarmerAppInfo, FarmerShardMembershipInfo, SlotInfo,
10    SolutionResponse,
11};
12use async_lock::Semaphore;
13use async_trait::async_trait;
14use futures::{Stream, StreamExt};
15use jsonrpsee::core::client::{ClientT, Error as JsonError, SubscriptionClientT};
16use jsonrpsee::rpc_params;
17use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
18use std::pin::Pin;
19use std::sync::Arc;
20
21/// TODO: Node is having a hard time responding for many piece requests, specifically this results
22///  in subscriptions become broken on the node: https://github.com/paritytech/jsonrpsee/issues/1409
23///  This needs to be removed after Substrate upgrade when we can take advantage of new Substrate
24///  API that will prevent subscription breakage:
25///  https://github.com/paritytech/jsonrpsee/issues/1409#issuecomment-2303914643
26const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
27
28/// Node client implementation that connects to node via RPC (WebSockets).
29///
30/// This implementation is supposed to be used on local network and not via public Internet due to
31/// sensitive contents.
32#[derive(Debug, Clone)]
33pub struct RpcNodeClient {
34    client: Arc<WsClient>,
35    piece_request_semaphore: Arc<Semaphore>,
36}
37
38impl RpcNodeClient {
39    /// Create a new instance of [`NodeClient`].
40    pub async fn new(url: &str) -> Result<Self, JsonError> {
41        let client = Arc::new(
42            WsClientBuilder::default()
43                .max_request_size(20 * 1024 * 1024)
44                .build(url)
45                .await?,
46        );
47        let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
48        Ok(Self {
49            client,
50            piece_request_semaphore,
51        })
52    }
53}
54
55#[async_trait]
56impl NodeClient for RpcNodeClient {
57    async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
58        Ok(self
59            .client
60            .request("getFarmerAppInfo", rpc_params![])
61            .await?)
62    }
63
64    async fn subscribe_slot_info(
65        &self,
66    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
67        let subscription = self
68            .client
69            .subscribe("subscribeSlotInfo", rpc_params![], "unsubscribeSlotInfo")
70            .await?;
71
72        Ok(Box::pin(subscription.filter_map(
73            |slot_info_result| async move { slot_info_result.ok() },
74        )))
75    }
76
77    async fn submit_solution_response(
78        &self,
79        solution_response: SolutionResponse,
80    ) -> anyhow::Result<()> {
81        Ok(self
82            .client
83            .request("submitSolutionResponse", rpc_params![&solution_response])
84            .await?)
85    }
86
87    async fn subscribe_block_sealing(
88        &self,
89    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = BlockSealInfo> + Send + 'static>>> {
90        let subscription = self
91            .client
92            .subscribe(
93                "subscribeBlockSealing",
94                rpc_params![],
95                "unsubscribeBlockSealing",
96            )
97            .await?;
98
99        Ok(Box::pin(subscription.filter_map(
100            |block_sealing_info_result| async move { block_sealing_info_result.ok() },
101        )))
102    }
103
104    async fn submit_block_seal(&self, block_seal: BlockSealResponse) -> anyhow::Result<()> {
105        Ok(self
106            .client
107            .request("submitBlockSeal", rpc_params![&block_seal])
108            .await?)
109    }
110
111    async fn subscribe_new_super_segment_headers(
112        &self,
113    ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SuperSegmentHeader> + Send + 'static>>> {
114        let subscription = self
115            .client
116            .subscribe(
117                "subscribeNewSuperSegmentHeader",
118                rpc_params![],
119                "unsubscribeNewSuperSegmentHeader",
120            )
121            .await?;
122
123        Ok(Box::pin(subscription.filter_map(
124            |new_super_segment_header_result| async move { new_super_segment_header_result.ok() },
125        )))
126    }
127
128    async fn super_segment_headers(
129        &self,
130        super_segment_indices: Vec<SuperSegmentIndex>,
131    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
132        Ok(self
133            .client
134            .request("superSegmentHeaders", rpc_params![&super_segment_indices])
135            .await?)
136    }
137
138    async fn super_segment_root_for_segment_index(
139        &self,
140        segment_index: SegmentIndex,
141    ) -> anyhow::Result<Option<SuperSegmentRoot>> {
142        Ok(self
143            .client
144            .request(
145                "superSegmentRootForSegmentIndex",
146                rpc_params![&segment_index],
147            )
148            .await?)
149    }
150
151    async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
152        let _permit = self.piece_request_semaphore.acquire().await;
153        let client = Arc::clone(&self.client);
154        // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes
155        // issues for jsonrpsee
156        let piece_fut =
157            tokio::task::spawn(
158                async move { client.request("piece", rpc_params![&piece_index]).await },
159            );
160        Ok(piece_fut.await??)
161    }
162
163    async fn update_shard_membership_info(
164        &self,
165        info: FarmerShardMembershipInfo,
166    ) -> anyhow::Result<()> {
167        Ok(self
168            .client
169            .request("updateShardMembershipInfo", rpc_params![&info])
170            .await?)
171    }
172}
173
174#[async_trait]
175impl NodeClientExt for RpcNodeClient {
176    async fn cached_super_segment_headers(
177        &self,
178        super_segment_indices: Vec<SuperSegmentIndex>,
179    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
180        self.super_segment_headers(super_segment_indices).await
181    }
182
183    async fn last_super_segment_headers(
184        &self,
185        limit: u32,
186    ) -> anyhow::Result<Vec<Option<SuperSegmentHeader>>> {
187        Ok(self
188            .client
189            .request("lastSuperSegmentHeaders", rpc_params![limit])
190            .await?)
191    }
192}