Skip to main content

ab_networking/
node_runner.rs

1use crate::behavior::persistent_parameters::{
2    KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
3};
4use crate::behavior::{Behavior, Event};
5use crate::constructor::DummyRecordStore;
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::protocols::request_response::request_response_factory::{
8    Event as RequestResponseEvent, IfDisconnected,
9};
10use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
11use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
12use async_lock::Mutex as AsyncMutex;
13use bytes::Bytes;
14use event_listener_primitives::HandlerId;
15use futures::channel::mpsc;
16use futures::future::Fuse;
17use futures::{FutureExt, StreamExt};
18use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
19use libp2p::core::ConnectedPoint;
20use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
21use libp2p::identify::Event as IdentifyEvent;
22use libp2p::kad::{
23    Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
24    GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
25    InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
26    Quorum, Record, RecordKey,
27};
28use libp2p::metrics::{Metrics, Recorder};
29use libp2p::multiaddr::Protocol;
30use libp2p::swarm::dial_opts::DialOpts;
31use libp2p::swarm::{DialError, SwarmEvent};
32use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
33use nohash_hasher::IntMap;
34use parking_lot::Mutex;
35use std::collections::hash_map::Entry;
36use std::collections::{HashMap, HashSet};
37use std::net::IpAddr;
38use std::pin::Pin;
39use std::sync::atomic::Ordering;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42use std::{fmt, slice};
43use tokio::sync::OwnedSemaphorePermit;
44use tokio::task::yield_now;
45use tokio::time::Sleep;
46use tracing::{debug, error, trace, warn};
47
48enum QueryResultSender {
49    Value {
50        sender: mpsc::UnboundedSender<PeerRecord>,
51        // Just holding onto permit while data structure is not dropped
52        _permit: OwnedSemaphorePermit,
53    },
54    ClosestPeers {
55        sender: mpsc::UnboundedSender<PeerId>,
56        // Just holding onto permit while data structure is not dropped
57        _permit: Option<OwnedSemaphorePermit>,
58    },
59    Providers {
60        key: RecordKey,
61        sender: mpsc::UnboundedSender<PeerId>,
62        // Just holding onto permit while data structure is not dropped
63        _permit: Option<OwnedSemaphorePermit>,
64    },
65    PutValue {
66        sender: mpsc::UnboundedSender<()>,
67        // Just holding onto permit while data structure is not dropped
68        _permit: OwnedSemaphorePermit,
69    },
70    Bootstrap {
71        sender: mpsc::UnboundedSender<()>,
72    },
73}
74
75#[derive(Debug, Default)]
76enum BootstrapCommandState {
77    #[default]
78    NotStarted,
79    InProgress(mpsc::UnboundedReceiver<()>),
80    Finished,
81}
82
83/// Runner for the Node.
84#[must_use = "Node does not function properly unless its runner is driven forward"]
85pub struct NodeRunner {
86    /// Should non-global addresses be added to the DHT?
87    allow_non_global_addresses_in_dht: bool,
88    /// Whether node is listening on some addresses
89    is_listening: bool,
90    command_receiver: mpsc::Receiver<Command>,
91    swarm: Swarm<Behavior>,
92    shared_weak: Weak<Shared>,
93    /// How frequently should random queries be done using Kademlia DHT to populate routing table.
94    next_random_query_interval: Duration,
95    query_id_receivers: HashMap<QueryId, QueryResultSender>,
96    /// Global subscription counter, is assigned to every (logical) subscription and is used for
97    /// unsubscribing.
98    next_subscription_id: usize,
99    /// Topic subscription senders for logical subscriptions (multiple logical subscriptions can be
100    /// present for the same physical subscription).
101    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
102    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
103    /// Defines an interval between periodical tasks.
104    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
105    /// Manages the networking parameters like known peers and addresses
106    known_peers_registry: Box<dyn KnownPeersRegistry>,
107    connected_servers: HashSet<PeerId>,
108    /// Defines set of peers with a permanent connection (and reconnection if necessary).
109    reserved_peers: HashMap<PeerId, Multiaddr>,
110    /// Temporarily banned peers.
111    temporary_bans: Arc<Mutex<TemporaryBans>>,
112    /// Libp2p Prometheus metrics.
113    libp2p_metrics: Option<Metrics>,
114    /// Subspace Prometheus metrics.
115    metrics: Option<SubspaceMetrics>,
116    /// Mapping from specific peer to ip addresses
117    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
118    /// Defines protocol version for the network peers. Affects network partition.
119    protocol_version: String,
120    /// Addresses to bootstrap Kademlia network
121    bootstrap_addresses: Vec<Multiaddr>,
122    /// Ensures a single bootstrap on run() invocation.
123    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
124    /// Receives an event on peer address removal from the persistent storage.
125    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
126    /// Optional storage for the [`HandlerId`] of the address removal task.
127    /// We keep to stop the task along with the rest of the networking.
128    _address_removal_task_handler_id: Option<HandlerId>,
129}
130
131impl fmt::Debug for NodeRunner {
132    #[inline]
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("NodeRunner").finish_non_exhaustive()
135    }
136}
137
138// Helper struct for NodeRunner configuration (clippy requirement).
139pub(crate) struct NodeRunnerConfig {
140    pub(crate) allow_non_global_addresses_in_dht: bool,
141    /// Whether node is listening on some addresses
142    pub(crate) is_listening: bool,
143    pub(crate) command_receiver: mpsc::Receiver<Command>,
144    pub(crate) swarm: Swarm<Behavior>,
145    pub(crate) shared_weak: Weak<Shared>,
146    pub(crate) next_random_query_interval: Duration,
147    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
148    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
149    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
150    pub(crate) libp2p_metrics: Option<Metrics>,
151    pub(crate) metrics: Option<SubspaceMetrics>,
152    pub(crate) protocol_version: String,
153    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
154}
155
156impl NodeRunner {
157    pub(crate) fn new(
158        NodeRunnerConfig {
159            allow_non_global_addresses_in_dht,
160            is_listening,
161            command_receiver,
162            swarm,
163            shared_weak,
164            next_random_query_interval,
165            mut known_peers_registry,
166            reserved_peers,
167            temporary_bans,
168            libp2p_metrics,
169            metrics,
170            protocol_version,
171            bootstrap_addresses,
172        }: NodeRunnerConfig,
173    ) -> Self {
174        // Setup the address removal events exchange between persistent params storage and Kademlia.
175        let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
176        let mut address_removal_task_handler_id = None;
177        if let Some(handler_id) = known_peers_registry.on_unreachable_address({
178            Arc::new(move |event| {
179                if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
180                    debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent");
181                }
182            })
183        }) {
184            address_removal_task_handler_id.replace(handler_id);
185        }
186
187        Self {
188            allow_non_global_addresses_in_dht,
189            is_listening,
190            command_receiver,
191            swarm,
192            shared_weak,
193            next_random_query_interval,
194            query_id_receivers: HashMap::default(),
195            next_subscription_id: 0,
196            topic_subscription_senders: HashMap::default(),
197            // We'll make the first query right away and continue at the interval.
198            random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
199            // We'll make the first dial right away and continue at the interval.
200            periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
201            known_peers_registry,
202            connected_servers: HashSet::new(),
203            reserved_peers,
204            temporary_bans,
205            libp2p_metrics,
206            metrics,
207            peer_ip_addresses: HashMap::new(),
208            protocol_version,
209            bootstrap_addresses,
210            bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
211            removed_addresses_rx,
212            _address_removal_task_handler_id: address_removal_task_handler_id,
213        }
214    }
215
216    /// Drives the main networking future forward.
217    pub async fn run(&mut self) {
218        if self.is_listening {
219            // Wait for listen addresses, otherwise we will get ephemeral addresses in external
220            // address candidates that we do not want
221            loop {
222                if self.swarm.listeners().next().is_some() {
223                    break;
224                }
225
226                if let Some(swarm_event) = self.swarm.next().await {
227                    self.register_event_metrics(&swarm_event);
228                    self.handle_swarm_event(swarm_event).await;
229                } else {
230                    break;
231                }
232            }
233        }
234
235        self.bootstrap().await;
236
237        loop {
238            futures::select! {
239                () = &mut self.random_query_timeout => {
240                    self.handle_random_query_interval();
241                    // Increase interval 2x, but to at most 1 minute.
242                    self.random_query_timeout =
243                        Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
244                    self.next_random_query_interval =
245                        (self.next_random_query_interval * 2).min(Duration::from_mins(1));
246                },
247                swarm_event = self.swarm.next() => {
248                    if let Some(swarm_event) = swarm_event {
249                        self.register_event_metrics(&swarm_event);
250                        self.handle_swarm_event(swarm_event).await;
251                    } else {
252                        break;
253                    }
254                },
255                command = self.command_receiver.next() => {
256                    if let Some(command) = command {
257                        self.handle_command(command);
258                    } else {
259                        break;
260                    }
261                },
262                () = self.known_peers_registry.run().fuse() => {
263                    trace!("Network parameters registry runner exited");
264                },
265                () = &mut self.periodical_tasks_interval => {
266                    self.handle_periodical_tasks();
267
268                    self.periodical_tasks_interval =
269                        Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
270                },
271                event = self.removed_addresses_rx.select_next_some() => {
272                    self.handle_removed_address_event(event);
273                },
274            }
275
276            // Allow to exit from busy loop during graceful shutdown
277            yield_now().await;
278        }
279    }
280
281    /// Bootstraps Kademlia network
282    async fn bootstrap(&mut self) {
283        // Add bootstrap nodes first to make sure there is space for them in k-buckets
284        for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
285            self.swarm
286                .behaviour_mut()
287                .kademlia
288                .add_address(&peer_id, address);
289        }
290
291        let known_peers = self.known_peers_registry.all_known_peers().await;
292
293        if !known_peers.is_empty() {
294            for (peer_id, addresses) in known_peers {
295                for address in addresses.clone() {
296                    let address = match address.with_p2p(peer_id) {
297                        Ok(address) => address,
298                        Err(address) => {
299                            warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
300                            break;
301                        }
302                    };
303                    self.swarm
304                        .behaviour_mut()
305                        .kademlia
306                        .add_address(&peer_id, address);
307                }
308
309                if let Err(error) = self
310                    .swarm
311                    .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
312                {
313                    warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
314                }
315            }
316
317            // Do bootstrap asynchronously
318            self.handle_command(Command::Bootstrap {
319                result_sender: None,
320            });
321            return;
322        }
323
324        let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
325        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
326        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
327            BootstrapCommandState::NotStarted => {
328                debug!("Bootstrap started.");
329
330                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
331
332                self.handle_command(Command::Bootstrap {
333                    result_sender: Some(bootstrap_command_sender),
334                });
335
336                *bootstrap_command_state =
337                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
338                match &mut *bootstrap_command_state {
339                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
340                        bootstrap_command_receiver
341                    }
342                    _ => {
343                        unreachable!("Was just set to that exact value");
344                    }
345                }
346            }
347            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
348                bootstrap_command_receiver
349            }
350            BootstrapCommandState::Finished => {
351                return;
352            }
353        };
354
355        let mut bootstrap_step = 0usize;
356        loop {
357            futures::select! {
358                swarm_event = self.swarm.next() => {
359                    if let Some(swarm_event) = swarm_event {
360                        self.register_event_metrics(&swarm_event);
361                        self.handle_swarm_event(swarm_event).await;
362                    } else {
363                        break;
364                    }
365                },
366                result = bootstrap_command_receiver.next() => {
367                    if result.is_some() {
368                        debug!(%bootstrap_step, "Kademlia bootstrapping...");
369                        bootstrap_step += 1;
370                    } else {
371                        break;
372                    }
373                }
374            }
375        }
376
377        debug!("Bootstrap finished.");
378        *bootstrap_command_state = BootstrapCommandState::Finished;
379    }
380
381    /// Handles periodical tasks.
382    fn handle_periodical_tasks(&mut self) {
383        // Log current connections.
384        let network_info = self.swarm.network_info();
385        let connections = network_info.connection_counters();
386
387        debug!(?connections, "Current connections and limits.");
388
389        // Renew known external addresses.
390        let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
391
392        if let Some(shared) = self.shared_weak.upgrade() {
393            debug!(?external_addresses, "Renew external addresses.");
394            let mut addresses = shared.external_addresses.lock();
395            addresses.clear();
396            addresses.append(&mut external_addresses);
397        }
398
399        self.log_kademlia_stats();
400    }
401
402    fn handle_random_query_interval(&mut self) {
403        let random_peer_id = PeerId::random();
404
405        trace!("Starting random Kademlia query for {}", random_peer_id);
406
407        self.swarm
408            .behaviour_mut()
409            .kademlia
410            .get_closest_peers(random_peer_id);
411    }
412
413    fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
414        trace!(?event, "Peer address removed event.");
415
416        let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
417            .into_iter()
418            .map(|(peer_id, _)| peer_id)
419            .collect::<Vec<_>>();
420
421        if bootstrap_node_ids.contains(&event.peer_id) {
422            debug!(
423                ?event,
424                ?bootstrap_node_ids,
425                "Skipped removing bootstrap node from Kademlia buckets."
426            );
427
428            return;
429        }
430
431        // Remove both versions of the address
432        self.swarm.behaviour_mut().kademlia.remove_address(
433            &event.peer_id,
434            &append_p2p_suffix(event.peer_id, event.address.clone()),
435        );
436
437        self.swarm
438            .behaviour_mut()
439            .kademlia
440            .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
441    }
442
443    fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
444        let Some(shared) = self.shared_weak.upgrade() else {
445            return;
446        };
447
448        // Remove both versions of the address
449        let peer_id = shared.id;
450        shared.listeners.lock().retain(|old_listener| {
451            !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
452                && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
453        });
454    }
455
456    async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
457        #[expect(clippy::ref_patterns, reason = "Much less awkward this way")]
458        match swarm_event {
459            SwarmEvent::Behaviour(Event::Identify(event)) => {
460                self.handle_identify_event(*event);
461            }
462            SwarmEvent::Behaviour(Event::Kademlia(event)) => {
463                self.handle_kademlia_event(event);
464            }
465            SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
466                self.handle_gossipsub_event(event);
467            }
468            SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
469                self.handle_request_response_event(event);
470            }
471            SwarmEvent::Behaviour(Event::Autonat(event)) => {
472                self.handle_autonat_event(event);
473            }
474            ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
475                trace!(?event, "New local listener  event.");
476
477                let Some(shared) = self.shared_weak.upgrade() else {
478                    return;
479                };
480                shared.listeners.lock().push(address.clone());
481                shared.handlers.new_listener.call_simple(address);
482            }
483            ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
484                trace!(?event, "Local listener closed event.");
485                self.handle_remove_listeners(addresses);
486            }
487            ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
488                trace!(?event, "Local listener expired event.");
489                self.handle_remove_listeners(slice::from_ref(address));
490            }
491            SwarmEvent::ConnectionEstablished {
492                peer_id,
493                endpoint,
494                num_established,
495                ..
496            } => {
497                // Save known addresses that were successfully dialed.
498                if let ConnectedPoint::Dialer { address, .. } = &endpoint {
499                    // filter non-global addresses when non-globals addresses are disabled
500                    if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
501                        self.known_peers_registry
502                            .add_known_peer(peer_id, vec![address.clone()])
503                            .await;
504                    }
505                }
506
507                let Some(shared) = self.shared_weak.upgrade() else {
508                    return;
509                };
510
511                let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
512                debug!(
513                    %peer_id,
514                    %is_reserved_peer,
515                    ?endpoint,
516                    %num_established,
517                    "Connection established"
518                );
519
520                let maybe_remote_ip =
521                    endpoint
522                        .get_remote_address()
523                        .iter()
524                        .find_map(|protocol| match protocol {
525                            Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
526                            Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
527                            _ => None,
528                        });
529                if let Some(ip) = maybe_remote_ip {
530                    self.peer_ip_addresses
531                        .entry(peer_id)
532                        .and_modify(|ips| {
533                            ips.insert(ip);
534                        })
535                        .or_insert(HashSet::from([ip]));
536                }
537
538                let num_established_peer_connections = shared
539                    .num_established_peer_connections
540                    .fetch_add(1, Ordering::SeqCst)
541                    + 1;
542
543                shared
544                    .handlers
545                    .num_established_peer_connections_change
546                    .call_simple(&num_established_peer_connections);
547
548                // A new connection
549                if num_established.get() == 1 {
550                    shared.handlers.connected_peer.call_simple(&peer_id);
551                }
552
553                if let Some(metrics) = self.metrics.as_ref() {
554                    metrics.inc_established_connections();
555                }
556            }
557            SwarmEvent::ConnectionClosed {
558                peer_id,
559                num_established,
560                cause,
561                ..
562            } => {
563                let Some(shared) = self.shared_weak.upgrade() else {
564                    return;
565                };
566
567                debug!(
568                    %peer_id,
569                    ?cause,
570                    %num_established,
571                    "Connection closed with peer"
572                );
573
574                if num_established == 0 {
575                    self.peer_ip_addresses.remove(&peer_id);
576                    self.connected_servers.remove(&peer_id);
577                }
578                let num_established_peer_connections = shared
579                    .num_established_peer_connections
580                    .fetch_sub(1, Ordering::SeqCst)
581                    - 1;
582
583                shared
584                    .handlers
585                    .num_established_peer_connections_change
586                    .call_simple(&num_established_peer_connections);
587
588                // No more connections
589                if num_established == 0 {
590                    shared.handlers.disconnected_peer.call_simple(&peer_id);
591                }
592
593                if let Some(metrics) = self.metrics.as_ref() {
594                    metrics.dec_established_connections();
595                }
596            }
597            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
598                if let Some(peer_id) = &peer_id {
599                    let should_ban_temporarily =
600                        self.should_temporary_ban_on_dial_error(peer_id, &error);
601
602                    trace!(%should_ban_temporarily, "Temporary bans conditions.");
603
604                    if should_ban_temporarily {
605                        self.temporary_bans.lock().create_or_extend(peer_id);
606                        debug!(%peer_id, ?error, "Peer was temporarily banned.");
607                    }
608                }
609
610                debug!(
611                    ?peer_id,
612                    ?error,
613                    "SwarmEvent::OutgoingConnectionError for peer."
614                );
615
616                match error {
617                    DialError::Transport(ref addresses) => {
618                        for (addr, _) in addresses {
619                            trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
620                            if let Some(peer_id) = peer_id {
621                                self.known_peers_registry
622                                    .remove_known_peer_addresses(peer_id, vec![addr.clone()])
623                                    .await;
624                            }
625                        }
626                    }
627                    DialError::WrongPeerId { obtained, .. } => {
628                        trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");
629
630                        if let Some(ref peer_id) = peer_id {
631                            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
632                            let _: Option<_> = kademlia.remove_peer(peer_id);
633                        }
634                    }
635                    _ => {
636                        trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
637                    }
638                }
639            }
640            SwarmEvent::NewExternalAddrCandidate { address } => {
641                trace!(%address, "External address candidate");
642            }
643            SwarmEvent::ExternalAddrConfirmed { address } => {
644                debug!(%address, "Confirmed external address");
645
646                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
647                self.swarm.behaviour_mut().identify.push(connected_peers);
648            }
649            SwarmEvent::ExternalAddrExpired { address } => {
650                debug!(%address, "External address expired");
651
652                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
653                self.swarm.behaviour_mut().identify.push(connected_peers);
654            }
655            other => {
656                trace!("Other swarm event: {:?}", other);
657            }
658        }
659    }
660
661    fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
662        // TODO: Replace with banning of addresses rather peer IDs if this helps
663        if true {
664            return false;
665        }
666
667        // Ban temporarily only peers without active connections.
668        if self.swarm.is_connected(peer_id) {
669            return false;
670        }
671
672        match &error {
673            DialError::Transport(addresses) => {
674                for (_, error) in addresses {
675                    match error {
676                        TransportError::MultiaddrNotSupported(_) => {
677                            return true;
678                        }
679                        TransportError::Other(_) => {
680                            // Ignore "temporary ban" errors
681                            if self.temporary_bans.lock().is_banned(peer_id) {
682                                return false;
683                            }
684                        }
685                    }
686                }
687                // Other errors that are not related to temporary bans
688                true
689            }
690            DialError::LocalPeerId { .. } => {
691                // We don't ban ourselves
692                debug!("Local peer dial attempt detected.");
693
694                false
695            }
696            DialError::NoAddresses => {
697                // Let's wait until we get addresses
698                true
699            }
700            DialError::DialPeerConditionFalse(_) => {
701                // These are local conditions, we don't need to ban remote peers
702                false
703            }
704            DialError::Aborted => {
705                // Seems like a transient event
706                false
707            }
708            DialError::WrongPeerId { .. } => {
709                // It's likely that peer was restarted with different identity
710                false
711            }
712            DialError::Denied { .. } => {
713                // We exceeded the connection limits or we hit a black listed peer
714                false
715            }
716        }
717    }
718
719    fn handle_identify_event(&mut self, event: IdentifyEvent) {
720        let local_peer_id = *self.swarm.local_peer_id();
721
722        if let IdentifyEvent::Received {
723            peer_id, mut info, ..
724        } = event
725        {
726            debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");
727
728            // Check for network partition
729            if info.protocol_version != self.protocol_version {
730                debug!(
731                    %local_peer_id,
732                    %peer_id,
733                    local_protocol_version = %self.protocol_version,
734                    peer_protocol_version = %info.protocol_version,
735                    "Peer has different protocol version, banning temporarily",
736                );
737
738                self.temporary_bans.lock().create_or_extend(&peer_id);
739                // Forget about this peer until they upgrade
740                let _: Result<(), ()> = self.swarm.disconnect_peer_id(peer_id);
741                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
742                self.known_peers_registry
743                    .remove_all_known_peer_addresses(peer_id);
744
745                return;
746            }
747
748            // Remove temporary ban if there was any
749            self.temporary_bans.lock().remove(&peer_id);
750
751            if info.listen_addrs.len() > 30 {
752                debug!(
753                    %local_peer_id,
754                    %peer_id,
755                    "Node has reported more than 30 addresses; it is identified by {} and {}",
756                    info.protocol_version, info.agent_version
757                );
758                info.listen_addrs.truncate(30);
759            }
760
761            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
762            let full_kademlia_support = kademlia
763                .protocol_names()
764                .iter()
765                .all(|local_protocol| info.protocols.contains(local_protocol));
766
767            if full_kademlia_support {
768                let received_addresses = info
769                    .listen_addrs
770                    .into_iter()
771                    .filter(|address| {
772                        if self.allow_non_global_addresses_in_dht
773                            || is_global_address_or_dns(address)
774                        {
775                            true
776                        } else {
777                            trace!(
778                                %local_peer_id,
779                                %peer_id,
780                                %address,
781                                "Ignoring self-reported non-global address",
782                            );
783
784                            false
785                        }
786                    })
787                    .collect::<Vec<_>>();
788                let received_address_strings = received_addresses
789                    .iter()
790                    .map(ToString::to_string)
791                    .collect::<Vec<_>>();
792                let old_addresses = kademlia
793                    .kbucket(peer_id)
794                    .and_then(|peers| {
795                        let key = peer_id.into();
796                        peers.iter().find_map(|peer| {
797                            (peer.node.key == &key).then_some(
798                                peer.node
799                                    .value
800                                    .iter()
801                                    .filter(|existing_address| {
802                                        let existing_address = existing_address.to_string();
803
804                                        !received_address_strings.iter().any(|received_address| {
805                                            received_address.starts_with(&existing_address)
806                                                || existing_address.starts_with(received_address)
807                                        })
808                                    })
809                                    .cloned()
810                                    .collect::<Vec<_>>(),
811                            )
812                        })
813                    })
814                    .unwrap_or_default();
815
816                for address in received_addresses {
817                    debug!(
818                        %local_peer_id,
819                        %peer_id,
820                        %address,
821                        protocol_names = ?kademlia.protocol_names(),
822                        "Adding self-reported address to Kademlia DHT",
823                    );
824
825                    kademlia.add_address(&peer_id, address);
826                }
827
828                for old_address in old_addresses {
829                    trace!(
830                        %local_peer_id,
831                        %peer_id,
832                        %old_address,
833                        "Removing old self-reported address from Kademlia DHT",
834                    );
835
836                    kademlia.remove_address(&peer_id, &old_address);
837                }
838
839                self.connected_servers.insert(peer_id);
840            } else {
841                debug!(
842                    %local_peer_id,
843                    %peer_id,
844                    peer_protocols = ?info.protocols,
845                    protocol_names = ?kademlia.protocol_names(),
846                    "Peer doesn't support our Kademlia DHT protocol",
847                );
848
849                kademlia.remove_peer(&peer_id);
850                self.connected_servers.remove(&peer_id);
851            }
852        }
853    }
854
855    fn handle_kademlia_event(&mut self, event: KademliaEvent) {
856        trace!("Kademlia event: {:?}", event);
857
858        match event {
859            KademliaEvent::InboundRequest {
860                request: InboundRequest::AddProvider { record, .. },
861            } => {
862                debug!("Unexpected AddProvider request received: {:?}", record);
863            }
864            KademliaEvent::UnroutablePeer { peer } => {
865                debug!(%peer, "Unroutable peer detected");
866
867                self.swarm.behaviour_mut().kademlia.remove_peer(&peer);
868
869                if let Some(shared) = self.shared_weak.upgrade() {
870                    shared
871                        .handlers
872                        .peer_discovered
873                        .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
874                }
875            }
876            KademliaEvent::RoutablePeer { peer, address } => {
877                debug!(?address, "Routable peer detected: {:?}", peer);
878
879                if let Some(shared) = self.shared_weak.upgrade() {
880                    shared
881                        .handlers
882                        .peer_discovered
883                        .call_simple(&PeerDiscovered::RoutablePeer {
884                            peer_id: peer,
885                            address,
886                        });
887                }
888            }
889            KademliaEvent::PendingRoutablePeer { peer, address } => {
890                debug!(?address, "Pending routable peer detected: {:?}", peer);
891
892                if let Some(shared) = self.shared_weak.upgrade() {
893                    shared
894                        .handlers
895                        .peer_discovered
896                        .call_simple(&PeerDiscovered::RoutablePeer {
897                            peer_id: peer,
898                            address,
899                        });
900                }
901            }
902            KademliaEvent::OutboundQueryProgressed {
903                step: ProgressStep { last, .. },
904                id,
905                result: QueryResult::GetClosestPeers(result),
906                ..
907            } => {
908                let mut cancelled = false;
909                if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
910                    self.query_id_receivers.get(&id)
911                {
912                    match result {
913                        Ok(GetClosestPeersOk { key, peers }) => {
914                            trace!(
915                                "Get closest peers query for {} yielded {} results",
916                                hex::encode(key),
917                                peers.len(),
918                            );
919
920                            if peers.is_empty()
921                                // Connected peers collection is not empty.
922                                && self.swarm.connected_peers().next().is_some()
923                            {
924                                debug!("Random Kademlia query has yielded empty list of peers");
925                            }
926
927                            for peer in peers {
928                                cancelled = Self::unbounded_send_and_cancel_on_error(
929                                    &mut self.swarm.behaviour_mut().kademlia,
930                                    sender,
931                                    peer.peer_id,
932                                    "GetClosestPeersOk",
933                                    id,
934                                ) || cancelled;
935                            }
936                        }
937                        Err(GetClosestPeersError::Timeout { key, peers }) => {
938                            debug!(
939                                "Get closest peers query for {} timed out with {} results",
940                                hex::encode(key),
941                                peers.len(),
942                            );
943
944                            for peer in peers {
945                                cancelled = Self::unbounded_send_and_cancel_on_error(
946                                    &mut self.swarm.behaviour_mut().kademlia,
947                                    sender,
948                                    peer.peer_id,
949                                    "GetClosestPeersError::Timeout",
950                                    id,
951                                ) || cancelled;
952                            }
953                        }
954                    }
955                }
956
957                if last || cancelled {
958                    // There will be no more progress
959                    self.query_id_receivers.remove(&id);
960                }
961            }
962            KademliaEvent::OutboundQueryProgressed {
963                step: ProgressStep { last, .. },
964                id,
965                result: QueryResult::GetRecord(result),
966                ..
967            } => {
968                let mut cancelled = false;
969                if let Some(QueryResultSender::Value { sender, .. }) =
970                    self.query_id_receivers.get(&id)
971                {
972                    match result {
973                        Ok(GetRecordOk::FoundRecord(rec)) => {
974                            trace!(
975                                key = hex::encode(&rec.record.key),
976                                "Get record query succeeded",
977                            );
978
979                            cancelled = Self::unbounded_send_and_cancel_on_error(
980                                &mut self.swarm.behaviour_mut().kademlia,
981                                sender,
982                                rec,
983                                "GetRecordOk",
984                                id,
985                            ) || cancelled;
986                        }
987                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
988                            trace!("Get record query yielded no results");
989                        }
990                        Err(error) => match error {
991                            GetRecordError::NotFound { key, .. } => {
992                                debug!(
993                                    key = hex::encode(&key),
994                                    "Get record query failed with no results",
995                                );
996                            }
997                            GetRecordError::Timeout { key } => {
998                                debug!(key = hex::encode(&key), "Get record query timed out");
999                            }
1000                        },
1001                    }
1002                }
1003
1004                if last || cancelled {
1005                    // There will be no more progress
1006                    self.query_id_receivers.remove(&id);
1007                }
1008            }
1009            KademliaEvent::OutboundQueryProgressed {
1010                step: ProgressStep { last, .. },
1011                id,
1012                result: QueryResult::GetProviders(result),
1013                ..
1014            } => {
1015                let mut cancelled = false;
1016                if let Some(QueryResultSender::Providers { key, sender, .. }) =
1017                    self.query_id_receivers.get(&id)
1018                {
1019                    match result {
1020                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1021                            trace!(
1022                                key = hex::encode(&key),
1023                                "Get providers query yielded {} results",
1024                                providers.len(),
1025                            );
1026
1027                            for provider in providers {
1028                                cancelled = Self::unbounded_send_and_cancel_on_error(
1029                                    &mut self.swarm.behaviour_mut().kademlia,
1030                                    sender,
1031                                    provider,
1032                                    "GetProvidersOk",
1033                                    id,
1034                                ) || cancelled;
1035                            }
1036                        }
1037                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
1038                            trace!(
1039                                key = hex::encode(key),
1040                                closest_peers = %closest_peers.len(),
1041                                "Get providers query yielded no results"
1042                            );
1043                        }
1044                        Err(error) => {
1045                            let GetProvidersError::Timeout { key, .. } = error;
1046
1047                            debug!(
1048                                key = hex::encode(&key),
1049                                "Get providers query failed with no results",
1050                            );
1051                        }
1052                    }
1053                }
1054
1055                if last || cancelled {
1056                    // There will be no more progress
1057                    self.query_id_receivers.remove(&id);
1058                }
1059            }
1060            KademliaEvent::OutboundQueryProgressed {
1061                step: ProgressStep { last, .. },
1062                id,
1063                result: QueryResult::PutRecord(result),
1064                ..
1065            } => {
1066                let mut cancelled = false;
1067                if let Some(QueryResultSender::PutValue { sender, .. }) =
1068                    self.query_id_receivers.get(&id)
1069                {
1070                    match result {
1071                        Ok(PutRecordOk { key }) => {
1072                            trace!("Put record query for {} succeeded", hex::encode(&key));
1073
1074                            cancelled = Self::unbounded_send_and_cancel_on_error(
1075                                &mut self.swarm.behaviour_mut().kademlia,
1076                                sender,
1077                                (),
1078                                "PutRecordOk",
1079                                id,
1080                            ) || cancelled;
1081                        }
1082                        Err(error) => {
1083                            debug!(?error, "Put record query failed.");
1084                        }
1085                    }
1086                }
1087
1088                if last || cancelled {
1089                    // There will be no more progress
1090                    self.query_id_receivers.remove(&id);
1091                }
1092            }
1093            KademliaEvent::OutboundQueryProgressed {
1094                step: ProgressStep { last, count },
1095                id,
1096                result: QueryResult::Bootstrap(result),
1097                stats,
1098            } => {
1099                debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");
1100
1101                let mut cancelled = false;
1102                if let Some(QueryResultSender::Bootstrap { sender }) =
1103                    self.query_id_receivers.get_mut(&id)
1104                {
1105                    match result {
1106                        Ok(BootstrapOk {
1107                            peer,
1108                            num_remaining,
1109                        }) => {
1110                            trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");
1111
1112                            cancelled = Self::unbounded_send_and_cancel_on_error(
1113                                &mut self.swarm.behaviour_mut().kademlia,
1114                                sender,
1115                                (),
1116                                "Bootstrap",
1117                                id,
1118                            ) || cancelled;
1119                        }
1120                        Err(error) => {
1121                            debug!(?error, "Bootstrap query failed.");
1122                        }
1123                    }
1124                }
1125
1126                if last || cancelled {
1127                    // There will be no more progress
1128                    self.query_id_receivers.remove(&id);
1129                }
1130            }
1131            _ => {}
1132        }
1133    }
1134
1135    // Returns `true` if query was cancelled
1136    fn unbounded_send_and_cancel_on_error<T>(
1137        kademlia: &mut Kademlia<DummyRecordStore>,
1138        sender: &mpsc::UnboundedSender<T>,
1139        value: T,
1140        channel: &'static str,
1141        id: QueryId,
1142    ) -> bool {
1143        if sender.unbounded_send(value).is_err() {
1144            debug!("{} channel was dropped", channel);
1145
1146            // Cancel query
1147            if let Some(mut query) = kademlia.query_mut(&id) {
1148                query.finish();
1149            }
1150            true
1151        } else {
1152            false
1153        }
1154    }
1155
1156    fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1157        if let GossipsubEvent::Message { message, .. } = event
1158            && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
1159        {
1160            let bytes = Bytes::from(message.data);
1161
1162            for sender in senders.values() {
1163                // Doesn't matter if receiver is still listening for messages or not.
1164                let _: Result<(), _> = sender.unbounded_send(bytes.clone());
1165            }
1166        }
1167    }
1168
1169    fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
1170        // No actions on statistics events.
1171        trace!("Request response event: {:?}", event);
1172    }
1173
1174    fn handle_autonat_event(&mut self, event: AutonatEvent) {
1175        trace!(?event, "Autonat event received.");
1176        let autonat = &self.swarm.behaviour().autonat;
1177        debug!(
1178            public_address=?autonat.public_address(),
1179            confidence=%autonat.confidence(),
1180            "Current public address confidence."
1181        );
1182
1183        match event {
1184            AutonatEvent::InboundProbe(_inbound_probe_event) => {
1185                // We do not care about this event
1186            }
1187            AutonatEvent::OutboundProbe(outbound_probe_event) => {
1188                match outbound_probe_event {
1189                    OutboundProbeEvent::Request { peer, .. } => {
1190                        // For outbound probe request add peer to allow list to ensure they can dial
1191                        // us back and not hit global incoming connection limit
1192                        self.swarm
1193                            .behaviour_mut()
1194                            .connection_limits
1195                            // We expect a single successful dial from this peer
1196                            .add_to_incoming_allow_list(
1197                                peer,
1198                                self.peer_ip_addresses
1199                                    .get(&peer)
1200                                    .iter()
1201                                    .flat_map(|ip_addresses| ip_addresses.iter())
1202                                    .copied(),
1203                                1,
1204                            );
1205                    }
1206                    OutboundProbeEvent::Response { peer, .. } => {
1207                        self.swarm
1208                            .behaviour_mut()
1209                            .connection_limits
1210                            .remove_from_incoming_allow_list(&peer, Some(1));
1211                    }
1212                    OutboundProbeEvent::Error { peer, .. } => {
1213                        if let Some(peer) = peer {
1214                            self.swarm
1215                                .behaviour_mut()
1216                                .connection_limits
1217                                .remove_from_incoming_allow_list(&peer, Some(1));
1218                        }
1219                    }
1220                }
1221            }
1222            AutonatEvent::StatusChanged { old, new } => {
1223                debug!(?old, ?new, "Public address status changed.");
1224
1225                // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
1226                if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1227                    self.swarm.remove_external_address(&old_address);
1228                    debug!(
1229                        ?old_address,
1230                        new_status = ?new,
1231                        "Removing old external address...",
1232                    );
1233
1234                    // Trigger potential mode change manually
1235                    self.swarm.behaviour_mut().kademlia.set_mode(None);
1236                }
1237
1238                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1239                self.swarm.behaviour_mut().identify.push(connected_peers);
1240            }
1241        }
1242    }
1243
1244    fn handle_command(&mut self, command: Command) {
1245        match command {
1246            Command::GetValue {
1247                key,
1248                result_sender,
1249                permit,
1250            } => {
1251                let query_id = self
1252                    .swarm
1253                    .behaviour_mut()
1254                    .kademlia
1255                    .get_record(key.to_bytes().into());
1256
1257                self.query_id_receivers.insert(
1258                    query_id,
1259                    QueryResultSender::Value {
1260                        sender: result_sender,
1261                        _permit: permit,
1262                    },
1263                );
1264            }
1265            Command::PutValue {
1266                key,
1267                value,
1268                result_sender,
1269                permit,
1270            } => {
1271                let local_peer_id = *self.swarm.local_peer_id();
1272
1273                let record = Record {
1274                    key: key.into(),
1275                    value,
1276                    publisher: Some(local_peer_id),
1277                    expires: None, // No time expiration.
1278                };
1279                let query_result = self
1280                    .swarm
1281                    .behaviour_mut()
1282                    .kademlia
1283                    .put_record(record, Quorum::One);
1284
1285                match query_result {
1286                    Ok(query_id) => {
1287                        self.query_id_receivers.insert(
1288                            query_id,
1289                            QueryResultSender::PutValue {
1290                                sender: result_sender,
1291                                _permit: permit,
1292                            },
1293                        );
1294                    }
1295                    Err(err) => {
1296                        warn!(?err, "Failed to put value.");
1297                    }
1298                }
1299            }
1300            Command::Subscribe {
1301                topic,
1302                result_sender,
1303            } => {
1304                assert!(
1305                    self.swarm.behaviour().gossipsub.is_enabled(),
1306                    "Gossipsub protocol is disabled."
1307                );
1308
1309                let topic_hash = topic.hash();
1310                let (sender, receiver) = mpsc::unbounded();
1311
1312                // Unconditionally create subscription ID, code is simpler this way.
1313                let subscription_id = self.next_subscription_id;
1314                self.next_subscription_id += 1;
1315
1316                let created_subscription = CreatedSubscription {
1317                    subscription_id,
1318                    receiver,
1319                };
1320
1321                match self.topic_subscription_senders.entry(topic_hash) {
1322                    Entry::Occupied(mut entry) => {
1323                        // In case subscription already exists, just add one more sender to it.
1324                        if result_sender.send(Ok(created_subscription)).is_ok() {
1325                            entry.get_mut().insert(subscription_id, sender);
1326                        }
1327                    }
1328                    Entry::Vacant(entry) => {
1329                        // Otherwise subscription needs to be created.
1330
1331                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1332                            match gossipsub.subscribe(&topic) {
1333                                Ok(true) => {
1334                                    if result_sender.send(Ok(created_subscription)).is_ok() {
1335                                        entry
1336                                            .insert(IntMap::from_iter([(subscription_id, sender)]));
1337                                    }
1338                                }
1339                                Ok(false) => {
1340                                    panic!(
1341                                        "Logic error, topic subscription wasn't created, this \
1342                                        must never happen"
1343                                    );
1344                                }
1345                                Err(error) => {
1346                                    let _: Result<(), _> = result_sender.send(Err(error));
1347                                }
1348                            }
1349                        }
1350                    }
1351                }
1352            }
1353            Command::Unsubscribe {
1354                topic,
1355                subscription_id,
1356            } => {
1357                assert!(
1358                    self.swarm.behaviour().gossipsub.is_enabled(),
1359                    "Gossipsub protocol is disabled."
1360                );
1361
1362                if let Entry::Occupied(mut entry) =
1363                    self.topic_subscription_senders.entry(topic.hash())
1364                {
1365                    entry.get_mut().remove(&subscription_id);
1366
1367                    // If last sender was removed - unsubscribe.
1368                    if entry.get().is_empty() {
1369                        entry.remove_entry();
1370
1371                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
1372                            && !gossipsub.unsubscribe(&topic)
1373                        {
1374                            warn!(
1375                                "Can't unsubscribe from topic {topic} because subscription doesn't \
1376                                exist, this is a logic error in the subspace or swarm libraries"
1377                            );
1378                        }
1379                    }
1380                } else {
1381                    error!(
1382                        "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1383                        this is a logic error in the subspace library"
1384                    );
1385                }
1386            }
1387            Command::Publish {
1388                topic,
1389                message,
1390                result_sender,
1391            } => {
1392                assert!(
1393                    self.swarm.behaviour().gossipsub.is_enabled(),
1394                    "Gossipsub protocol is disabled"
1395                );
1396
1397                if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1398                    // Doesn't matter if receiver still waits for response.
1399                    let _: Result<(), _> =
1400                        result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1401                }
1402            }
1403            Command::GetClosestPeers {
1404                key,
1405                result_sender,
1406                permit,
1407            } => {
1408                let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1409
1410                self.query_id_receivers.insert(
1411                    query_id,
1412                    QueryResultSender::ClosestPeers {
1413                        sender: result_sender,
1414                        _permit: permit,
1415                    },
1416                );
1417            }
1418            Command::GetClosestLocalPeers {
1419                key,
1420                source,
1421                result_sender,
1422            } => {
1423                let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1424                let result = self
1425                    .swarm
1426                    .behaviour_mut()
1427                    .kademlia
1428                    .find_closest_local_peers(&KBucketKey::from(key), &source)
1429                    .filter(|peer| !peer.multiaddrs.is_empty())
1430                    .map(|peer| (peer.node_id, peer.multiaddrs))
1431                    .collect();
1432
1433                // Doesn't matter if receiver still waits for response.
1434                let _: Result<(), _> = result_sender.send(result);
1435            }
1436            Command::GenericRequest {
1437                peer_id,
1438                addresses,
1439                protocol_name,
1440                request,
1441                result_sender,
1442            } => {
1443                self.swarm.behaviour_mut().request_response.send_request(
1444                    &peer_id,
1445                    protocol_name,
1446                    request,
1447                    result_sender,
1448                    IfDisconnected::TryConnect,
1449                    addresses,
1450                );
1451            }
1452            Command::GetProviders {
1453                key,
1454                result_sender,
1455                permit,
1456            } => {
1457                let query_id = self
1458                    .swarm
1459                    .behaviour_mut()
1460                    .kademlia
1461                    .get_providers(key.clone());
1462
1463                self.query_id_receivers.insert(
1464                    query_id,
1465                    QueryResultSender::Providers {
1466                        key,
1467                        sender: result_sender,
1468                        _permit: permit,
1469                    },
1470                );
1471            }
1472            Command::BanPeer { peer_id } => {
1473                self.ban_peer(peer_id);
1474            }
1475            Command::Dial { address } => {
1476                let _: Result<(), _> = self.swarm.dial(address);
1477            }
1478            Command::ConnectedPeers { result_sender } => {
1479                let connected_peers = self.swarm.connected_peers().copied().collect();
1480
1481                let _: Result<(), _> = result_sender.send(connected_peers);
1482            }
1483            Command::ConnectedServers { result_sender } => {
1484                let connected_servers = self.connected_servers.iter().copied().collect();
1485
1486                let _: Result<(), _> = result_sender.send(connected_servers);
1487            }
1488            Command::Bootstrap { result_sender } => {
1489                let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1490
1491                match kademlia.bootstrap() {
1492                    Ok(query_id) => {
1493                        if let Some(result_sender) = result_sender {
1494                            self.query_id_receivers.insert(
1495                                query_id,
1496                                QueryResultSender::Bootstrap {
1497                                    sender: result_sender,
1498                                },
1499                            );
1500                        }
1501                    }
1502                    Err(err) => {
1503                        debug!(?err, "Bootstrap error.");
1504                    }
1505                }
1506            }
1507        }
1508    }
1509
1510    fn ban_peer(&mut self, peer_id: PeerId) {
1511        // Remove temporary ban if there is one, before creating a permanent one.
1512        self.temporary_bans.lock().remove(&peer_id);
1513
1514        debug!(?peer_id, "Banning peer on network level");
1515
1516        self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1517        self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1518        self.known_peers_registry
1519            .remove_all_known_peer_addresses(peer_id);
1520    }
1521
1522    fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1523        if let Some(ref mut metrics) = self.libp2p_metrics {
1524            match swarm_event {
1525                SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1526                    metrics.record(ping_event);
1527                }
1528                SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1529                    metrics.record(identify_event.as_ref());
1530                }
1531                SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1532                    metrics.record(kademlia_event);
1533                }
1534                SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1535                    metrics.record(gossipsub_event);
1536                }
1537                // TODO: implement in the upstream repository
1538                // SwarmEvent::Behaviour(Event::RequestResponse(request_response_event)) => {
1539                //     self.metrics.record(request_response_event);
1540                // }
1541                swarm_event => {
1542                    metrics.record(swarm_event);
1543                }
1544            }
1545        }
1546    }
1547
1548    fn log_kademlia_stats(&mut self) {
1549        let mut peer_counter = 0usize;
1550        let mut peer_with_no_address_counter = 0usize;
1551        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1552            for entry in kbucket.iter() {
1553                peer_counter += 1;
1554                if entry.node.value.len() == 0 {
1555                    peer_with_no_address_counter += 1;
1556                }
1557            }
1558        }
1559
1560        debug!(
1561            peers = %peer_counter,
1562            peers_with_no_address = %peer_with_no_address_counter,
1563            "Kademlia stats"
1564        );
1565    }
1566}