ab_networking/
utils.rs

1//! Miscellaneous utilities for networking.
2
3pub(crate) mod key_with_distance;
4pub mod multihash;
5pub mod piece_provider;
6pub(crate) mod rate_limiter;
7
8use event_listener_primitives::Bag;
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use libp2p::multiaddr::Protocol;
11use libp2p::{Multiaddr, PeerId};
12use prometheus_client::metrics::gauge::Gauge;
13use prometheus_client::registry::Registry;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use tokio::runtime::Handle;
19use tokio::task;
20use tracing::warn;
21
22const NETWORKING_REGISTRY_PREFIX: &str = "subspace";
23
24/// Metrics for Subspace networking
25#[derive(Debug)]
26pub struct SubspaceMetrics {
27    established_connections: Gauge,
28}
29
30impl SubspaceMetrics {
31    /// Constructor
32    pub fn new(registry: &mut Registry) -> Self {
33        let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX);
34
35        let gauge = Gauge::default();
36        sub_registry.register(
37            "established_connections",
38            "The current number of established connections",
39            gauge.clone(),
40        );
41
42        Self {
43            established_connections: gauge,
44        }
45    }
46
47    pub(crate) fn inc_established_connections(&self) {
48        self.established_connections.inc();
49    }
50
51    pub(crate) fn dec_established_connections(&self) {
52        self.established_connections.dec();
53    }
54}
55
56/// Joins async join handle on drop
57pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);
58
59impl<T> Drop for AsyncJoinOnDrop<T> {
60    fn drop(&mut self) {
61        let handle = self.0.take().expect("Always called exactly once; qed");
62        if !handle.is_terminated() {
63            task::block_in_place(move || {
64                let _ = Handle::current().block_on(handle);
65            });
66        }
67    }
68}
69
70impl<T> AsyncJoinOnDrop<T> {
71    // Create new instance
72    pub(crate) fn new(handle: task::JoinHandle<T>) -> Self {
73        Self(Some(handle.fuse()))
74    }
75}
76
77impl<T> Future for AsyncJoinOnDrop<T> {
78    type Output = Result<T, task::JoinError>;
79
80    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx)
82    }
83}
84
85/// This test is successful only for global IP addresses and DNS names.
86pub(crate) fn is_global_address_or_dns(addr: &Multiaddr) -> bool {
87    match addr.iter().next() {
88        Some(Protocol::Ip4(ip)) => ip.is_global(),
89        Some(Protocol::Ip6(ip)) => ip.is_global(),
90        Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => true,
91        _ => false,
92    }
93}
94
95/// Convenience alias for peer ID and its multiaddresses.
96pub type PeerAddress = (PeerId, Multiaddr);
97
98/// Helper function. Converts multiaddresses to a tuple with peer ID removing the peer Id suffix.
99/// It logs incorrect multiaddresses.
100pub fn strip_peer_id(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
101    addresses
102        .into_iter()
103        .filter_map(|multiaddr| {
104            let mut modified_multiaddr = multiaddr.clone();
105
106            let peer_id: Option<PeerId> = modified_multiaddr.pop().and_then(|protocol| {
107                if let Protocol::P2p(peer_id) = protocol {
108                    Some(peer_id)
109                } else {
110                    None
111                }
112            });
113
114            if let Some(peer_id) = peer_id {
115                Some((peer_id, modified_multiaddr))
116            } else {
117                warn!(%multiaddr, "Incorrect multiaddr provided.");
118
119                None
120            }
121        })
122        .collect()
123}
124
125pub(crate) type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
126pub(crate) type Handler<A> = Bag<HandlerFn<A>, A>;