Skip to main content

ab_networking/
constructor.rs

1pub(crate) mod temporary_bans;
2mod transport;
3
4use crate::behavior::persistent_parameters::{KnownPeersRegistry, StubNetworkingParametersManager};
5use crate::behavior::{Behavior, BehaviorConfig};
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::constructor::transport::build_transport;
8use crate::node::Node;
9use crate::node_runner::{NodeRunner, NodeRunnerConfig};
10use crate::protocols::autonat_wrapper::Config as AutonatWrapperConfig;
11use crate::protocols::request_response::request_response_factory::RequestHandler;
12use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
13use crate::shared::Shared;
14use crate::utils::rate_limiter::RateLimiter;
15use crate::utils::{SubspaceMetrics, strip_peer_id};
16use ab_core_primitives::pieces::Piece;
17use backon::ExponentialBuilder;
18use futures::channel::mpsc;
19use libp2p::autonat::Config as AutonatConfig;
20use libp2p::connection_limits::ConnectionLimits;
21use libp2p::gossipsub::{
22    Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder,
23    Message as GossipsubMessage, MessageId, ValidationMode,
24};
25use libp2p::identify::Config as IdentifyConfig;
26use libp2p::kad::store::RecordStore;
27use libp2p::kad::{
28    BucketInserts, Config as KademliaConfig, Mode, ProviderRecord, Record, RecordKey, StoreInserts,
29    store,
30};
31use libp2p::metrics::Metrics;
32use libp2p::multiaddr::Protocol;
33use libp2p::yamux::Config as YamuxConfig;
34use libp2p::{Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError, identity};
35use parking_lot::Mutex;
36use prometheus_client::registry::Registry;
37use std::borrow::Cow;
38use std::iter::Empty;
39use std::sync::Arc;
40use std::time::Duration;
41use std::{fmt, io, iter};
42use thiserror::Error;
43use tracing::{debug, info};
44
/// Protocol version used by `Config::default()`; intended for `dev` networks only.
const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev";
/// Stream protocol name given to the Kademlia configuration in `Config::new()`.
const KADEMLIA_PROTOCOL: &str = "/subspace/kad/0.1.0";
/// Protocol ID prefix given to the gossipsub configuration builder in `Config::new()`.
const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";

/// Defines max_negotiating_inbound_streams constant for the swarm.
/// It must be set for large plots.
const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100000;
/// How long a connection is allowed to stay open without any usage.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// The default maximum established incoming connection number for the swarm.
const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
/// The default maximum established outgoing connection number for the swarm.
const SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS: u32 = 100;
/// The default maximum pending incoming connection number for the swarm.
const SWARM_MAX_PENDING_INCOMING_CONNECTIONS: u32 = 80;
/// The default maximum pending outgoing connection number for the swarm.
const SWARM_MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 80;
/// Query timeout applied to the Kademlia configuration in `Config::new()`.
const KADEMLIA_QUERY_TIMEOUT: Duration = Duration::from_secs(40);
/// Maximum number of established connections per peer, applied to the swarm connection limits.
const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: u32 = 3;
/// Multiplied by the sum of the established incoming and outgoing connection limits to derive
/// the request-response concurrent streams limit.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
// TODO: Consider moving this constant to configuration or removing `Toggle` wrapper when we find a
//  use-case for gossipsub protocol.
const ENABLE_GOSSIP_PROTOCOL: bool = false;

/// Default for `Config::temporary_bans_cache_size`.
const TEMPORARY_BANS_CACHE_SIZE: u32 = 10_000;
/// Default minimum delay of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
/// Default growth factor of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5;
/// Default maximum delay of the temporary ban backoff policy.
const TEMPORARY_BANS_DEFAULT_MAX_INTERVAL: Duration = Duration::from_secs(30 * 60);

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
///
/// Despite the `_IN_SECS` suffix this is a [`Duration`], not a number of seconds.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// YAMUX setting for Subspace applications: the limit on the number of substreams, applied to
/// the Yamux configuration via `set_max_num_streams` in `Config::new()`.
const YAMUX_MAX_STREAMS: usize = 256;

/// Max confidence for autonat protocol. Could affect Kademlia mode change.
pub(crate) const AUTONAT_MAX_CONFIDENCE: usize = 3;
/// A very long pause (one year) used as the autonat boot delay when the Kademlia mode is static
/// or external addresses were provided (`Duration::MAX` panics).
const AUTONAT_SERVER_PROBE_DELAY: Duration = Duration::from_secs(3600 * 24 * 365);
89
/// Defines how the Kademlia mode of the node is chosen.
#[derive(Clone, Debug)]
pub enum KademliaMode {
    /// The Kademlia mode is static for the duration of the application.
    ///
    /// `construct()` applies the wrapped [`Mode`] to the Kademlia behaviour as is.
    Static(Mode),
    /// Kademlia mode will be changed using Autonat protocol when max confidence reached.
    ///
    /// If external addresses are configured, `construct()` switches Kademlia to
    /// [`Mode::Server`] right away instead of waiting for Autonat.
    Dynamic,
}
98
99impl KademliaMode {
100    /// Returns true if the mode is Dynamic.
101    pub fn is_dynamic(&self) -> bool {
102        matches!(self, Self::Dynamic)
103    }
104
105    /// Returns true if the mode is Static.
106    pub fn is_static(&self) -> bool {
107        matches!(self, Self::Static(..))
108    }
109}
110
/// A [`RecordStore`] that stores nothing: writes are accepted and discarded, reads are always
/// empty.
pub(crate) struct DummyRecordStore;
112
113impl RecordStore for DummyRecordStore {
114    type RecordsIter<'a>
115        = Empty<Cow<'a, Record>>
116    where
117        Self: 'a;
118    type ProvidedIter<'a>
119        = Empty<Cow<'a, ProviderRecord>>
120    where
121        Self: 'a;
122
123    #[inline]
124    fn get(&self, _key: &RecordKey) -> Option<Cow<'_, Record>> {
125        // Not supported
126        None
127    }
128
129    #[inline]
130    fn put(&mut self, _record: Record) -> store::Result<()> {
131        // Not supported
132        Ok(())
133    }
134
135    #[inline]
136    fn remove(&mut self, _key: &RecordKey) {
137        // Not supported
138    }
139
140    #[inline]
141    fn records(&self) -> Self::RecordsIter<'_> {
142        // Not supported
143        iter::empty()
144    }
145
146    #[inline]
147    fn add_provider(&mut self, _record: ProviderRecord) -> store::Result<()> {
148        // Not supported
149        Ok(())
150    }
151
152    #[inline]
153    fn providers(&self, _key: &RecordKey) -> Vec<ProviderRecord> {
154        // Not supported
155        Vec::new()
156    }
157
158    #[inline]
159    fn provided(&self) -> Self::ProvidedIter<'_> {
160        // Not supported
161        iter::empty()
162    }
163
164    #[inline]
165    fn remove_provider(&mut self, _key: &RecordKey, _provider: &PeerId) {
166        // Not supported
167    }
168}
169
/// [`Node`] configuration.
///
/// Use [`Config::new()`] to get a configuration populated with defaults and override individual
/// fields afterwards; `construct()` consumes the final value.
pub struct Config {
    /// Identity keypair of a node used for authenticated connections.
    pub keypair: identity::Keypair,
    /// List of [`Multiaddr`] on which to listen for incoming connections.
    pub listen_on: Vec<Multiaddr>,
    /// Fallback to random port if specified (or default) port is already occupied.
    ///
    /// The fallback is only attempted for addresses that end with a TCP port.
    pub listen_on_fallback_to_random_port: bool,
    /// Adds a timeout to the setup and protocol upgrade process for all inbound and outbound
    /// connections established through the transport.
    pub timeout: Duration,
    /// The configuration for the Identify behaviour.
    pub identify: IdentifyConfig,
    /// The configuration for the Kademlia behaviour.
    pub kademlia: KademliaConfig,
    /// The configuration for the Gossip behaviour.
    ///
    /// [`Config::new()`] leaves this as `None` while `ENABLE_GOSSIP_PROTOCOL` is `false`.
    pub gossipsub: Option<GossipsubConfig>,
    /// Yamux multiplexing configuration.
    pub yamux_config: YamuxConfig,
    /// Should non-global addresses be added to the DHT?
    pub allow_non_global_addresses_in_dht: bool,
    /// How frequently should random queries be done using Kademlia DHT to populate routing table.
    pub initial_random_query_interval: Duration,
    /// Boxed [`KnownPeersRegistry`] implementation.
    ///
    /// [`Config::new()`] defaults to `StubNetworkingParametersManager`.
    pub known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Handlers for the request-response protocols supported by the node.
    pub request_response_protocols: Vec<Box<dyn RequestHandler>>,
    /// Defines set of peers with a permanent connection (and reconnection if necessary).
    pub reserved_peers: Vec<Multiaddr>,
    /// Established incoming swarm connection limit.
    pub max_established_incoming_connections: u32,
    /// Established outgoing swarm connection limit.
    pub max_established_outgoing_connections: u32,
    /// Pending incoming swarm connection limit.
    pub max_pending_incoming_connections: u32,
    /// Pending outgoing swarm connection limit.
    pub max_pending_outgoing_connections: u32,
    /// How many temporarily banned unreachable peers to keep in memory.
    pub temporary_bans_cache_size: u32,
    /// Backoff policy for temporary banning of unreachable peers.
    pub temporary_ban_backoff: ExponentialBuilder,
    /// Optional libp2p prometheus metrics. None will disable metrics gathering.
    pub libp2p_metrics: Option<Metrics>,
    /// Internal prometheus metrics. None will disable metrics gathering.
    pub metrics: Option<SubspaceMetrics>,
    /// Defines protocol version for the network peers. Affects network partition.
    ///
    /// [`Config::new()`] stores it with the `/subspace/2/` prefix already applied.
    pub protocol_version: String,
    /// Addresses to bootstrap Kademlia network
    pub bootstrap_addresses: Vec<Multiaddr>,
    /// Kademlia mode. The default value is set to Static(Client). The peer won't add its address
    /// to other peers' Kademlia routing tables. Changing this behaviour implies that a peer can
    /// provide pieces to others.
    pub kademlia_mode: KademliaMode,
    /// Known external addresses to the local peer. The addresses will be added on the swarm start
    /// and enable peer to notify others about its reachable address.
    pub external_addresses: Vec<Multiaddr>,
}
227
228impl fmt::Debug for Config {
229    #[inline]
230    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231        f.debug_struct("Config").finish()
232    }
233}
234
235/// This default can only be used for `dev` networks.
236/// Other networks should use `Config::new()` to apply the correct prefix to the protocol version.
237impl Default for Config {
238    #[inline]
239    fn default() -> Self {
240        let ed25519_keypair = identity::ed25519::Keypair::generate();
241        let keypair = identity::Keypair::from(ed25519_keypair);
242
243        Self::new(DEFAULT_NETWORK_PROTOCOL_VERSION.to_string(), keypair, None)
244    }
245}
246
247impl Config {
248    /// Creates a new [`Config`].
249    /// Applies a subspace-specific version prefix to the `protocol_version`.
250    pub fn new(
251        protocol_version: String,
252        keypair: identity::Keypair,
253        prometheus_registry: Option<&mut Registry>,
254    ) -> Self {
255        let (libp2p_metrics, metrics) = prometheus_registry
256            .map(|registry| {
257                (
258                    Some(Metrics::new(registry)),
259                    Some(SubspaceMetrics::new(registry)),
260                )
261            })
262            .unwrap_or((None, None));
263
264        let mut kademlia = KademliaConfig::new(
265            StreamProtocol::try_from_owned(KADEMLIA_PROTOCOL.to_owned())
266                .expect("Manual protocol name creation."),
267        );
268        kademlia
269            .set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
270            .disjoint_query_paths(true)
271            .set_max_packet_size(2 * Piece::SIZE)
272            .set_kbucket_inserts(BucketInserts::Manual)
273            .set_record_filtering(StoreInserts::FilterBoth)
274            // We don't use records and providers publication.
275            .set_provider_record_ttl(None)
276            .set_provider_publication_interval(None)
277            .set_record_ttl(None)
278            .set_replication_interval(None);
279
280        let mut yamux_config = YamuxConfig::default();
281        yamux_config.set_max_num_streams(YAMUX_MAX_STREAMS);
282
283        let gossipsub = ENABLE_GOSSIP_PROTOCOL.then(|| {
284            GossipsubConfigBuilder::default()
285                .protocol_id_prefix(GOSSIPSUB_PROTOCOL_PREFIX)
286                // TODO: Do we want message signing?
287                .validation_mode(ValidationMode::None)
288                // To content-address message, we can take the hash of message and use it as an ID.
289                .message_id_fn(|message: &GossipsubMessage| {
290                    MessageId::from(blake3::hash(&message.data).as_bytes())
291                })
292                .max_transmit_size(2 * 1024 * 1024) // 2MB
293                .build()
294                .expect("Default config for gossipsub is always correct; qed")
295        });
296
297        let protocol_version = format!("/subspace/2/{protocol_version}");
298        let identify = IdentifyConfig::new(protocol_version.clone(), keypair.public());
299
300        let temporary_ban_backoff = ExponentialBuilder::default()
301            .with_factor(TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER as f32)
302            .with_min_delay(TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL)
303            .with_max_delay(TEMPORARY_BANS_DEFAULT_MAX_INTERVAL)
304            .without_max_times();
305
306        Self {
307            keypair,
308            listen_on: vec![],
309            listen_on_fallback_to_random_port: true,
310            timeout: Duration::from_secs(10),
311            identify,
312            kademlia,
313            gossipsub,
314            allow_non_global_addresses_in_dht: false,
315            initial_random_query_interval: Duration::from_secs(1),
316            known_peers_registry: StubNetworkingParametersManager.boxed(),
317            request_response_protocols: Vec::new(),
318            yamux_config,
319            reserved_peers: Vec::new(),
320            max_established_incoming_connections: SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS,
321            max_established_outgoing_connections: SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS,
322            max_pending_incoming_connections: SWARM_MAX_PENDING_INCOMING_CONNECTIONS,
323            max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
324            temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
325            temporary_ban_backoff,
326            libp2p_metrics,
327            metrics,
328            protocol_version,
329            bootstrap_addresses: Vec::new(),
330            kademlia_mode: KademliaMode::Static(Mode::Client),
331            external_addresses: Vec::new(),
332        }
333    }
334}
335
/// Errors that might happen during network creation.
#[derive(Debug, Error)]
pub enum CreationError {
    /// Circuit relay client error.
    #[error("Expected relay server node.")]
    RelayServerExpected,
    /// I/O error.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Transport creation error.
    ///
    /// Returned by `construct()` when building the transport fails; the underlying error is
    /// boxed (see the TODO below).
    #[error("Transport creation error: {0}")]
    // TODO: Restore `#[from] TransportError` once https://github.com/libp2p/rust-libp2p/issues/4824
    //  is resolved
    TransportCreationError(Box<dyn std::error::Error + Send + Sync>),
    /// Transport error when attempting to listen on multiaddr.
    ///
    /// Returned by `construct()` when listening on a configured address fails and no fallback
    /// to a random port succeeds.
    #[error("Transport error when attempting to listen on multiaddr: {0}")]
    TransportError(#[from] TransportError<io::Error>),
}
354
355/// Converts public key from keypair to PeerId.
356/// It serves as the shared PeerId generating algorithm.
357pub fn peer_id(keypair: &identity::Keypair) -> PeerId {
358    keypair.public().to_peer_id()
359}
360
361/// Create a new network node and node runner instances.
362pub fn construct(config: Config) -> Result<(Node, NodeRunner), CreationError> {
363    let Config {
364        keypair,
365        listen_on,
366        listen_on_fallback_to_random_port,
367        timeout,
368        identify,
369        kademlia,
370        gossipsub,
371        yamux_config,
372        allow_non_global_addresses_in_dht,
373        initial_random_query_interval,
374        known_peers_registry,
375        request_response_protocols,
376        reserved_peers,
377        max_established_incoming_connections,
378        max_established_outgoing_connections,
379        max_pending_incoming_connections,
380        max_pending_outgoing_connections,
381        temporary_bans_cache_size,
382        temporary_ban_backoff,
383        libp2p_metrics,
384        metrics,
385        protocol_version,
386        bootstrap_addresses,
387        kademlia_mode,
388        external_addresses,
389    } = config;
390    let local_peer_id = peer_id(&keypair);
391
392    info!(
393        %allow_non_global_addresses_in_dht,
394        peer_id = %local_peer_id,
395        %protocol_version,
396        "DSN instance configured."
397    );
398
399    let connection_limits = ConnectionLimits::default()
400        .with_max_established_per_peer(Some(SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER))
401        .with_max_pending_incoming(Some(max_pending_incoming_connections))
402        .with_max_pending_outgoing(Some(max_pending_outgoing_connections))
403        .with_max_established_incoming(Some(max_established_incoming_connections))
404        .with_max_established_outgoing(Some(max_established_outgoing_connections));
405
406    debug!(?connection_limits, "DSN connection limits set.");
407
408    let autonat_boot_delay = if kademlia_mode.is_static() || !external_addresses.is_empty() {
409        AUTONAT_SERVER_PROBE_DELAY
410    } else {
411        AutonatConfig::default().boot_delay
412    };
413
414    debug!(
415        ?autonat_boot_delay,
416        ?kademlia_mode,
417        ?external_addresses,
418        "Autonat boot delay set."
419    );
420
421    let mut behaviour = Behavior::new(BehaviorConfig {
422        peer_id: local_peer_id,
423        identify,
424        kademlia,
425        gossipsub,
426        request_response_protocols,
427        request_response_max_concurrent_streams: {
428            let max_num_connections = max_established_incoming_connections as usize
429                + max_established_outgoing_connections as usize;
430            max_num_connections * MAX_CONCURRENT_STREAMS_PER_CONNECTION
431        },
432        connection_limits,
433        reserved_peers: ReservedPeersConfig {
434            reserved_peers: reserved_peers.clone(),
435            dialing_interval: DIALING_INTERVAL_IN_SECS,
436        },
437        autonat: AutonatWrapperConfig {
438            inner_config: AutonatConfig {
439                use_connected: true,
440                only_global_ips: !config.allow_non_global_addresses_in_dht,
441                confidence_max: AUTONAT_MAX_CONFIDENCE,
442                boot_delay: autonat_boot_delay,
443                ..Default::default()
444            },
445            local_peer_id,
446            servers: bootstrap_addresses.clone(),
447        },
448    });
449
450    match (kademlia_mode, external_addresses.is_empty()) {
451        (KademliaMode::Static(mode), _) => {
452            behaviour.kademlia.set_mode(Some(mode));
453        }
454        (KademliaMode::Dynamic, false) => {
455            behaviour.kademlia.set_mode(Some(Mode::Server));
456        }
457        _ => {
458            // Autonat will figure it out
459        }
460    };
461
462    let temporary_bans = Arc::new(Mutex::new(TemporaryBans::new(
463        temporary_bans_cache_size,
464        temporary_ban_backoff,
465    )));
466
467    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
468        .with_tokio()
469        .with_other_transport(|keypair| {
470            Ok(build_transport(
471                allow_non_global_addresses_in_dht,
472                keypair,
473                Arc::clone(&temporary_bans),
474                timeout,
475                yamux_config,
476            )?)
477        })
478        .map_err(|error| CreationError::TransportCreationError(error.into()))?
479        .with_behaviour(move |_keypair| Ok(behaviour))
480        .expect("Not fallible; qed")
481        .with_swarm_config(|config| {
482            config
483                .with_max_negotiating_inbound_streams(SWARM_MAX_NEGOTIATING_INBOUND_STREAMS)
484                .with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT)
485        })
486        .build();
487
488    let is_listening = !listen_on.is_empty();
489
490    // Setup listen_on addresses
491    for mut addr in listen_on {
492        if let Err(error) = swarm.listen_on(addr.clone()) {
493            if !listen_on_fallback_to_random_port {
494                return Err(error.into());
495            }
496
497            let addr_string = addr.to_string();
498            // Listen on random port if specified is already occupied
499            if let Some(Protocol::Tcp(_port)) = addr.pop() {
500                info!("Failed to listen on {addr_string} ({error}), falling back to random port");
501                addr.push(Protocol::Tcp(0));
502                swarm.listen_on(addr)?;
503            }
504        }
505    }
506
507    // Setup external addresses
508    for addr in external_addresses.iter().cloned() {
509        info!("DSN external address added: {addr}");
510        swarm.add_external_address(addr);
511    }
512
513    // Create final structs
514    let (command_sender, command_receiver) = mpsc::channel(1);
515
516    let rate_limiter = RateLimiter::new(
517        max_established_outgoing_connections,
518        max_pending_outgoing_connections,
519    );
520
521    let shared = Arc::new(Shared::new(local_peer_id, command_sender, rate_limiter));
522    let shared_weak = Arc::downgrade(&shared);
523
524    let node = Node::new(shared);
525    let node_runner = NodeRunner::new(NodeRunnerConfig {
526        allow_non_global_addresses_in_dht,
527        is_listening,
528        command_receiver,
529        swarm,
530        shared_weak,
531        next_random_query_interval: initial_random_query_interval,
532        known_peers_registry,
533        reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
534        temporary_bans,
535        libp2p_metrics,
536        metrics,
537        protocol_version,
538        bootstrap_addresses,
539    });
540
541    Ok((node, node_runner))
542}