Skip to main content

ab_networking/
node_runner.rs

1use crate::behavior::persistent_parameters::{
2    KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
3};
4use crate::behavior::{Behavior, Event};
5use crate::constructor::DummyRecordStore;
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::protocols::request_response::request_response_factory::{
8    Event as RequestResponseEvent, IfDisconnected,
9};
10use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
11use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
12use async_lock::Mutex as AsyncMutex;
13use bytes::Bytes;
14use event_listener_primitives::HandlerId;
15use futures::channel::mpsc;
16use futures::future::Fuse;
17use futures::{FutureExt, StreamExt};
18use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
19use libp2p::core::ConnectedPoint;
20use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
21use libp2p::identify::Event as IdentifyEvent;
22use libp2p::kad::{
23    Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
24    GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
25    InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
26    Quorum, Record, RecordKey,
27};
28use libp2p::metrics::{Metrics, Recorder};
29use libp2p::multiaddr::Protocol;
30use libp2p::swarm::dial_opts::DialOpts;
31use libp2p::swarm::{DialError, SwarmEvent};
32use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
33use nohash_hasher::IntMap;
34use parking_lot::Mutex;
35use std::collections::hash_map::Entry;
36use std::collections::{HashMap, HashSet};
37use std::net::IpAddr;
38use std::pin::Pin;
39use std::sync::atomic::Ordering;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42use std::{fmt, slice};
43use tokio::sync::OwnedSemaphorePermit;
44use tokio::task::yield_now;
45use tokio::time::Sleep;
46use tracing::{debug, error, trace, warn};
47
/// Where the results of an in-flight query are delivered, one variant per kind of query.
///
/// Values of this type are stored in `NodeRunner::query_id_receivers`, keyed by [`QueryId`].
enum QueryResultSender {
    /// Results are delivered as [`PeerRecord`]s.
    Value {
        /// Channel over which each [`PeerRecord`] is sent.
        sender: mpsc::UnboundedSender<PeerRecord>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    /// Results are delivered as [`PeerId`]s.
    ClosestPeers {
        /// Channel over which each [`PeerId`] is sent.
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Results are delivered as [`PeerId`]s, and the [`RecordKey`] the query is about is kept
    /// alongside the sender.
    Providers {
        /// Record key associated with this query.
        key: RecordKey,
        /// Channel over which each [`PeerId`] is sent.
        sender: mpsc::UnboundedSender<PeerId>,
        // Just holding onto permit while data structure is not dropped
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Results carry no payload; only `()` notifications are delivered.
    PutValue {
        /// Channel over which `()` notifications are sent.
        sender: mpsc::UnboundedSender<()>,
        // Just holding onto permit while data structure is not dropped
        _permit: OwnedSemaphorePermit,
    },
    /// Results carry no payload and, unlike the other variants, no permit is held.
    Bootstrap {
        /// Channel over which `()` notifications are sent.
        sender: mpsc::UnboundedSender<()>,
    },
}
74
/// Progress of the bootstrap command issued by `NodeRunner::bootstrap()`.
#[derive(Debug, Default)]
enum BootstrapCommandState {
    /// No bootstrap command has been issued yet (this is the default state).
    #[default]
    NotStarted,
    /// A bootstrap command was issued; `bootstrap()` polls this receiver until it is closed.
    InProgress(mpsc::UnboundedReceiver<()>),
    /// `bootstrap()` ran to the end; later calls that reach this state return immediately.
    Finished,
}
82
/// Runner for the Node.
///
/// Owns the libp2p swarm and is driven by [`NodeRunner::run`], which multiplexes swarm events,
/// incoming commands, timers and address-removal notifications.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// Should non-global addresses be added to the DHT?
    allow_non_global_addresses_in_dht: bool,
    /// Whether node is listening on some addresses
    is_listening: bool,
    /// Commands to process; the main loop in `run()` exits once this channel is closed.
    command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm whose events are consumed by `run()` and `bootstrap()`.
    swarm: Swarm<Behavior>,
    /// Weak reference to the shared state; upgraded on demand by the individual handlers.
    shared_weak: Weak<Shared>,
    /// How frequently should random queries be done using Kademlia DHT to populate routing table.
    /// `run()` doubles this value after every random query, capping it at one minute.
    next_random_query_interval: Duration,
    /// Result senders for queries, keyed by the ID of the query they belong to.
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    /// Global subscription counter, is assigned to every (logical) subscription and is used for
    /// unsubscribing.
    next_subscription_id: usize,
    /// Topic subscription senders for logical subscriptions (multiple logical subscriptions can be
    /// present for the same physical subscription).
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    /// Timer that triggers the next random Kademlia query in `run()`.
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Defines an interval between periodical tasks.
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// Manages the networking parameters like known peers and addresses
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Set of peer IDs; an entry is removed when the last connection to that peer is closed.
    // NOTE(review): presumably peers that support the full server-side protocol set; confirm
    // against the code that inserts into this set.
    connected_servers: HashSet<PeerId>,
    /// Defines set of peers with a permanent connection (and reconnection if necessary).
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers.
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Libp2p Prometheus metrics.
    libp2p_metrics: Option<Metrics>,
    /// Subspace Prometheus metrics.
    metrics: Option<SubspaceMetrics>,
    /// Mapping from specific peer to ip addresses
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Defines protocol version for the network peers. Affects network partition.
    protocol_version: String,
    /// Addresses to bootstrap Kademlia network
    bootstrap_addresses: Vec<Multiaddr>,
    /// Ensures a single bootstrap on run() invocation.
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives an event on peer address removal from the persistent storage.
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    /// Optional storage for the [`HandlerId`] of the address removal task.
    /// We keep it in order to stop the task along with the rest of the networking.
    _address_removal_task_handler_id: Option<HandlerId>,
}
130
131impl fmt::Debug for NodeRunner {
132    #[inline]
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("NodeRunner").finish_non_exhaustive()
135    }
136}
137
/// Helper struct for NodeRunner configuration (clippy requirement).
///
/// Every field is moved as-is into the [`NodeRunner`] field of the same name by
/// `NodeRunner::new()`.
pub(crate) struct NodeRunnerConfig {
    /// Should non-global addresses be added to the DHT?
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether node is listening on some addresses
    pub(crate) is_listening: bool,
    /// Receiving side of the command channel processed by the runner.
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm the runner will drive.
    pub(crate) swarm: Swarm<Behavior>,
    /// Weak reference to the shared state.
    pub(crate) shared_weak: Weak<Shared>,
    /// Initial interval between random Kademlia queries.
    pub(crate) next_random_query_interval: Duration,
    /// Registry of known peers and their addresses.
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Peers with a permanent connection, mapped to their address.
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers.
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Libp2p Prometheus metrics.
    pub(crate) libp2p_metrics: Option<Metrics>,
    /// Subspace Prometheus metrics.
    pub(crate) metrics: Option<SubspaceMetrics>,
    /// Protocol version expected from network peers.
    pub(crate) protocol_version: String,
    /// Addresses to bootstrap Kademlia network
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}
155
156impl NodeRunner {
157    pub(crate) fn new(
158        NodeRunnerConfig {
159            allow_non_global_addresses_in_dht,
160            is_listening,
161            command_receiver,
162            swarm,
163            shared_weak,
164            next_random_query_interval,
165            mut known_peers_registry,
166            reserved_peers,
167            temporary_bans,
168            libp2p_metrics,
169            metrics,
170            protocol_version,
171            bootstrap_addresses,
172        }: NodeRunnerConfig,
173    ) -> Self {
174        // Setup the address removal events exchange between persistent params storage and Kademlia.
175        let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
176        let mut address_removal_task_handler_id = None;
177        if let Some(handler_id) = known_peers_registry.on_unreachable_address({
178            Arc::new(move |event| {
179                if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
180                    debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent");
181                }
182            })
183        }) {
184            address_removal_task_handler_id.replace(handler_id);
185        }
186
187        Self {
188            allow_non_global_addresses_in_dht,
189            is_listening,
190            command_receiver,
191            swarm,
192            shared_weak,
193            next_random_query_interval,
194            query_id_receivers: HashMap::default(),
195            next_subscription_id: 0,
196            topic_subscription_senders: HashMap::default(),
197            // We'll make the first query right away and continue at the interval.
198            random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
199            // We'll make the first dial right away and continue at the interval.
200            periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
201            known_peers_registry,
202            connected_servers: HashSet::new(),
203            reserved_peers,
204            temporary_bans,
205            libp2p_metrics,
206            metrics,
207            peer_ip_addresses: HashMap::new(),
208            protocol_version,
209            bootstrap_addresses,
210            bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
211            removed_addresses_rx,
212            _address_removal_task_handler_id: address_removal_task_handler_id,
213        }
214    }
215
216    /// Drives the main networking future forward.
217    pub async fn run(&mut self) {
218        if self.is_listening {
219            // Wait for listen addresses, otherwise we will get ephemeral addresses in external
220            // address candidates that we do not want
221            loop {
222                if self.swarm.listeners().next().is_some() {
223                    break;
224                }
225
226                if let Some(swarm_event) = self.swarm.next().await {
227                    self.register_event_metrics(&swarm_event);
228                    self.handle_swarm_event(swarm_event).await;
229                } else {
230                    break;
231                }
232            }
233        }
234
235        self.bootstrap().await;
236
237        loop {
238            futures::select! {
239                _ = &mut self.random_query_timeout => {
240                    self.handle_random_query_interval();
241                    // Increase interval 2x, but to at most 1 minute.
242                    self.random_query_timeout =
243                        Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
244                    self.next_random_query_interval =
245                        (self.next_random_query_interval * 2).min(Duration::from_mins(1));
246                },
247                swarm_event = self.swarm.next() => {
248                    if let Some(swarm_event) = swarm_event {
249                        self.register_event_metrics(&swarm_event);
250                        self.handle_swarm_event(swarm_event).await;
251                    } else {
252                        break;
253                    }
254                },
255                command = self.command_receiver.next() => {
256                    if let Some(command) = command {
257                        self.handle_command(command);
258                    } else {
259                        break;
260                    }
261                },
262                _ = self.known_peers_registry.run().fuse() => {
263                    trace!("Network parameters registry runner exited");
264                },
265                _ = &mut self.periodical_tasks_interval => {
266                    self.handle_periodical_tasks().await;
267
268                    self.periodical_tasks_interval =
269                        Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
270                },
271                event = self.removed_addresses_rx.select_next_some() => {
272                    self.handle_removed_address_event(event);
273                },
274            }
275
276            // Allow to exit from busy loop during graceful shutdown
277            yield_now().await;
278        }
279    }
280
281    /// Bootstraps Kademlia network
282    async fn bootstrap(&mut self) {
283        // Add bootstrap nodes first to make sure there is space for them in k-buckets
284        for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
285            self.swarm
286                .behaviour_mut()
287                .kademlia
288                .add_address(&peer_id, address);
289        }
290
291        let known_peers = self.known_peers_registry.all_known_peers().await;
292
293        if !known_peers.is_empty() {
294            for (peer_id, addresses) in known_peers {
295                for address in addresses.clone() {
296                    let address = match address.with_p2p(peer_id) {
297                        Ok(address) => address,
298                        Err(address) => {
299                            warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
300                            break;
301                        }
302                    };
303                    self.swarm
304                        .behaviour_mut()
305                        .kademlia
306                        .add_address(&peer_id, address);
307                }
308
309                if let Err(error) = self
310                    .swarm
311                    .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
312                {
313                    warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
314                }
315            }
316
317            // Do bootstrap asynchronously
318            self.handle_command(Command::Bootstrap {
319                result_sender: None,
320            });
321            return;
322        }
323
324        let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
325        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
326        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
327            BootstrapCommandState::NotStarted => {
328                debug!("Bootstrap started.");
329
330                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
331
332                self.handle_command(Command::Bootstrap {
333                    result_sender: Some(bootstrap_command_sender),
334                });
335
336                *bootstrap_command_state =
337                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
338                match &mut *bootstrap_command_state {
339                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
340                        bootstrap_command_receiver
341                    }
342                    _ => {
343                        unreachable!("Was just set to that exact value");
344                    }
345                }
346            }
347            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
348                bootstrap_command_receiver
349            }
350            BootstrapCommandState::Finished => {
351                return;
352            }
353        };
354
355        let mut bootstrap_step = 0usize;
356        loop {
357            futures::select! {
358                swarm_event = self.swarm.next() => {
359                    if let Some(swarm_event) = swarm_event {
360                        self.register_event_metrics(&swarm_event);
361                        self.handle_swarm_event(swarm_event).await;
362                    } else {
363                        break;
364                    }
365                },
366                result = bootstrap_command_receiver.next() => {
367                    if result.is_some() {
368                        debug!(%bootstrap_step, "Kademlia bootstrapping...");
369                        bootstrap_step += 1;
370                    } else {
371                        break;
372                    }
373                }
374            }
375        }
376
377        debug!("Bootstrap finished.");
378        *bootstrap_command_state = BootstrapCommandState::Finished;
379    }
380
381    /// Handles periodical tasks.
382    async fn handle_periodical_tasks(&mut self) {
383        // Log current connections.
384        let network_info = self.swarm.network_info();
385        let connections = network_info.connection_counters();
386
387        debug!(?connections, "Current connections and limits.");
388
389        // Renew known external addresses.
390        let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
391
392        if let Some(shared) = self.shared_weak.upgrade() {
393            debug!(?external_addresses, "Renew external addresses.");
394            let mut addresses = shared.external_addresses.lock();
395            addresses.clear();
396            addresses.append(&mut external_addresses);
397        }
398
399        self.log_kademlia_stats();
400    }
401
402    fn handle_random_query_interval(&mut self) {
403        let random_peer_id = PeerId::random();
404
405        trace!("Starting random Kademlia query for {}", random_peer_id);
406
407        self.swarm
408            .behaviour_mut()
409            .kademlia
410            .get_closest_peers(random_peer_id);
411    }
412
413    fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
414        trace!(?event, "Peer address removed event.");
415
416        let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
417            .into_iter()
418            .map(|(peer_id, _)| peer_id)
419            .collect::<Vec<_>>();
420
421        if bootstrap_node_ids.contains(&event.peer_id) {
422            debug!(
423                ?event,
424                ?bootstrap_node_ids,
425                "Skipped removing bootstrap node from Kademlia buckets."
426            );
427
428            return;
429        }
430
431        // Remove both versions of the address
432        self.swarm.behaviour_mut().kademlia.remove_address(
433            &event.peer_id,
434            &append_p2p_suffix(event.peer_id, event.address.clone()),
435        );
436
437        self.swarm
438            .behaviour_mut()
439            .kademlia
440            .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
441    }
442
443    fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
444        let Some(shared) = self.shared_weak.upgrade() else {
445            return;
446        };
447
448        // Remove both versions of the address
449        let peer_id = shared.id;
450        shared.listeners.lock().retain(|old_listener| {
451            !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
452                && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
453        });
454    }
455
    /// Dispatches a single swarm event.
    ///
    /// Behaviour events are forwarded to the dedicated `handle_*_event` methods. Listener,
    /// connection, dial-error and external-address events are handled inline; anything else is
    /// only logged at trace level.
    ///
    /// Several arms return early when the shared state can no longer be upgraded, in which case
    /// the remaining bookkeeping of that arm is skipped.
    async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
        match swarm_event {
            SwarmEvent::Behaviour(Event::Identify(event)) => {
                self.handle_identify_event(event);
            }
            SwarmEvent::Behaviour(Event::Kademlia(event)) => {
                self.handle_kademlia_event(event);
            }
            SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
                self.handle_gossipsub_event(event);
            }
            SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
                self.handle_request_response_event(event);
            }
            SwarmEvent::Behaviour(Event::Autonat(event)) => {
                self.handle_autonat_event(event);
            }
            ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
                trace!(?event, "New local listener  event.");

                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };
                // Remember the listener in the shared state and notify subscribers about it.
                shared.listeners.lock().push(address.clone());
                shared.handlers.new_listener.call_simple(address);
            }
            ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
                trace!(?event, "Local listener closed event.");
                self.handle_remove_listeners(addresses);
            }
            ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
                trace!(?event, "Local listener expired event.");
                self.handle_remove_listeners(slice::from_ref(address));
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
                // Save known addresses that were successfully dialed.
                if let ConnectedPoint::Dialer { address, .. } = &endpoint {
                    // Skip non-global addresses unless they are explicitly allowed in the DHT
                    if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
                        self.known_peers_registry
                            .add_known_peer(peer_id, vec![address.clone()])
                            .await;
                    }
                }

                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };

                let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
                debug!(
                    %peer_id,
                    %is_reserved_peer,
                    ?endpoint,
                    %num_established,
                    "Connection established"
                );

                // The first IPv4/IPv6 component of the remote address, if there is one
                let maybe_remote_ip =
                    endpoint
                        .get_remote_address()
                        .iter()
                        .find_map(|protocol| match protocol {
                            Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
                            Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
                            _ => None,
                        });
                // Add the IP to the set of IPs recorded for this peer
                if let Some(ip) = maybe_remote_ip {
                    self.peer_ip_addresses
                        .entry(peer_id)
                        .and_modify(|ips| {
                            ips.insert(ip);
                        })
                        .or_insert(HashSet::from([ip]));
                }

                // `fetch_add` returns the previous value, hence `+ 1` to get the new total
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_add(1, Ordering::SeqCst)
                    + 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // A new connection
                if num_established.get() == 1 {
                    shared.handlers.connected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.inc_established_connections();
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                num_established,
                cause,
                ..
            } => {
                // NOTE(review): when the shared state is gone, the per-peer cleanup and the
                // metrics update below are skipped as well.
                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };

                debug!(
                    %peer_id,
                    ?cause,
                    %num_established,
                    "Connection closed with peer"
                );

                // Last connection to the peer is gone: forget per-peer state
                if num_established == 0 {
                    self.peer_ip_addresses.remove(&peer_id);
                    self.connected_servers.remove(&peer_id);
                }
                // `fetch_sub` returns the previous value, hence `- 1` to get the new total
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_sub(1, Ordering::SeqCst)
                    - 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // No more connections
                if num_established == 0 {
                    shared.handlers.disconnected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.dec_established_connections();
                }
            }
            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
                // Bans are only considered when the peer ID of the failed dial is known
                if let Some(peer_id) = &peer_id {
                    let should_ban_temporarily =
                        self.should_temporary_ban_on_dial_error(peer_id, &error);

                    trace!(%should_ban_temporarily, "Temporary bans conditions.");

                    if should_ban_temporarily {
                        self.temporary_bans.lock().create_or_extend(peer_id);
                        debug!(%peer_id, ?error, "Peer was temporarily banned.");
                    }
                }

                debug!(
                    ?peer_id,
                    ?error,
                    "SwarmEvent::OutgoingConnectionError for peer."
                );

                match error {
                    DialError::Transport(ref addresses) => {
                        // Every address that failed on the transport level is removed from the
                        // known peers registry, one registry call per address
                        for (addr, _) in addresses {
                            trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
                            if let Some(peer_id) = peer_id {
                                self.known_peers_registry
                                    .remove_known_peer_addresses(peer_id, vec![addr.clone()])
                                    .await;
                            }
                        }
                    }
                    DialError::WrongPeerId { obtained, .. } => {
                        trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");

                        // The peer ID we dialed was not the one found at the address, so the
                        // dialed peer is removed from Kademlia
                        if let Some(ref peer_id) = peer_id {
                            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
                            let _: Option<_> = kademlia.remove_peer(peer_id);
                        }
                    }
                    _ => {
                        trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
                    }
                }
            }
            SwarmEvent::NewExternalAddrCandidate { address } => {
                trace!(%address, "External address candidate");
            }
            SwarmEvent::ExternalAddrConfirmed { address } => {
                debug!(%address, "Confirmed external address");

                // Trigger an identify push to all currently connected peers
                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            SwarmEvent::ExternalAddrExpired { address } => {
                debug!(%address, "External address expired");

                // Trigger an identify push to all currently connected peers
                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            other => {
                trace!("Other swarm event: {:?}", other);
            }
        }
    }
659
660    fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
661        // TODO: Replace with banning of addresses rather peer IDs if this helps
662        if true {
663            return false;
664        }
665
666        // Ban temporarily only peers without active connections.
667        if self.swarm.is_connected(peer_id) {
668            return false;
669        }
670
671        match &error {
672            DialError::Transport(addresses) => {
673                for (_, error) in addresses {
674                    match error {
675                        TransportError::MultiaddrNotSupported(_) => {
676                            return true;
677                        }
678                        TransportError::Other(_) => {
679                            // Ignore "temporary ban" errors
680                            if self.temporary_bans.lock().is_banned(peer_id) {
681                                return false;
682                            }
683                        }
684                    }
685                }
686                // Other errors that are not related to temporary bans
687                true
688            }
689            DialError::LocalPeerId { .. } => {
690                // We don't ban ourselves
691                debug!("Local peer dial attempt detected.");
692
693                false
694            }
695            DialError::NoAddresses => {
696                // Let's wait until we get addresses
697                true
698            }
699            DialError::DialPeerConditionFalse(_) => {
700                // These are local conditions, we don't need to ban remote peers
701                false
702            }
703            DialError::Aborted => {
704                // Seems like a transient event
705                false
706            }
707            DialError::WrongPeerId { .. } => {
708                // It's likely that peer was restarted with different identity
709                false
710            }
711            DialError::Denied { .. } => {
712                // We exceeded the connection limits or we hit a black listed peer
713                false
714            }
715        }
716    }
717
718    fn handle_identify_event(&mut self, event: IdentifyEvent) {
719        let local_peer_id = *self.swarm.local_peer_id();
720
721        if let IdentifyEvent::Received {
722            peer_id, mut info, ..
723        } = event
724        {
725            debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");
726
727            // Check for network partition
728            if info.protocol_version != self.protocol_version {
729                debug!(
730                    %local_peer_id,
731                    %peer_id,
732                    local_protocol_version = %self.protocol_version,
733                    peer_protocol_version = %info.protocol_version,
734                    "Peer has different protocol version, banning temporarily",
735                );
736
737                self.temporary_bans.lock().create_or_extend(&peer_id);
738                // Forget about this peer until they upgrade
739                let _: Result<(), ()> = self.swarm.disconnect_peer_id(peer_id);
740                self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
741                self.known_peers_registry
742                    .remove_all_known_peer_addresses(peer_id);
743
744                return;
745            }
746
747            // Remove temporary ban if there was any
748            self.temporary_bans.lock().remove(&peer_id);
749
750            if info.listen_addrs.len() > 30 {
751                debug!(
752                    %local_peer_id,
753                    %peer_id,
754                    "Node has reported more than 30 addresses; it is identified by {} and {}",
755                    info.protocol_version, info.agent_version
756                );
757                info.listen_addrs.truncate(30);
758            }
759
760            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
761            let full_kademlia_support = kademlia
762                .protocol_names()
763                .iter()
764                .all(|local_protocol| info.protocols.contains(local_protocol));
765
766            if full_kademlia_support {
767                let received_addresses = info
768                    .listen_addrs
769                    .into_iter()
770                    .filter(|address| {
771                        if self.allow_non_global_addresses_in_dht
772                            || is_global_address_or_dns(address)
773                        {
774                            true
775                        } else {
776                            trace!(
777                                %local_peer_id,
778                                %peer_id,
779                                %address,
780                                "Ignoring self-reported non-global address",
781                            );
782
783                            false
784                        }
785                    })
786                    .collect::<Vec<_>>();
787                let received_address_strings = received_addresses
788                    .iter()
789                    .map(ToString::to_string)
790                    .collect::<Vec<_>>();
791                let old_addresses = kademlia
792                    .kbucket(peer_id)
793                    .and_then(|peers| {
794                        let key = peer_id.into();
795                        peers.iter().find_map(|peer| {
796                            (peer.node.key == &key).then_some(
797                                peer.node
798                                    .value
799                                    .iter()
800                                    .filter(|existing_address| {
801                                        let existing_address = existing_address.to_string();
802
803                                        !received_address_strings.iter().any(|received_address| {
804                                            received_address.starts_with(&existing_address)
805                                                || existing_address.starts_with(received_address)
806                                        })
807                                    })
808                                    .cloned()
809                                    .collect::<Vec<_>>(),
810                            )
811                        })
812                    })
813                    .unwrap_or_default();
814
815                for address in received_addresses {
816                    debug!(
817                        %local_peer_id,
818                        %peer_id,
819                        %address,
820                        protocol_names = ?kademlia.protocol_names(),
821                        "Adding self-reported address to Kademlia DHT",
822                    );
823
824                    kademlia.add_address(&peer_id, address);
825                }
826
827                for old_address in old_addresses {
828                    trace!(
829                        %local_peer_id,
830                        %peer_id,
831                        %old_address,
832                        "Removing old self-reported address from Kademlia DHT",
833                    );
834
835                    kademlia.remove_address(&peer_id, &old_address);
836                }
837
838                self.connected_servers.insert(peer_id);
839            } else {
840                debug!(
841                    %local_peer_id,
842                    %peer_id,
843                    peer_protocols = ?info.protocols,
844                    protocol_names = ?kademlia.protocol_names(),
845                    "Peer doesn't support our Kademlia DHT protocol",
846                );
847
848                kademlia.remove_peer(&peer_id);
849                self.connected_servers.remove(&peer_id);
850            }
851        }
852    }
853
854    fn handle_kademlia_event(&mut self, event: KademliaEvent) {
855        trace!("Kademlia event: {:?}", event);
856
857        match event {
858            KademliaEvent::InboundRequest {
859                request: InboundRequest::AddProvider { record, .. },
860            } => {
861                debug!("Unexpected AddProvider request received: {:?}", record);
862            }
863            KademliaEvent::UnroutablePeer { peer } => {
864                debug!(%peer, "Unroutable peer detected");
865
866                self.swarm.behaviour_mut().kademlia.remove_peer(&peer);
867
868                if let Some(shared) = self.shared_weak.upgrade() {
869                    shared
870                        .handlers
871                        .peer_discovered
872                        .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
873                }
874            }
875            KademliaEvent::RoutablePeer { peer, address } => {
876                debug!(?address, "Routable peer detected: {:?}", peer);
877
878                if let Some(shared) = self.shared_weak.upgrade() {
879                    shared
880                        .handlers
881                        .peer_discovered
882                        .call_simple(&PeerDiscovered::RoutablePeer {
883                            peer_id: peer,
884                            address,
885                        });
886                }
887            }
888            KademliaEvent::PendingRoutablePeer { peer, address } => {
889                debug!(?address, "Pending routable peer detected: {:?}", peer);
890
891                if let Some(shared) = self.shared_weak.upgrade() {
892                    shared
893                        .handlers
894                        .peer_discovered
895                        .call_simple(&PeerDiscovered::RoutablePeer {
896                            peer_id: peer,
897                            address,
898                        });
899                }
900            }
901            KademliaEvent::OutboundQueryProgressed {
902                step: ProgressStep { last, .. },
903                id,
904                result: QueryResult::GetClosestPeers(result),
905                ..
906            } => {
907                let mut cancelled = false;
908                if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
909                    self.query_id_receivers.get(&id)
910                {
911                    match result {
912                        Ok(GetClosestPeersOk { key, peers }) => {
913                            trace!(
914                                "Get closest peers query for {} yielded {} results",
915                                hex::encode(key),
916                                peers.len(),
917                            );
918
919                            if peers.is_empty()
920                                // Connected peers collection is not empty.
921                                && self.swarm.connected_peers().next().is_some()
922                            {
923                                debug!("Random Kademlia query has yielded empty list of peers");
924                            }
925
926                            for peer in peers {
927                                cancelled = Self::unbounded_send_and_cancel_on_error(
928                                    &mut self.swarm.behaviour_mut().kademlia,
929                                    sender,
930                                    peer.peer_id,
931                                    "GetClosestPeersOk",
932                                    &id,
933                                ) || cancelled;
934                            }
935                        }
936                        Err(GetClosestPeersError::Timeout { key, peers }) => {
937                            debug!(
938                                "Get closest peers query for {} timed out with {} results",
939                                hex::encode(key),
940                                peers.len(),
941                            );
942
943                            for peer in peers {
944                                cancelled = Self::unbounded_send_and_cancel_on_error(
945                                    &mut self.swarm.behaviour_mut().kademlia,
946                                    sender,
947                                    peer.peer_id,
948                                    "GetClosestPeersError::Timeout",
949                                    &id,
950                                ) || cancelled;
951                            }
952                        }
953                    }
954                }
955
956                if last || cancelled {
957                    // There will be no more progress
958                    self.query_id_receivers.remove(&id);
959                }
960            }
961            KademliaEvent::OutboundQueryProgressed {
962                step: ProgressStep { last, .. },
963                id,
964                result: QueryResult::GetRecord(result),
965                ..
966            } => {
967                let mut cancelled = false;
968                if let Some(QueryResultSender::Value { sender, .. }) =
969                    self.query_id_receivers.get(&id)
970                {
971                    match result {
972                        Ok(GetRecordOk::FoundRecord(rec)) => {
973                            trace!(
974                                key = hex::encode(&rec.record.key),
975                                "Get record query succeeded",
976                            );
977
978                            cancelled = Self::unbounded_send_and_cancel_on_error(
979                                &mut self.swarm.behaviour_mut().kademlia,
980                                sender,
981                                rec,
982                                "GetRecordOk",
983                                &id,
984                            ) || cancelled;
985                        }
986                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
987                            trace!("Get record query yielded no results");
988                        }
989                        Err(error) => match error {
990                            GetRecordError::NotFound { key, .. } => {
991                                debug!(
992                                    key = hex::encode(&key),
993                                    "Get record query failed with no results",
994                                );
995                            }
996                            GetRecordError::Timeout { key } => {
997                                debug!(key = hex::encode(&key), "Get record query timed out");
998                            }
999                        },
1000                    }
1001                }
1002
1003                if last || cancelled {
1004                    // There will be no more progress
1005                    self.query_id_receivers.remove(&id);
1006                }
1007            }
1008            KademliaEvent::OutboundQueryProgressed {
1009                step: ProgressStep { last, .. },
1010                id,
1011                result: QueryResult::GetProviders(result),
1012                ..
1013            } => {
1014                let mut cancelled = false;
1015                if let Some(QueryResultSender::Providers { key, sender, .. }) =
1016                    self.query_id_receivers.get(&id)
1017                {
1018                    match result {
1019                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
1020                            trace!(
1021                                key = hex::encode(&key),
1022                                "Get providers query yielded {} results",
1023                                providers.len(),
1024                            );
1025
1026                            for provider in providers {
1027                                cancelled = Self::unbounded_send_and_cancel_on_error(
1028                                    &mut self.swarm.behaviour_mut().kademlia,
1029                                    sender,
1030                                    provider,
1031                                    "GetProvidersOk",
1032                                    &id,
1033                                ) || cancelled;
1034                            }
1035                        }
1036                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
1037                            trace!(
1038                                key = hex::encode(key),
1039                                closest_peers = %closest_peers.len(),
1040                                "Get providers query yielded no results"
1041                            );
1042                        }
1043                        Err(error) => {
1044                            let GetProvidersError::Timeout { key, .. } = error;
1045
1046                            debug!(
1047                                key = hex::encode(&key),
1048                                "Get providers query failed with no results",
1049                            );
1050                        }
1051                    }
1052                }
1053
1054                if last || cancelled {
1055                    // There will be no more progress
1056                    self.query_id_receivers.remove(&id);
1057                }
1058            }
1059            KademliaEvent::OutboundQueryProgressed {
1060                step: ProgressStep { last, .. },
1061                id,
1062                result: QueryResult::PutRecord(result),
1063                ..
1064            } => {
1065                let mut cancelled = false;
1066                if let Some(QueryResultSender::PutValue { sender, .. }) =
1067                    self.query_id_receivers.get(&id)
1068                {
1069                    match result {
1070                        Ok(PutRecordOk { key }) => {
1071                            trace!("Put record query for {} succeeded", hex::encode(&key));
1072
1073                            cancelled = Self::unbounded_send_and_cancel_on_error(
1074                                &mut self.swarm.behaviour_mut().kademlia,
1075                                sender,
1076                                (),
1077                                "PutRecordOk",
1078                                &id,
1079                            ) || cancelled;
1080                        }
1081                        Err(error) => {
1082                            debug!(?error, "Put record query failed.");
1083                        }
1084                    }
1085                }
1086
1087                if last || cancelled {
1088                    // There will be no more progress
1089                    self.query_id_receivers.remove(&id);
1090                }
1091            }
1092            KademliaEvent::OutboundQueryProgressed {
1093                step: ProgressStep { last, count },
1094                id,
1095                result: QueryResult::Bootstrap(result),
1096                stats,
1097            } => {
1098                debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");
1099
1100                let mut cancelled = false;
1101                if let Some(QueryResultSender::Bootstrap { sender }) =
1102                    self.query_id_receivers.get_mut(&id)
1103                {
1104                    match result {
1105                        Ok(BootstrapOk {
1106                            peer,
1107                            num_remaining,
1108                        }) => {
1109                            trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");
1110
1111                            cancelled = Self::unbounded_send_and_cancel_on_error(
1112                                &mut self.swarm.behaviour_mut().kademlia,
1113                                sender,
1114                                (),
1115                                "Bootstrap",
1116                                &id,
1117                            ) || cancelled;
1118                        }
1119                        Err(error) => {
1120                            debug!(?error, "Bootstrap query failed.");
1121                        }
1122                    }
1123                }
1124
1125                if last || cancelled {
1126                    // There will be no more progress
1127                    self.query_id_receivers.remove(&id);
1128                }
1129            }
1130            _ => {}
1131        }
1132    }
1133
1134    // Returns `true` if query was cancelled
1135    fn unbounded_send_and_cancel_on_error<T>(
1136        kademlia: &mut Kademlia<DummyRecordStore>,
1137        sender: &mpsc::UnboundedSender<T>,
1138        value: T,
1139        channel: &'static str,
1140        id: &QueryId,
1141    ) -> bool {
1142        if sender.unbounded_send(value).is_err() {
1143            debug!("{} channel was dropped", channel);
1144
1145            // Cancel query
1146            if let Some(mut query) = kademlia.query_mut(id) {
1147                query.finish();
1148            }
1149            true
1150        } else {
1151            false
1152        }
1153    }
1154
1155    fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1156        if let GossipsubEvent::Message { message, .. } = event
1157            && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
1158        {
1159            let bytes = Bytes::from(message.data);
1160
1161            for sender in senders.values() {
1162                // Doesn't matter if receiver is still listening for messages or not.
1163                let _: Result<(), _> = sender.unbounded_send(bytes.clone());
1164            }
1165        }
1166    }
1167
1168    fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
1169        // No actions on statistics events.
1170        trace!("Request response event: {:?}", event);
1171    }
1172
1173    fn handle_autonat_event(&mut self, event: AutonatEvent) {
1174        trace!(?event, "Autonat event received.");
1175        let autonat = &self.swarm.behaviour().autonat;
1176        debug!(
1177            public_address=?autonat.public_address(),
1178            confidence=%autonat.confidence(),
1179            "Current public address confidence."
1180        );
1181
1182        match event {
1183            AutonatEvent::InboundProbe(_inbound_probe_event) => {
1184                // We do not care about this event
1185            }
1186            AutonatEvent::OutboundProbe(outbound_probe_event) => {
1187                match outbound_probe_event {
1188                    OutboundProbeEvent::Request { peer, .. } => {
1189                        // For outbound probe request add peer to allow list to ensure they can dial
1190                        // us back and not hit global incoming connection limit
1191                        self.swarm
1192                            .behaviour_mut()
1193                            .connection_limits
1194                            // We expect a single successful dial from this peer
1195                            .add_to_incoming_allow_list(
1196                                peer,
1197                                self.peer_ip_addresses
1198                                    .get(&peer)
1199                                    .iter()
1200                                    .flat_map(|ip_addresses| ip_addresses.iter())
1201                                    .copied(),
1202                                1,
1203                            );
1204                    }
1205                    OutboundProbeEvent::Response { peer, .. } => {
1206                        self.swarm
1207                            .behaviour_mut()
1208                            .connection_limits
1209                            .remove_from_incoming_allow_list(&peer, Some(1));
1210                    }
1211                    OutboundProbeEvent::Error { peer, .. } => {
1212                        if let Some(peer) = peer {
1213                            self.swarm
1214                                .behaviour_mut()
1215                                .connection_limits
1216                                .remove_from_incoming_allow_list(&peer, Some(1));
1217                        }
1218                    }
1219                }
1220            }
1221            AutonatEvent::StatusChanged { old, new } => {
1222                debug!(?old, ?new, "Public address status changed.");
1223
1224                // TODO: Remove block once https://github.com/libp2p/rust-libp2p/issues/4863 is resolved
1225                if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1226                    self.swarm.remove_external_address(&old_address);
1227                    debug!(
1228                        ?old_address,
1229                        new_status = ?new,
1230                        "Removing old external address...",
1231                    );
1232
1233                    // Trigger potential mode change manually
1234                    self.swarm.behaviour_mut().kademlia.set_mode(None);
1235                }
1236
1237                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1238                self.swarm.behaviour_mut().identify.push(connected_peers);
1239            }
1240        }
1241    }
1242
1243    fn handle_command(&mut self, command: Command) {
1244        match command {
1245            Command::GetValue {
1246                key,
1247                result_sender,
1248                permit,
1249            } => {
1250                let query_id = self
1251                    .swarm
1252                    .behaviour_mut()
1253                    .kademlia
1254                    .get_record(key.to_bytes().into());
1255
1256                self.query_id_receivers.insert(
1257                    query_id,
1258                    QueryResultSender::Value {
1259                        sender: result_sender,
1260                        _permit: permit,
1261                    },
1262                );
1263            }
1264            Command::PutValue {
1265                key,
1266                value,
1267                result_sender,
1268                permit,
1269            } => {
1270                let local_peer_id = *self.swarm.local_peer_id();
1271
1272                let record = Record {
1273                    key: key.into(),
1274                    value,
1275                    publisher: Some(local_peer_id),
1276                    expires: None, // No time expiration.
1277                };
1278                let query_result = self
1279                    .swarm
1280                    .behaviour_mut()
1281                    .kademlia
1282                    .put_record(record, Quorum::One);
1283
1284                match query_result {
1285                    Ok(query_id) => {
1286                        self.query_id_receivers.insert(
1287                            query_id,
1288                            QueryResultSender::PutValue {
1289                                sender: result_sender,
1290                                _permit: permit,
1291                            },
1292                        );
1293                    }
1294                    Err(err) => {
1295                        warn!(?err, "Failed to put value.");
1296                    }
1297                }
1298            }
1299            Command::Subscribe {
1300                topic,
1301                result_sender,
1302            } => {
1303                assert!(
1304                    self.swarm.behaviour().gossipsub.is_enabled(),
1305                    "Gossipsub protocol is disabled."
1306                );
1307
1308                let topic_hash = topic.hash();
1309                let (sender, receiver) = mpsc::unbounded();
1310
1311                // Unconditionally create subscription ID, code is simpler this way.
1312                let subscription_id = self.next_subscription_id;
1313                self.next_subscription_id += 1;
1314
1315                let created_subscription = CreatedSubscription {
1316                    subscription_id,
1317                    receiver,
1318                };
1319
1320                match self.topic_subscription_senders.entry(topic_hash) {
1321                    Entry::Occupied(mut entry) => {
1322                        // In case subscription already exists, just add one more sender to it.
1323                        if result_sender.send(Ok(created_subscription)).is_ok() {
1324                            entry.get_mut().insert(subscription_id, sender);
1325                        }
1326                    }
1327                    Entry::Vacant(entry) => {
1328                        // Otherwise subscription needs to be created.
1329
1330                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1331                            match gossipsub.subscribe(&topic) {
1332                                Ok(true) => {
1333                                    if result_sender.send(Ok(created_subscription)).is_ok() {
1334                                        entry
1335                                            .insert(IntMap::from_iter([(subscription_id, sender)]));
1336                                    }
1337                                }
1338                                Ok(false) => {
1339                                    panic!(
1340                                        "Logic error, topic subscription wasn't created, this \
1341                                        must never happen"
1342                                    );
1343                                }
1344                                Err(error) => {
1345                                    let _: Result<(), _> = result_sender.send(Err(error));
1346                                }
1347                            }
1348                        }
1349                    }
1350                }
1351            }
1352            Command::Unsubscribe {
1353                topic,
1354                subscription_id,
1355            } => {
1356                assert!(
1357                    self.swarm.behaviour().gossipsub.is_enabled(),
1358                    "Gossipsub protocol is disabled."
1359                );
1360
1361                if let Entry::Occupied(mut entry) =
1362                    self.topic_subscription_senders.entry(topic.hash())
1363                {
1364                    entry.get_mut().remove(&subscription_id);
1365
1366                    // If last sender was removed - unsubscribe.
1367                    if entry.get().is_empty() {
1368                        entry.remove_entry();
1369
1370                        if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
1371                            && !gossipsub.unsubscribe(&topic)
1372                        {
1373                            warn!(
1374                                "Can't unsubscribe from topic {topic} because subscription doesn't \
1375                                exist, this is a logic error in the subspace or swarm libraries"
1376                            );
1377                        }
1378                    }
1379                } else {
1380                    error!(
1381                        "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1382                        this is a logic error in the subspace library"
1383                    );
1384                }
1385            }
1386            Command::Publish {
1387                topic,
1388                message,
1389                result_sender,
1390            } => {
1391                assert!(
1392                    self.swarm.behaviour().gossipsub.is_enabled(),
1393                    "Gossipsub protocol is disabled"
1394                );
1395
1396                if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1397                    // Doesn't matter if receiver still waits for response.
1398                    let _: Result<(), _> =
1399                        result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1400                }
1401            }
1402            Command::GetClosestPeers {
1403                key,
1404                result_sender,
1405                permit,
1406            } => {
1407                let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1408
1409                self.query_id_receivers.insert(
1410                    query_id,
1411                    QueryResultSender::ClosestPeers {
1412                        sender: result_sender,
1413                        _permit: permit,
1414                    },
1415                );
1416            }
1417            Command::GetClosestLocalPeers {
1418                key,
1419                source,
1420                result_sender,
1421            } => {
1422                let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1423                let result = self
1424                    .swarm
1425                    .behaviour_mut()
1426                    .kademlia
1427                    .find_closest_local_peers(&KBucketKey::from(key), &source)
1428                    .filter(|peer| !peer.multiaddrs.is_empty())
1429                    .map(|peer| (peer.node_id, peer.multiaddrs))
1430                    .collect();
1431
1432                // Doesn't matter if receiver still waits for response.
1433                let _: Result<(), _> = result_sender.send(result);
1434            }
1435            Command::GenericRequest {
1436                peer_id,
1437                addresses,
1438                protocol_name,
1439                request,
1440                result_sender,
1441            } => {
1442                self.swarm.behaviour_mut().request_response.send_request(
1443                    &peer_id,
1444                    protocol_name,
1445                    request,
1446                    result_sender,
1447                    IfDisconnected::TryConnect,
1448                    addresses,
1449                );
1450            }
1451            Command::GetProviders {
1452                key,
1453                result_sender,
1454                permit,
1455            } => {
1456                let query_id = self
1457                    .swarm
1458                    .behaviour_mut()
1459                    .kademlia
1460                    .get_providers(key.clone());
1461
1462                self.query_id_receivers.insert(
1463                    query_id,
1464                    QueryResultSender::Providers {
1465                        key,
1466                        sender: result_sender,
1467                        _permit: permit,
1468                    },
1469                );
1470            }
1471            Command::BanPeer { peer_id } => {
1472                self.ban_peer(peer_id);
1473            }
1474            Command::Dial { address } => {
1475                let _: Result<(), _> = self.swarm.dial(address);
1476            }
1477            Command::ConnectedPeers { result_sender } => {
1478                let connected_peers = self.swarm.connected_peers().copied().collect();
1479
1480                let _: Result<(), _> = result_sender.send(connected_peers);
1481            }
1482            Command::ConnectedServers { result_sender } => {
1483                let connected_servers = self.connected_servers.iter().copied().collect();
1484
1485                let _: Result<(), _> = result_sender.send(connected_servers);
1486            }
1487            Command::Bootstrap { result_sender } => {
1488                let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1489
1490                match kademlia.bootstrap() {
1491                    Ok(query_id) => {
1492                        if let Some(result_sender) = result_sender {
1493                            self.query_id_receivers.insert(
1494                                query_id,
1495                                QueryResultSender::Bootstrap {
1496                                    sender: result_sender,
1497                                },
1498                            );
1499                        }
1500                    }
1501                    Err(err) => {
1502                        debug!(?err, "Bootstrap error.");
1503                    }
1504                }
1505            }
1506        }
1507    }
1508
1509    fn ban_peer(&mut self, peer_id: PeerId) {
1510        // Remove temporary ban if there is one, before creating a permanent one.
1511        self.temporary_bans.lock().remove(&peer_id);
1512
1513        debug!(?peer_id, "Banning peer on network level");
1514
1515        self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1516        self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1517        self.known_peers_registry
1518            .remove_all_known_peer_addresses(peer_id);
1519    }
1520
1521    fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1522        if let Some(ref mut metrics) = self.libp2p_metrics {
1523            match swarm_event {
1524                SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1525                    metrics.record(ping_event);
1526                }
1527                SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1528                    metrics.record(identify_event);
1529                }
1530                SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1531                    metrics.record(kademlia_event);
1532                }
1533                SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1534                    metrics.record(gossipsub_event);
1535                }
1536                // TODO: implement in the upstream repository
1537                // SwarmEvent::Behaviour(Event::RequestResponse(request_response_event)) => {
1538                //     self.metrics.record(request_response_event);
1539                // }
1540                swarm_event => {
1541                    metrics.record(swarm_event);
1542                }
1543            }
1544        }
1545    }
1546
1547    fn log_kademlia_stats(&mut self) {
1548        let mut peer_counter = 0usize;
1549        let mut peer_with_no_address_counter = 0usize;
1550        for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1551            for entry in kbucket.iter() {
1552                peer_counter += 1;
1553                if entry.node.value.len() == 0 {
1554                    peer_with_no_address_counter += 1;
1555                }
1556            }
1557        }
1558
1559        debug!(
1560            peers = %peer_counter,
1561            peers_with_no_address = %peer_with_no_address_counter,
1562            "Kademlia stats"
1563        );
1564    }
1565}