1pub(crate) mod key_with_distance;
4pub mod multihash;
5pub mod piece_provider;
6pub(crate) mod rate_limiter;
7
8use event_listener_primitives::Bag;
9use futures::future::{Fuse, FusedFuture, FutureExt};
10use libp2p::multiaddr::Protocol;
11use libp2p::{Multiaddr, PeerId};
12use prometheus_client::metrics::gauge::Gauge;
13use prometheus_client::registry::Registry;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::task::{Context, Poll};
18use tokio::runtime::Handle;
19use tokio::task;
20use tracing::warn;
21
22const NETWORKING_REGISTRY_PREFIX: &str = "subspace";
23
24#[derive(Debug)]
26pub struct SubspaceMetrics {
27 established_connections: Gauge,
28}
29
30impl SubspaceMetrics {
31 pub fn new(registry: &mut Registry) -> Self {
33 let sub_registry = registry.sub_registry_with_prefix(NETWORKING_REGISTRY_PREFIX);
34
35 let gauge = Gauge::default();
36 sub_registry.register(
37 "established_connections",
38 "The current number of established connections",
39 gauge.clone(),
40 );
41
42 Self {
43 established_connections: gauge,
44 }
45 }
46
47 pub(crate) fn inc_established_connections(&self) {
48 self.established_connections.inc();
49 }
50
51 pub(crate) fn dec_established_connections(&self) {
52 self.established_connections.dec();
53 }
54}
55
56pub(crate) struct AsyncJoinOnDrop<T>(Option<Fuse<task::JoinHandle<T>>>);
58
59impl<T> Drop for AsyncJoinOnDrop<T> {
60 fn drop(&mut self) {
61 let handle = self.0.take().expect("Always called exactly once; qed");
62 if !handle.is_terminated() {
63 task::block_in_place(move || {
64 let _ = Handle::current().block_on(handle);
65 });
66 }
67 }
68}
69
70impl<T> AsyncJoinOnDrop<T> {
71 pub(crate) fn new(handle: task::JoinHandle<T>) -> Self {
73 Self(Some(handle.fuse()))
74 }
75}
76
77impl<T> Future for AsyncJoinOnDrop<T> {
78 type Output = Result<T, task::JoinError>;
79
80 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81 Pin::new(self.0.as_mut().expect("Only dropped in Drop impl; qed")).poll(cx)
82 }
83}
84
85pub(crate) fn is_global_address_or_dns(addr: &Multiaddr) -> bool {
87 match addr.iter().next() {
88 Some(Protocol::Ip4(ip)) => ip.is_global(),
89 Some(Protocol::Ip6(ip)) => ip.is_global(),
90 Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => true,
91 _ => false,
92 }
93}
94
95pub type PeerAddress = (PeerId, Multiaddr);
97
98pub fn strip_peer_id(addresses: Vec<Multiaddr>) -> Vec<PeerAddress> {
101 addresses
102 .into_iter()
103 .filter_map(|multiaddr| {
104 let mut modified_multiaddr = multiaddr.clone();
105
106 let peer_id: Option<PeerId> = modified_multiaddr.pop().and_then(|protocol| {
107 if let Protocol::P2p(peer_id) = protocol {
108 Some(peer_id)
109 } else {
110 None
111 }
112 });
113
114 if let Some(peer_id) = peer_id {
115 Some((peer_id, modified_multiaddr))
116 } else {
117 warn!(%multiaddr, "Incorrect multiaddr provided.");
118
119 None
120 }
121 })
122 .collect()
123}
124
125pub(crate) type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
126pub(crate) type Handler<A> = Bag<HandlerFn<A>, A>;