Skip to main content

ab_networking/
constructor.rs

1pub(crate) mod temporary_bans;
2mod transport;
3
4use crate::behavior::persistent_parameters::{KnownPeersRegistry, StubNetworkingParametersManager};
5use crate::behavior::{Behavior, BehaviorConfig};
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::constructor::transport::build_transport;
8use crate::node::Node;
9use crate::node_runner::{NodeRunner, NodeRunnerConfig};
10use crate::protocols::autonat_wrapper::Config as AutonatWrapperConfig;
11use crate::protocols::request_response::request_response_factory::RequestHandler;
12use crate::protocols::reserved_peers::Config as ReservedPeersConfig;
13use crate::shared::Shared;
14use crate::utils::rate_limiter::RateLimiter;
15use crate::utils::{SubspaceMetrics, strip_peer_id};
16use ab_core_primitives::pieces::Piece;
17use backon::ExponentialBuilder;
18use futures::channel::mpsc;
19use libp2p::autonat::Config as AutonatConfig;
20use libp2p::connection_limits::ConnectionLimits;
21use libp2p::gossipsub::{
22    Config as GossipsubConfig, ConfigBuilder as GossipsubConfigBuilder,
23    Message as GossipsubMessage, MessageId, ValidationMode,
24};
25use libp2p::identify::Config as IdentifyConfig;
26use libp2p::kad::store::RecordStore;
27use libp2p::kad::{
28    BucketInserts, Config as KademliaConfig, Mode, ProviderRecord, Record, RecordKey, StoreInserts,
29    store,
30};
31use libp2p::metrics::Metrics;
32use libp2p::multiaddr::Protocol;
33use libp2p::yamux::Config as YamuxConfig;
34use libp2p::{Multiaddr, PeerId, StreamProtocol, SwarmBuilder, TransportError, identity};
35use parking_lot::Mutex;
36use prometheus_client::registry::Registry;
37use std::borrow::Cow;
38use std::iter::Empty;
39use std::sync::Arc;
40use std::time::Duration;
41use std::{fmt, io, iter};
42use thiserror::Error;
43use tracing::{debug, info};
44
/// Protocol version used by [`Config::default()`]; only suitable for `dev` networks.
const DEFAULT_NETWORK_PROTOCOL_VERSION: &str = "dev";
/// Stream protocol name the Kademlia behaviour is configured with.
const KADEMLIA_PROTOCOL: &str = "/subspace/kad/0.1.0";
/// Protocol ID prefix the gossipsub behaviour is configured with.
const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";

/// Defines `max_negotiating_inbound_streams` for the swarm.
/// It must be set for large plots.
const SWARM_MAX_NEGOTIATING_INBOUND_STREAMS: usize = 100_000;
/// How long a connection is allowed to stay open without any usage.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(3);
/// The default maximum number of established incoming connections for the swarm.
const SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 100;
/// The default maximum number of established outgoing connections for the swarm.
const SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS: u32 = 100;
/// The default maximum number of pending incoming connections for the swarm.
const SWARM_MAX_PENDING_INCOMING_CONNECTIONS: u32 = 80;
/// The default maximum number of pending outgoing connections for the swarm.
const SWARM_MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 80;
/// Timeout applied to Kademlia queries.
const KADEMLIA_QUERY_TIMEOUT: Duration = Duration::from_secs(40);
/// Maximum number of established connections allowed per peer.
const SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER: u32 = 3;
/// Multiplied by the total established connection limit (incoming + outgoing) to derive the
/// maximum number of concurrent request-response streams.
const MAX_CONCURRENT_STREAMS_PER_CONNECTION: usize = 10;
// TODO: Consider moving this constant to configuration or removing `Toggle` wrapper when we find a
//  use-case for gossipsub protocol.
/// When `false`, [`Config::new()`] leaves the gossipsub configuration unset.
const ENABLE_GOSSIP_PROTOCOL: bool = false;

/// Default number of temporarily banned peers kept in memory.
const TEMPORARY_BANS_CACHE_SIZE: u32 = 10_000;
/// Default initial (minimum) delay of the temporary ban backoff.
const TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL: Duration = Duration::from_secs(5);
/// Default growth factor of the temporary ban backoff.
const TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER: f64 = 1.5;
/// Default upper bound for the temporary ban backoff delay.
const TEMPORARY_BANS_DEFAULT_MAX_INTERVAL: Duration = Duration::from_mins(30);

/// We pause between reserved peers dialing otherwise we could do multiple dials to offline peers
/// wasting resources and producing a ton of log records.
///
/// Despite the `_IN_SECS` suffix this is a [`Duration`], not a raw number of seconds.
const DIALING_INTERVAL_IN_SECS: Duration = Duration::from_secs(1);

/// Specific Yamux setting for Subspace applications: the maximum number of streams, applied to
/// the Yamux configuration created in [`Config::new()`].
const YAMUX_MAX_STREAMS: usize = 256;

/// Max confidence for autonat protocol. Could affect Kademlia mode change.
pub(crate) const AUTONAT_MAX_CONFIDENCE: usize = 3;
/// We set a very long pause before autonat initialization (`Duration::MAX` panics).
///
/// Used as the autonat boot delay when the Kademlia mode is static or external addresses are
/// known up front.
const AUTONAT_SERVER_PROBE_DELAY: Duration = Duration::from_days(365);
89
90/// Defines Kademlia mode
91#[derive(Clone, Debug)]
92pub enum KademliaMode {
93    /// The Kademlia mode is static for the duration of the application.
94    Static(Mode),
95    /// Kademlia mode will be changed using Autonat protocol when max confidence reached.
96    Dynamic,
97}
98
99impl KademliaMode {
100    /// Returns true if the mode is Dynamic.
101    pub fn is_dynamic(&self) -> bool {
102        matches!(self, Self::Dynamic)
103    }
104
105    /// Returns true if the mode is Static.
106    pub fn is_static(&self) -> bool {
107        matches!(self, Self::Static(..))
108    }
109}
110
111pub(crate) struct DummyRecordStore;
112
113impl RecordStore for DummyRecordStore {
114    type RecordsIter<'a>
115        = Empty<Cow<'a, Record>>
116    where
117        Self: 'a;
118    type ProvidedIter<'a>
119        = Empty<Cow<'a, ProviderRecord>>
120    where
121        Self: 'a;
122
123    #[inline]
124    fn get(&self, _key: &RecordKey) -> Option<Cow<'_, Record>> {
125        // Not supported
126        None
127    }
128
129    #[inline]
130    fn put(&mut self, _record: Record) -> store::Result<()> {
131        // Not supported
132        Ok(())
133    }
134
135    #[inline]
136    fn remove(&mut self, _key: &RecordKey) {
137        // Not supported
138    }
139
140    #[inline]
141    fn records(&self) -> Self::RecordsIter<'_> {
142        // Not supported
143        iter::empty()
144    }
145
146    #[inline]
147    fn add_provider(&mut self, _record: ProviderRecord) -> store::Result<()> {
148        // Not supported
149        Ok(())
150    }
151
152    #[inline]
153    fn providers(&self, _key: &RecordKey) -> Vec<ProviderRecord> {
154        // Not supported
155        Vec::new()
156    }
157
158    #[inline]
159    fn provided(&self) -> Self::ProvidedIter<'_> {
160        // Not supported
161        iter::empty()
162    }
163
164    #[inline]
165    fn remove_provider(&mut self, _key: &RecordKey, _provider: &PeerId) {
166        // Not supported
167    }
168}
169
/// [`Node`] configuration.
///
/// Use [`Config::new()`] to obtain an instance populated with defaults and then adjust individual
/// fields as needed.
pub struct Config {
    /// Identity keypair of a node used for authenticated connections.
    pub keypair: identity::Keypair,
    /// List of [`Multiaddr`] on which to listen for incoming connections. Empty by default.
    pub listen_on: Vec<Multiaddr>,
    /// Fallback to random port if specified (or default) port is already occupied. Enabled by
    /// default.
    pub listen_on_fallback_to_random_port: bool,
    /// Adds a timeout to the setup and protocol upgrade process for all inbound and outbound
    /// connections established through the transport. Defaults to 10 seconds.
    pub timeout: Duration,
    /// The configuration for the Identify behaviour.
    pub identify: IdentifyConfig,
    /// The configuration for the Kademlia behaviour.
    pub kademlia: KademliaConfig,
    /// The configuration for the Gossip behaviour.
    pub gossipsub: Option<GossipsubConfig>,
    /// Yamux multiplexing configuration.
    pub yamux_config: YamuxConfig,
    /// Should non-global addresses be added to the DHT? Disabled by default.
    pub allow_non_global_addresses_in_dht: bool,
    /// How frequently should random queries be done using Kademlia DHT to populate routing table.
    /// Defaults to 1 second.
    pub initial_random_query_interval: Duration,
    /// The [`KnownPeersRegistry`] implementation to use.
    pub known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// The configuration for the `RequestResponsesBehaviour` protocol.
    pub request_response_protocols: Vec<Box<dyn RequestHandler>>,
    /// Defines set of peers with a permanent connection (and reconnection if necessary).
    pub reserved_peers: Vec<Multiaddr>,
    /// Established incoming swarm connection limit.
    pub max_established_incoming_connections: u32,
    /// Established outgoing swarm connection limit.
    pub max_established_outgoing_connections: u32,
    /// Pending incoming swarm connection limit.
    pub max_pending_incoming_connections: u32,
    /// Pending outgoing swarm connection limit.
    pub max_pending_outgoing_connections: u32,
    /// How many temporarily banned unreachable peers to keep in memory.
    pub temporary_bans_cache_size: u32,
    /// Backoff policy for temporary banning of unreachable peers.
    pub temporary_ban_backoff: ExponentialBuilder,
    /// Optional libp2p prometheus metrics. None will disable metrics gathering.
    pub libp2p_metrics: Option<Metrics>,
    /// Internal prometheus metrics. None will disable metrics gathering.
    pub metrics: Option<SubspaceMetrics>,
    /// Defines protocol version for the network peers. Affects network partition.
    pub protocol_version: String,
    /// Addresses to bootstrap Kademlia network.
    pub bootstrap_addresses: Vec<Multiaddr>,
    /// Kademlia mode. The default value is set to Static(Client). The peer won't add its address
    /// to other peers' Kademlia routing table. Changing this behaviour implies that a peer can
    /// provide pieces to others.
    pub kademlia_mode: KademliaMode,
    /// Known external addresses to the local peer. The addresses will be added on the swarm start
    /// and enable peer to notify others about its reachable address.
    pub external_addresses: Vec<Multiaddr>,
}
227
228impl fmt::Debug for Config {
229    #[inline]
230    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231        f.debug_struct("Config").finish()
232    }
233}
234
235/// This default can only be used for `dev` networks.
236/// Other networks should use `Config::new()` to apply the correct prefix to the protocol version.
237impl Default for Config {
238    #[inline]
239    fn default() -> Self {
240        let ed25519_keypair = identity::ed25519::Keypair::generate();
241        let keypair = identity::Keypair::from(ed25519_keypair);
242
243        Self::new(DEFAULT_NETWORK_PROTOCOL_VERSION.to_string(), keypair, None)
244    }
245}
246
247impl Config {
248    /// Creates a new [`Config`].
249    /// Applies a subspace-specific version prefix to the `protocol_version`.
250    pub fn new(
251        protocol_version: String,
252        keypair: identity::Keypair,
253        prometheus_registry: Option<&mut Registry>,
254    ) -> Self {
255        let (libp2p_metrics, metrics) = prometheus_registry.map_or((None, None), |registry| {
256            (
257                Some(Metrics::new(registry)),
258                Some(SubspaceMetrics::new(registry)),
259            )
260        });
261
262        let mut kademlia = KademliaConfig::new(
263            StreamProtocol::try_from_owned(KADEMLIA_PROTOCOL.to_owned())
264                .expect("Manual protocol name creation."),
265        );
266        kademlia
267            .set_query_timeout(KADEMLIA_QUERY_TIMEOUT)
268            .disjoint_query_paths(true)
269            .set_max_packet_size(2 * Piece::SIZE)
270            .set_kbucket_inserts(BucketInserts::Manual)
271            .set_record_filtering(StoreInserts::FilterBoth)
272            // We don't use records and providers publication.
273            .set_provider_record_ttl(None)
274            .set_provider_publication_interval(None)
275            .set_record_ttl(None)
276            .set_replication_interval(None);
277
278        let mut yamux_config = YamuxConfig::default();
279        yamux_config.set_max_num_streams(YAMUX_MAX_STREAMS);
280
281        let gossipsub = ENABLE_GOSSIP_PROTOCOL.then(|| {
282            GossipsubConfigBuilder::default()
283                .protocol_id_prefix(GOSSIPSUB_PROTOCOL_PREFIX)
284                // TODO: Do we want message signing?
285                .validation_mode(ValidationMode::None)
286                // To content-address message, we can take the hash of message and use it as an ID.
287                .message_id_fn(|message: &GossipsubMessage| {
288                    MessageId::from(blake3::hash(&message.data).as_bytes())
289                })
290                .max_transmit_size(2 * 1024 * 1024) // 2MB
291                .build()
292                .expect("Default config for gossipsub is always correct; qed")
293        });
294
295        let protocol_version = format!("/subspace/2/{protocol_version}");
296        let identify = IdentifyConfig::new(protocol_version.clone(), keypair.public());
297
298        let temporary_ban_backoff = ExponentialBuilder::default()
299            .with_factor(TEMPORARY_BANS_DEFAULT_BACKOFF_MULTIPLIER as f32)
300            .with_min_delay(TEMPORARY_BANS_DEFAULT_BACKOFF_INITIAL_INTERVAL)
301            .with_max_delay(TEMPORARY_BANS_DEFAULT_MAX_INTERVAL)
302            .without_max_times();
303
304        Self {
305            keypair,
306            listen_on: vec![],
307            listen_on_fallback_to_random_port: true,
308            timeout: Duration::from_secs(10),
309            identify,
310            kademlia,
311            gossipsub,
312            allow_non_global_addresses_in_dht: false,
313            initial_random_query_interval: Duration::from_secs(1),
314            known_peers_registry: StubNetworkingParametersManager.boxed(),
315            request_response_protocols: Vec::new(),
316            yamux_config,
317            reserved_peers: Vec::new(),
318            max_established_incoming_connections: SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS,
319            max_established_outgoing_connections: SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS,
320            max_pending_incoming_connections: SWARM_MAX_PENDING_INCOMING_CONNECTIONS,
321            max_pending_outgoing_connections: SWARM_MAX_PENDING_OUTGOING_CONNECTIONS,
322            temporary_bans_cache_size: TEMPORARY_BANS_CACHE_SIZE,
323            temporary_ban_backoff,
324            libp2p_metrics,
325            metrics,
326            protocol_version,
327            bootstrap_addresses: Vec::new(),
328            kademlia_mode: KademliaMode::Static(Mode::Client),
329            external_addresses: Vec::new(),
330        }
331    }
332}
333
/// Errors that might happen during network creation.
#[derive(Debug, Error)]
pub enum CreationError {
    /// Circuit relay client error.
    #[error("Expected relay server node.")]
    RelayServerExpected,
    /// I/O error.
    ///
    /// Can be created from [`io::Error`] with `?`.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Transport creation error.
    ///
    /// The underlying error is boxed, so it has to be converted explicitly rather than with `?`.
    #[error("Transport creation error: {0}")]
    // TODO: Restore `#[from] TransportError` once https://github.com/libp2p/rust-libp2p/issues/4824
    //  is resolved
    TransportCreationError(Box<dyn std::error::Error + Send + Sync>),
    /// Transport error when attempting to listen on multiaddr.
    ///
    /// Can be created from [`TransportError<io::Error>`] with `?`.
    #[error("Transport error when attempting to listen on multiaddr: {0}")]
    TransportError(#[from] TransportError<io::Error>),
}
352
353/// Converts public key from keypair to PeerId.
354/// It serves as the shared PeerId generating algorithm.
355pub fn peer_id(keypair: &identity::Keypair) -> PeerId {
356    keypair.public().to_peer_id()
357}
358
359/// Create a new network node and node runner instances.
360pub fn construct(config: Config) -> Result<(Node, NodeRunner), CreationError> {
361    let Config {
362        keypair,
363        listen_on,
364        listen_on_fallback_to_random_port,
365        timeout,
366        identify,
367        kademlia,
368        gossipsub,
369        yamux_config,
370        allow_non_global_addresses_in_dht,
371        initial_random_query_interval,
372        known_peers_registry,
373        request_response_protocols,
374        reserved_peers,
375        max_established_incoming_connections,
376        max_established_outgoing_connections,
377        max_pending_incoming_connections,
378        max_pending_outgoing_connections,
379        temporary_bans_cache_size,
380        temporary_ban_backoff,
381        libp2p_metrics,
382        metrics,
383        protocol_version,
384        bootstrap_addresses,
385        kademlia_mode,
386        external_addresses,
387    } = config;
388    let local_peer_id = peer_id(&keypair);
389
390    info!(
391        %allow_non_global_addresses_in_dht,
392        peer_id = %local_peer_id,
393        %protocol_version,
394        "DSN instance configured."
395    );
396
397    let connection_limits = ConnectionLimits::default()
398        .with_max_established_per_peer(Some(SWARM_MAX_ESTABLISHED_CONNECTIONS_PER_PEER))
399        .with_max_pending_incoming(Some(max_pending_incoming_connections))
400        .with_max_pending_outgoing(Some(max_pending_outgoing_connections))
401        .with_max_established_incoming(Some(max_established_incoming_connections))
402        .with_max_established_outgoing(Some(max_established_outgoing_connections));
403
404    debug!(?connection_limits, "DSN connection limits set.");
405
406    let autonat_boot_delay = if kademlia_mode.is_static() || !external_addresses.is_empty() {
407        AUTONAT_SERVER_PROBE_DELAY
408    } else {
409        AutonatConfig::default().boot_delay
410    };
411
412    debug!(
413        ?autonat_boot_delay,
414        ?kademlia_mode,
415        ?external_addresses,
416        "Autonat boot delay set."
417    );
418
419    let mut behaviour = Behavior::new(BehaviorConfig {
420        peer_id: local_peer_id,
421        identify,
422        kademlia,
423        gossipsub,
424        request_response_protocols,
425        request_response_max_concurrent_streams: {
426            let max_num_connections = max_established_incoming_connections as usize
427                + max_established_outgoing_connections as usize;
428            max_num_connections * MAX_CONCURRENT_STREAMS_PER_CONNECTION
429        },
430        connection_limits,
431        reserved_peers: ReservedPeersConfig {
432            reserved_peers: reserved_peers.clone(),
433            dialing_interval: DIALING_INTERVAL_IN_SECS,
434        },
435        autonat: AutonatWrapperConfig {
436            inner_config: AutonatConfig {
437                use_connected: true,
438                only_global_ips: !config.allow_non_global_addresses_in_dht,
439                confidence_max: AUTONAT_MAX_CONFIDENCE,
440                boot_delay: autonat_boot_delay,
441                ..Default::default()
442            },
443            local_peer_id,
444            servers: bootstrap_addresses.clone(),
445        },
446    });
447
448    match (kademlia_mode, external_addresses.is_empty()) {
449        (KademliaMode::Static(mode), _) => {
450            behaviour.kademlia.set_mode(Some(mode));
451        }
452        (KademliaMode::Dynamic, false) => {
453            behaviour.kademlia.set_mode(Some(Mode::Server));
454        }
455        _ => {
456            // Autonat will figure it out
457        }
458    }
459
460    let temporary_bans = Arc::new(Mutex::new(TemporaryBans::new(
461        temporary_bans_cache_size,
462        temporary_ban_backoff,
463    )));
464
465    let mut swarm = SwarmBuilder::with_existing_identity(keypair)
466        .with_tokio()
467        .with_other_transport(|keypair| {
468            Ok(build_transport(
469                allow_non_global_addresses_in_dht,
470                keypair,
471                Arc::clone(&temporary_bans),
472                timeout,
473                yamux_config,
474            )?)
475        })
476        .map_err(|error| CreationError::TransportCreationError(error.into()))?
477        .with_behaviour(move |_keypair| Ok(behaviour))
478        .expect("Not fallible; qed")
479        .with_swarm_config(|config| {
480            config
481                .with_max_negotiating_inbound_streams(SWARM_MAX_NEGOTIATING_INBOUND_STREAMS)
482                .with_idle_connection_timeout(IDLE_CONNECTION_TIMEOUT)
483        })
484        .build();
485
486    let is_listening = !listen_on.is_empty();
487
488    // Setup listen_on addresses
489    for mut addr in listen_on {
490        if let Err(error) = swarm.listen_on(addr.clone()) {
491            if !listen_on_fallback_to_random_port {
492                return Err(error.into());
493            }
494
495            let addr_string = addr.to_string();
496            // Listen on random port if specified is already occupied
497            if let Some(Protocol::Tcp(_port)) = addr.pop() {
498                info!("Failed to listen on {addr_string} ({error}), falling back to random port");
499                addr.push(Protocol::Tcp(0));
500                swarm.listen_on(addr)?;
501            }
502        }
503    }
504
505    // Setup external addresses
506    for addr in external_addresses.iter().cloned() {
507        info!("DSN external address added: {addr}");
508        swarm.add_external_address(addr);
509    }
510
511    // Create final structs
512    let (command_sender, command_receiver) = mpsc::channel(1);
513
514    let rate_limiter = RateLimiter::new(
515        max_established_outgoing_connections,
516        max_pending_outgoing_connections,
517    );
518
519    let shared = Arc::new(Shared::new(local_peer_id, command_sender, rate_limiter));
520    let shared_weak = Arc::downgrade(&shared);
521
522    let node = Node::new(shared);
523    let node_runner = NodeRunner::new(NodeRunnerConfig {
524        allow_non_global_addresses_in_dht,
525        is_listening,
526        command_receiver,
527        swarm,
528        shared_weak,
529        next_random_query_interval: initial_random_query_interval,
530        known_peers_registry,
531        reserved_peers: strip_peer_id(reserved_peers).into_iter().collect(),
532        temporary_bans,
533        libp2p_metrics,
534        metrics,
535        protocol_version,
536        bootstrap_addresses,
537    });
538
539    Ok((node, node_runner))
540}