ab_networking/
constructor.rs

1pub(crate) mod temporary_bans;
2mod transport;
3
4use crate::behavior::persistent_parameters::{KnownPeersRegistry, StubNetworkingParametersManager};
5use crate::behavior::{Behavior, BehaviorConfig};
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::constructor::transport::build_transport;
8use crate::node::Node;
9use crate::node_runner::{NodeRunner, NodeRunnerConfig};
10use crate::protocols::autonat_wrapper::Config as AutonatWrapperConfig;
11use crate::protocols::request_response::request_response_factory::RequestHandler;
12use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
13use crate::shared::Shared;
14use crate::utils::rate_limiter::RateLimiter;
15use crate::utils::{SubspaceMetrics, strip_peer_id};
16use ab_core_primitives::pieces::Piece;
17use backoff::{ExponentialBackoff, SystemClock};
18use futures::channel::mpsc;
19use libp2p::autonat::Config as AutonatConfig;
20use libp2p::connection_limits::ConnectionLimits;
21use libp2p::gossipsub::{
22    Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder,
23    Message as GossipsubMessage, MessageId, ValidationMode,
24};
25use libp2p::identify::Config as IdentifyConfig;
26use libp2p::kad::store::RecordStore;
27use libp2p::kad::{
28    BucketInserts, Config as KademliaConfig, Mode, ProviderRecord, Record, RecordKey, StoreInserts,
29    store,
30};
31use libp2p::metrics::Metrics;
32use libp2p::multiaddr::Protocol;
33use libp2p::yamux::Config as YamuxConfig;
34use libp2p::{Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError, identity};
35use parking_lot::Mutex;
36use prometheus_client::registry::Registry;
37use std::borrow::Cow;
38use std::iter::Empty;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use std::{fmt, io, iter};
42use thiserror::Error;
43use tracing::{debug, info};
44
/// Protocol version used by `Config::default()`; only suitable for `dev` networks.
const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev";
/// Protocol name the Kademlia configuration is created with.
const KADEMLIA_PROTOCOL: &str = "/subspace/kad/0.1.0";
/// Protocol ID prefix applied to the gossipsub configuration (when gossipsub is enabled).
const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";

/// Defines max_negotiating_inbound_streams constant for the swarm.
/// It must be set for large plots.
const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100_000;
/// How long a connection is allowed to stay open without any usage.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// The default maximum established incoming connection number for the swarm.
const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
/// The default maximum established outgoing connection number for the swarm.
const SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS: u32 = 100;
/// The default maximum pending incoming connection number for the swarm.
const SWARM_MAX_PENDING_INCOMING_CONNECTIONS: u32 = 80;
/// The default maximum pending outgoing connection number for the swarm.
const SWARM_MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 80;
/// Query timeout applied to the default Kademlia configuration.
const KADEMLIA_QUERY_TIMEOUT: Duration = Duration::from_secs(40);
/// Maximum number of established connections per peer allowed by the swarm connection limits.
const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: u32 = 3;
/// Per-connection multiplier used to derive the request-response concurrent streams limit from
/// the established connection limits.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
// TODO: Consider moving this constant to configuration or removing `Toggle` wrapper when we find a
//  use-case for gossipsub protocol.
/// Whether a gossipsub configuration is created by `Config::new()`.
const ENABLE_GOSSIP_PROTOCOL: bool = false;

/// Default number of temporarily banned unreachable peers to keep in memory.
const TEMPORARY_BANS_CACHE_SIZE: u32 = 10_000;
/// Default initial (and current) interval of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
/// Default randomization factor of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR: f64 = 0.1;
/// Default multiplier of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5;
/// Default maximum interval of the temporary ban backoff policy (30 minutes).
const TEMPORARY_BANS_DEFAULT_MAX_INTERVAL: Duration = Duration::from_secs(30 * 60);

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// Specific YAMUX setting for Subspace applications: the maximum number of streams configured on
/// the default Yamux multiplexer configuration.
const YAMUX_MAX_STREAMS: usize = 256;

/// Max confidence for autonat protocol. Could affect Kademlia mode change.
pub(crate) const AUTONAT_MAX_CONFIDENCE: usize = 3;
/// We set a very long pause (one year) before autonat initialization (`Duration::MAX` panics).
const AUTONAT_SERVER_PROBE_DELAY: Duration = Duration::from_secs(3600 * 24 * 365);
90
91/// Defines Kademlia mode
92#[derive(Clone, Debug)]
93pub enum KademliaMode {
94    /// The Kademlia mode is static for the duration of the application.
95    Static(Mode),
96    /// Kademlia mode will be changed using Autonat protocol when max confidence reached.
97    Dynamic,
98}
99
100impl KademliaMode {
101    /// Returns true if the mode is Dynamic.
102    pub fn is_dynamic(&self) -> bool {
103        matches!(self, Self::Dynamic)
104    }
105
106    /// Returns true if the mode is Static.
107    pub fn is_static(&self) -> bool {
108        matches!(self, Self::Static(..))
109    }
110}
111
112pub(crate) struct DummyRecordStore;
113
114impl RecordStore for DummyRecordStore {
115    type RecordsIter<'a>
116        = Empty<Cow<'a, Record>>
117    where
118        Self: 'a;
119    type ProvidedIter<'a>
120        = Empty<Cow<'a, ProviderRecord>>
121    where
122        Self: 'a;
123
124    #[inline]
125    fn get(&self, _key: &RecordKey) -> Option<Cow<'_, Record>> {
126        // Not supported
127        None
128    }
129
130    #[inline]
131    fn put(&mut self, _record: Record) -> store::Result<()> {
132        // Not supported
133        Ok(())
134    }
135
136    #[inline]
137    fn remove(&mut self, _key: &RecordKey) {
138        // Not supported
139    }
140
141    #[inline]
142    fn records(&self) -> Self::RecordsIter<'_> {
143        // Not supported
144        iter::empty()
145    }
146
147    #[inline]
148    fn add_provider(&mut self, _record: ProviderRecord) -> store::Result<()> {
149        // Not supported
150        Ok(())
151    }
152
153    #[inline]
154    fn providers(&self, _key: &RecordKey) -> Vec<ProviderRecord> {
155        // Not supported
156        Vec::new()
157    }
158
159    #[inline]
160    fn provided(&self) -> Self::ProvidedIter<'_> {
161        // Not supported
162        iter::empty()
163    }
164
165    #[inline]
166    fn remove_provider(&mut self, _key: &RecordKey, _provider: &PeerId) {
167        // Not supported
168    }
169}
170
171/// [`Node`] configuration.
172pub struct Config {
173    /// Identity keypair of a node used for authenticated connections.
174    pub keypair: identity::Keypair,
175    /// List of [`Multiaddr`] on which to listen for incoming connections.
176    pub listen_on: Vec<Multiaddr>,
177    /// Fallback to random port if specified (or default) port is already occupied.
178    pub listen_on_fallback_to_random_port: bool,
179    /// Adds a timeout to the setup and protocol upgrade process for all inbound and outbound
180    /// connections established through the transport.
181    pub timeout: Duration,
182    /// The configuration for the Identify behaviour.
183    pub identify: IdentifyConfig,
184    /// The configuration for the Kademlia behaviour.
185    pub kademlia: KademliaConfig,
186    /// The configuration for the Gossip behaviour.
187    pub gossipsub: Option<GossipsubConfig>,
188    /// Yamux multiplexing configuration.
189    pub yamux_config: YamuxConfig,
190    /// Should non-global addresses be added to the DHT?
191    pub allow_non_global_addresses_in_dht: bool,
192    /// How frequently should random queries be done using Kademlia DHT to populate routing table.
193    pub initial_random_query_interval: Duration,
194    /// A reference to the `NetworkingParametersRegistry` implementation.
195    pub known_peers_registry: Box<dyn KnownPeersRegistry>,
196    /// The configuration for the `RequestResponsesBehaviour` protocol.
197    pub request_response_protocols: Vec<Box<dyn RequestHandler>>,
198    /// Defines set of peers with a permanent connection (and reconnection if necessary).
199    pub reserved_peers: Vec<Multiaddr>,
200    /// Established incoming swarm connection limit.
201    pub max_established_incoming_connections: u32,
202    /// Established outgoing swarm connection limit.
203    pub max_established_outgoing_connections: u32,
204    /// Pending incoming swarm connection limit.
205    pub max_pending_incoming_connections: u32,
206    /// Pending outgoing swarm connection limit.
207    pub max_pending_outgoing_connections: u32,
208    /// How many temporarily banned unreachable peers to keep in memory.
209    pub temporary_bans_cache_size: u32,
210    /// Backoff policy for temporary banning of unreachable peers.
211    pub temporary_ban_backoff: ExponentialBackoff,
212    /// Optional libp2p prometheus metrics. None will disable metrics gathering.
213    pub libp2p_metrics: Option<Metrics>,
214    /// Internal prometheus metrics. None will disable metrics gathering.
215    pub metrics: Option<SubspaceMetrics>,
216    /// Defines protocol version for the network peers. Affects network partition.
217    pub protocol_version: String,
218    /// Addresses to bootstrap Kademlia network
219    pub bootstrap_addresses: Vec<Multiaddr>,
220    /// Kademlia mode. The default value is set to Static(Client). The peer won't add its address
221    /// to other peers` Kademlia routing table. Changing this behaviour implies that a peer can
222    /// provide pieces to others.
223    pub kademlia_mode: KademliaMode,
224    /// Known external addresses to the local peer. The addresses will be added on the swarm start
225    /// and enable peer to notify others about its reachable address.
226    pub external_addresses: Vec<Multiaddr>,
227}
228
229impl fmt::Debug for Config {
230    #[inline]
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        f.debug_struct("Config").finish()
233    }
234}
235
236/// This default can only be used for `dev` networks.
237/// Other networks should use `Config::new()` to apply the correct prefix to the protocol version.
238impl Default for Config {
239    #[inline]
240    fn default() -> Self {
241        let ed25519_keypair = identity::ed25519::Keypair::generate();
242        let keypair = identity::Keypair::from(ed25519_keypair);
243
244        Self::new(DEFAULT_NETWORK_PROTOCOL_VERSION.to_string(), keypair, None)
245    }
246}
247
248impl Config {
249    /// Creates a new [`Config`].
250    /// Applies a subspace-specific version prefix to the `protocol_version`.
251    pub fn new(
252        protocol_version: String,
253        keypair: identity::Keypair,
254        prometheus_registry: Option<&mut Registry>,
255    ) -> Self {
256        let (libp2p_metrics, metrics) = prometheus_registry
257            .map(|registry| {
258                (
259                    Some(Metrics::new(registry)),
260                    Some(SubspaceMetrics::new(registry)),
261                )
262            })
263            .unwrap_or((None, None));
264
265        let mut kademlia = KademliaConfig::new(
266            StreamProtocol::try_from_owned(KADEMLIA_PROTOCOL.to_owned())
267                .expect("Manual protocol name creation."),
268        );
269        kademlia
270            .set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
271            .disjoint_query_paths(true)
272            .set_max_packet_size(2 * Piece::SIZE)
273            .set_kbucket_inserts(BucketInserts::Manual)
274            .set_record_filtering(StoreInserts::FilterBoth)
275            // We don't use records and providers publication.
276            .set_provider_record_ttl(None)
277            .set_provider_publication_interval(None)
278            .set_record_ttl(None)
279            .set_replication_interval(None);
280
281        let mut yamux_config = YamuxConfig::default();
282        yamux_config.set_max_num_streams(YAMUX_MAX_STREAMS);
283
284        let gossipsub = ENABLE_GOSSIP_PROTOCOL.then(|| {
285            GossipsubConfigBuilder::default()
286                .protocol_id_prefix(GOSSIPSUB_PROTOCOL_PREFIX)
287                // TODO: Do we want message signing?
288                .validation_mode(ValidationMode::None)
289                // To content-address message, we can take the hash of message and use it as an ID.
290                .message_id_fn(|message: &GossipsubMessage| {
291                    MessageId::from(blake3::hash(&message.data).as_bytes())
292                })
293                .max_transmit_size(2 * 1024 * 1024) // 2MB
294                .build()
295                .expect("Default config for gossipsub is always correct; qed")
296        });
297
298        let protocol_version = format!("/subspace/2/{protocol_version}");
299        let identify = IdentifyConfig::new(protocol_version.clone(), keypair.public());
300
301        let temporary_ban_backoff = ExponentialBackoff {
302            current_interval: TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL,
303            initial_interval: TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL,
304            randomization_factor: TEMPORARY_BANS_DEFAULT_BACKOFF_RANDOMIZATION_FACTOR,
305            multiplier: TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER,
306            max_interval: TEMPORARY_BANS_DEFAULT_MAX_INTERVAL,
307            start_time: Instant::now(),
308            max_elapsed_time: None,
309            clock: SystemClock::default(),
310        };
311
312        Self {
313            keypair,
314            listen_on: vec![],
315            listen_on_fallback_to_random_port: true,
316            timeout: Duration::from_secs(10),
317            identify,
318            kademlia,
319            gossipsub,
320            allow_non_global_addresses_in_dht: false,
321            initial_random_query_interval: Duration::from_secs(1),
322            known_peers_registry: StubNetworkingParametersManager.boxed(),
323            request_response_protocols: Vec::new(),
324            yamux_config,
325            reserved_peers: Vec::new(),
326            max_established_incoming_connections: SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS,
327            max_established_outgoing_connections: SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS,
328            max_pending_incoming_connections: SWARM_MAX_PENDING_INCOMING_CONNECTIONS,
329            max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
330            temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
331            temporary_ban_backoff,
332            libp2p_metrics,
333            metrics,
334            protocol_version,
335            bootstrap_addresses: Vec::new(),
336            kademlia_mode: KademliaMode::Static(Mode::Client),
337            external_addresses: Vec::new(),
338        }
339    }
340}
341
342/// Errors that might happen during network creation.
343#[derive(Debug, Error)]
344pub enum CreationError {
345    /// Circuit relay client error.
346    #[error("Expected relay server node.")]
347    RelayServerExpected,
348    /// I/O error.
349    #[error("I/O error: {0}")]
350    Io(#[from] io::Error),
351    /// Transport creation error.
352    #[error("Transport creation error: {0}")]
353    // TODO: Restore `#[from] TransportError` once https://github.com/libp2p/rust-libp2p/issues/4824
354    //  is resolved
355    TransportCreationError(Box<dyn std::error::Error + Send + Sync>),
356    /// Transport error when attempting to listen on multiaddr.
357    #[error("Transport error when attempting to listen on multiaddr: {0}")]
358    TransportError(#[from] TransportError<io::Error>),
359}
360
361/// Converts public key from keypair to PeerId.
362/// It serves as the shared PeerId generating algorithm.
363pub fn peer_id(keypair: &identity::Keypair) -> PeerId {
364    keypair.public().to_peer_id()
365}
366
367/// Create a new network node and node runner instances.
368pub fn construct(config: Config) -> Result<(Node, NodeRunner), CreationError> {
369    let Config {
370        keypair,
371        listen_on,
372        listen_on_fallback_to_random_port,
373        timeout,
374        identify,
375        kademlia,
376        gossipsub,
377        yamux_config,
378        allow_non_global_addresses_in_dht,
379        initial_random_query_interval,
380        known_peers_registry,
381        request_response_protocols,
382        reserved_peers,
383        max_established_incoming_connections,
384        max_established_outgoing_connections,
385        max_pending_incoming_connections,
386        max_pending_outgoing_connections,
387        temporary_bans_cache_size,
388        temporary_ban_backoff,
389        libp2p_metrics,
390        metrics,
391        protocol_version,
392        bootstrap_addresses,
393        kademlia_mode,
394        external_addresses,
395    } = config;
396    let local_peer_id = peer_id(&keypair);
397
398    info!(
399        %allow_non_global_addresses_in_dht,
400        peer_id = %local_peer_id,
401        %protocol_version,
402        "DSN instance configured."
403    );
404
405    let connection_limits = ConnectionLimits::default()
406        .with_max_established_per_peer(Some(SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER))
407        .with_max_pending_incoming(Some(max_pending_incoming_connections))
408        .with_max_pending_outgoing(Some(max_pending_outgoing_connections))
409        .with_max_established_incoming(Some(max_established_incoming_connections))
410        .with_max_established_outgoing(Some(max_established_outgoing_connections));
411
412    debug!(?connection_limits, "DSN connection limits set.");
413
414    let autonat_boot_delay = if kademlia_mode.is_static() || !external_addresses.is_empty() {
415        AUTONAT_SERVER_PROBE_DELAY
416    } else {
417        AutonatConfig::default().boot_delay
418    };
419
420    debug!(
421        ?autonat_boot_delay,
422        ?kademlia_mode,
423        ?external_addresses,
424        "Autonat boot delay set."
425    );
426
427    let mut behaviour = Behavior::new(BehaviorConfig {
428        peer_id: local_peer_id,
429        identify,
430        kademlia,
431        gossipsub,
432        request_response_protocols,
433        request_response_max_concurrent_streams: {
434            let max_num_connections = max_established_incoming_connections as usize
435                + max_established_outgoing_connections as usize;
436            max_num_connections * MAX_CONCURRENT_STREAMS_PER_CONNECTION
437        },
438        connection_limits,
439        reserved_peers: ReservedPeersConfig {
440            reserved_peers: reserved_peers.clone(),
441            dialing_interval: DIALING_INTERVAL_IN_SECS,
442        },
443        autonat: AutonatWrapperConfig {
444            inner_config: AutonatConfig {
445                use_connected: true,
446                only_global_ips: !config.allow_non_global_addresses_in_dht,
447                confidence_max: AUTONAT_MAX_CONFIDENCE,
448                boot_delay: autonat_boot_delay,
449                ..Default::default()
450            },
451            local_peer_id,
452            servers: bootstrap_addresses.clone(),
453        },
454    });
455
456    match (kademlia_mode, external_addresses.is_empty()) {
457        (KademliaMode::Static(mode), _) => {
458            behaviour.kademlia.set_mode(Some(mode));
459        }
460        (KademliaMode::Dynamic, false) => {
461            behaviour.kademlia.set_mode(Some(Mode::Server));
462        }
463        _ => {
464            // Autonat will figure it out
465        }
466    };
467
468    let temporary_bans = Arc::new(Mutex::new(TemporaryBans::new(
469        temporary_bans_cache_size,
470        temporary_ban_backoff,
471    )));
472
473    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
474        .with_tokio()
475        .with_other_transport(|keypair| {
476            Ok(build_transport(
477                allow_non_global_addresses_in_dht,
478                keypair,
479                Arc::clone(&temporary_bans),
480                timeout,
481                yamux_config,
482            )?)
483        })
484        .map_err(|error| CreationError::TransportCreationError(error.into()))?
485        .with_behaviour(move |_keypair| Ok(behaviour))
486        .expect("Not fallible; qed")
487        .with_swarm_config(|config| {
488            config
489                .with_max_negotiating_inbound_streams(SWARM_MAX_NEGOTIATING_INBOUND_STREAMS)
490                .with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT)
491        })
492        .build();
493
494    let is_listening = !listen_on.is_empty();
495
496    // Setup listen_on addresses
497    for mut addr in listen_on {
498        if let Err(error) = swarm.listen_on(addr.clone()) {
499            if !listen_on_fallback_to_random_port {
500                return Err(error.into());
501            }
502
503            let addr_string = addr.to_string();
504            // Listen on random port if specified is already occupied
505            if let Some(Protocol::Tcp(_port)) = addr.pop() {
506                info!("Failed to listen on {addr_string} ({error}), falling back to random port");
507                addr.push(Protocol::Tcp(0));
508                swarm.listen_on(addr)?;
509            }
510        }
511    }
512
513    // Setup external addresses
514    for addr in external_addresses.iter().cloned() {
515        info!("DSN external address added: {addr}");
516        swarm.add_external_address(addr);
517    }
518
519    // Create final structs
520    let (command_sender, command_receiver) = mpsc::channel(1);
521
522    let rate_limiter = RateLimiter::new(
523        max_established_outgoing_connections,
524        max_pending_outgoing_connections,
525    );
526
527    let shared = Arc::new(Shared::new(local_peer_id, command_sender, rate_limiter));
528    let shared_weak = Arc::downgrade(&shared);
529
530    let node = Node::new(shared);
531    let node_runner = NodeRunner::new(NodeRunnerConfig {
532        allow_non_global_addresses_in_dht,
533        is_listening,
534        command_receiver,
535        swarm,
536        shared_weak,
537        next_random_query_interval: initial_random_query_interval,
538        known_peers_registry,
539        reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
540        temporary_bans,
541        libp2p_metrics,
542        metrics,
543        protocol_version,
544        bootstrap_addresses,
545    });
546
547    Ok((node, node_runner))
548}