1use crate::behavior::persistent_parameters::{
2 KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
3};
4use crate::behavior::{Behavior, Event};
5use crate::constructor::DummyRecordStore;
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::protocols::request_response::request_response_factory::{
8 Event as RequestResponseEvent, IfDisconnected,
9};
10use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
11use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
12use async_lock::Mutex as AsyncMutex;
13use bytes::Bytes;
14use event_listener_primitives::HandlerId;
15use futures::channel::mpsc;
16use futures::future::Fuse;
17use futures::{FutureExt, StreamExt};
18use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
19use libp2p::core::ConnectedPoint;
20use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
21use libp2p::identify::Event as IdentifyEvent;
22use libp2p::kad::{
23 Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
24 GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
25 InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
26 Quorum, Record, RecordKey,
27};
28use libp2p::metrics::{Metrics, Recorder};
29use libp2p::multiaddr::Protocol;
30use libp2p::swarm::dial_opts::DialOpts;
31use libp2p::swarm::{DialError, SwarmEvent};
32use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
33use nohash_hasher::IntMap;
34use parking_lot::Mutex;
35use std::collections::hash_map::Entry;
36use std::collections::{HashMap, HashSet};
37use std::net::IpAddr;
38use std::pin::Pin;
39use std::sync::atomic::Ordering;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42use std::{fmt, slice};
43use tokio::sync::OwnedSemaphorePermit;
44use tokio::task::yield_now;
45use tokio::time::Sleep;
46use tracing::{debug, error, trace, warn};
47
/// Destination for the results of an in-flight Kademlia query.
///
/// Values of this type are stored in `NodeRunner::query_id_receivers` under the `QueryId` of the
/// query they belong to and are looked up again when that query reports progress.
enum QueryResultSender {
    /// Results of a record lookup (`get_record`) query
    Value {
        /// Channel every found record is forwarded into
        sender: mpsc::UnboundedSender<PeerRecord>,
        /// Never read; the permit is simply kept alive for as long as this entry exists
        _permit: OwnedSemaphorePermit,
    },
    /// Results of a closest peers query
    ClosestPeers {
        /// Channel the ID of every returned peer is forwarded into
        sender: mpsc::UnboundedSender<PeerId>,
        /// Never read; optional permit kept alive for as long as this entry exists
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Results of a providers lookup query
    Providers {
        /// Key the providers were requested for, used when logging a query that finished without
        /// additional results
        key: RecordKey,
        /// Channel the ID of every found provider is forwarded into
        sender: mpsc::UnboundedSender<PeerId>,
        /// Never read; optional permit kept alive for as long as this entry exists
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Result of a record store (`put_record`) query
    PutValue {
        /// Channel that receives `()` when the record was stored successfully
        sender: mpsc::UnboundedSender<()>,
        /// Never read; the permit is simply kept alive for as long as this entry exists
        _permit: OwnedSemaphorePermit,
    },
    /// Progress of a Kademlia bootstrap query
    Bootstrap {
        /// Channel that receives `()` for every successful bootstrap step
        sender: mpsc::UnboundedSender<()>,
    },
}
74
/// Progress of the bootstrap procedure driven by `NodeRunner::bootstrap()`.
#[derive(Debug, Default)]
enum BootstrapCommandState {
    /// Bootstrap command was not issued yet
    #[default]
    NotStarted,
    /// Bootstrap command was issued; the receiver yields one `()` per reported bootstrap step
    InProgress(mpsc::UnboundedReceiver<()>),
    /// Bootstrap loop has completed, further `bootstrap()` calls reaching this state return early
    Finished,
}
82
/// Event loop of a networking node: drives the libp2p swarm, executes commands and keeps
/// bookkeeping state (pending queries, subscriptions, known peers) up to date.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// When `false`, addresses that do not pass `is_global_address_or_dns` are neither stored in
    /// the known peers registry nor added to Kademlia from identify information
    allow_non_global_addresses_in_dht: bool,
    /// When `true`, `run()` waits for the swarm to have at least one listener before bootstrapping
    is_listening: bool,
    /// Commands to be executed by the runner
    command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm driven by this runner
    swarm: Swarm<Behavior>,
    /// Weak reference to the shared node state; handlers that need it are skipped once it can no
    /// longer be upgraded
    shared_weak: Weak<Shared>,
    /// Delay before the next random Kademlia query, doubled after every query up to one minute
    next_random_query_interval: Duration,
    /// Result senders of in-flight Kademlia queries, keyed by query ID
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    /// Source of IDs for new topic subscriptions
    next_subscription_id: usize,
    /// Per-topic message senders, gossipsub messages are fanned out to every sender of the topic
    // NOTE(review): the inner key is presumably the subscription ID - confirm in `handle_command`
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    /// Timer that triggers the next random Kademlia query
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Timer that triggers the next round of periodical tasks (re-armed for 5 seconds each time)
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// Registry of known peers and their addresses
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Connected peers whose identify information showed support for all local Kademlia protocols
    connected_servers: HashSet<PeerId>,
    /// Reserved peers and their addresses
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Optional libp2p metrics
    // NOTE(review): presumably consumed by `register_event_metrics()` - not visible here
    libp2p_metrics: Option<Metrics>,
    /// Optional node metrics, updated when connections are established or closed
    metrics: Option<SubspaceMetrics>,
    /// IP addresses seen in remote addresses of established connections, per peer; used to fill
    /// the incoming connection allow list for autonat outbound probes
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Local protocol version; peers reporting a different version via identify are banned
    /// temporarily
    protocol_version: String,
    /// Addresses of bootstrap nodes
    bootstrap_addresses: Vec<Multiaddr>,
    /// Current state of the bootstrap procedure
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives events about peer addresses reported as unreachable by the known peers registry
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    /// Never read; holds the handler ID returned by `KnownPeersRegistry::on_unreachable_address()`
    // NOTE(review): presumably keeps the callback registered while the runner lives - confirm
    _address_removal_task_handler_id: Option<HandlerId>,
}
130
131impl fmt::Debug for NodeRunner {
132 #[inline]
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("NodeRunner").finish_non_exhaustive()
135 }
136}
137
/// Everything `NodeRunner::new()` needs to construct a runner.
///
/// Each field is moved into the `NodeRunner` field of the same name.
pub(crate) struct NodeRunnerConfig {
    /// Whether non-global addresses may be stored for peers and added to the DHT
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether the runner has to wait for a listener before bootstrapping
    pub(crate) is_listening: bool,
    /// Receiving side of the command channel
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm to drive
    pub(crate) swarm: Swarm<Behavior>,
    /// Weak reference to the shared node state
    pub(crate) shared_weak: Weak<Shared>,
    /// Initial delay between random Kademlia queries
    pub(crate) next_random_query_interval: Duration,
    /// Registry of known peers and their addresses
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Reserved peers and their addresses
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporarily banned peers
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Optional libp2p metrics
    pub(crate) libp2p_metrics: Option<Metrics>,
    /// Optional node metrics
    pub(crate) metrics: Option<SubspaceMetrics>,
    /// Local protocol version compared against the one peers report via identify
    pub(crate) protocol_version: String,
    /// Addresses of bootstrap nodes
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}
155
156impl NodeRunner {
157 pub(crate) fn new(
158 NodeRunnerConfig {
159 allow_non_global_addresses_in_dht,
160 is_listening,
161 command_receiver,
162 swarm,
163 shared_weak,
164 next_random_query_interval,
165 mut known_peers_registry,
166 reserved_peers,
167 temporary_bans,
168 libp2p_metrics,
169 metrics,
170 protocol_version,
171 bootstrap_addresses,
172 }: NodeRunnerConfig,
173 ) -> Self {
174 let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
176 let mut address_removal_task_handler_id = None;
177 if let Some(handler_id) = known_peers_registry.on_unreachable_address({
178 Arc::new(move |event| {
179 if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
180 debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent");
181 }
182 })
183 }) {
184 address_removal_task_handler_id.replace(handler_id);
185 }
186
187 Self {
188 allow_non_global_addresses_in_dht,
189 is_listening,
190 command_receiver,
191 swarm,
192 shared_weak,
193 next_random_query_interval,
194 query_id_receivers: HashMap::default(),
195 next_subscription_id: 0,
196 topic_subscription_senders: HashMap::default(),
197 random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
199 periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
201 known_peers_registry,
202 connected_servers: HashSet::new(),
203 reserved_peers,
204 temporary_bans,
205 libp2p_metrics,
206 metrics,
207 peer_ip_addresses: HashMap::new(),
208 protocol_version,
209 bootstrap_addresses,
210 bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
211 removed_addresses_rx,
212 _address_removal_task_handler_id: address_removal_task_handler_id,
213 }
214 }
215
/// Drives the node until the swarm event stream or the command channel ends.
///
/// The method first waits for a listener (only when `is_listening` is set), then bootstraps
/// Kademlia and finally enters the main loop that multiplexes timers, swarm events, commands, the
/// known peers registry runner and removed-address events.
pub async fn run(&mut self) {
    if self.is_listening {
        // Keep processing swarm events until the swarm reports at least one listener
        loop {
            if self.swarm.listeners().next().is_some() {
                break;
            }

            if let Some(swarm_event) = self.swarm.next().await {
                self.register_event_metrics(&swarm_event);
                self.handle_swarm_event(swarm_event).await;
            } else {
                // Swarm event stream has ended
                break;
            }
        }
    }

    self.bootstrap().await;

    loop {
        futures::select! {
            () = &mut self.random_query_timeout => {
                self.handle_random_query_interval();
                // Re-arm the timer with the current interval, then double the interval for the
                // following round, capped at one minute
                self.random_query_timeout =
                    Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
                self.next_random_query_interval =
                    (self.next_random_query_interval * 2).min(Duration::from_mins(1));
            },
            swarm_event = self.swarm.next() => {
                if let Some(swarm_event) = swarm_event {
                    self.register_event_metrics(&swarm_event);
                    self.handle_swarm_event(swarm_event).await;
                } else {
                    // Swarm event stream has ended, stop the runner
                    break;
                }
            },
            command = self.command_receiver.next() => {
                if let Some(command) = command {
                    self.handle_command(command);
                } else {
                    // Command channel was closed, stop the runner
                    break;
                }
            },
            // A fresh registry runner future is created on every loop iteration
            () = self.known_peers_registry.run().fuse() => {
                trace!("Network parameters registry runner exited");
            },
            () = &mut self.periodical_tasks_interval => {
                self.handle_periodical_tasks();

                // Periodical tasks run every 5 seconds
                self.periodical_tasks_interval =
                    Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
            },
            event = self.removed_addresses_rx.select_next_some() => {
                self.handle_removed_address_event(event);
            },
        }

        // Give other tasks a chance to run between loop iterations
        yield_now().await;
    }
}
280
281 async fn bootstrap(&mut self) {
283 for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
285 self.swarm
286 .behaviour_mut()
287 .kademlia
288 .add_address(&peer_id, address);
289 }
290
291 let known_peers = self.known_peers_registry.all_known_peers().await;
292
293 if !known_peers.is_empty() {
294 for (peer_id, addresses) in known_peers {
295 for address in addresses.clone() {
296 let address = match address.with_p2p(peer_id) {
297 Ok(address) => address,
298 Err(address) => {
299 warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
300 break;
301 }
302 };
303 self.swarm
304 .behaviour_mut()
305 .kademlia
306 .add_address(&peer_id, address);
307 }
308
309 if let Err(error) = self
310 .swarm
311 .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
312 {
313 warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
314 }
315 }
316
317 self.handle_command(Command::Bootstrap {
319 result_sender: None,
320 });
321 return;
322 }
323
324 let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
325 let mut bootstrap_command_state = bootstrap_command_state.lock().await;
326 let bootstrap_command_receiver = match &mut *bootstrap_command_state {
327 BootstrapCommandState::NotStarted => {
328 debug!("Bootstrap started.");
329
330 let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
331
332 self.handle_command(Command::Bootstrap {
333 result_sender: Some(bootstrap_command_sender),
334 });
335
336 *bootstrap_command_state =
337 BootstrapCommandState::InProgress(bootstrap_command_receiver);
338 match &mut *bootstrap_command_state {
339 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
340 bootstrap_command_receiver
341 }
342 _ => {
343 unreachable!("Was just set to that exact value");
344 }
345 }
346 }
347 BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
348 bootstrap_command_receiver
349 }
350 BootstrapCommandState::Finished => {
351 return;
352 }
353 };
354
355 let mut bootstrap_step = 0usize;
356 loop {
357 futures::select! {
358 swarm_event = self.swarm.next() => {
359 if let Some(swarm_event) = swarm_event {
360 self.register_event_metrics(&swarm_event);
361 self.handle_swarm_event(swarm_event).await;
362 } else {
363 break;
364 }
365 },
366 result = bootstrap_command_receiver.next() => {
367 if result.is_some() {
368 debug!(%bootstrap_step, "Kademlia bootstrapping...");
369 bootstrap_step += 1;
370 } else {
371 break;
372 }
373 }
374 }
375 }
376
377 debug!("Bootstrap finished.");
378 *bootstrap_command_state = BootstrapCommandState::Finished;
379 }
380
381 fn handle_periodical_tasks(&mut self) {
383 let network_info = self.swarm.network_info();
385 let connections = network_info.connection_counters();
386
387 debug!(?connections, "Current connections and limits.");
388
389 let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
391
392 if let Some(shared) = self.shared_weak.upgrade() {
393 debug!(?external_addresses, "Renew external addresses.");
394 let mut addresses = shared.external_addresses.lock();
395 addresses.clear();
396 addresses.append(&mut external_addresses);
397 }
398
399 self.log_kademlia_stats();
400 }
401
402 fn handle_random_query_interval(&mut self) {
403 let random_peer_id = PeerId::random();
404
405 trace!("Starting random Kademlia query for {}", random_peer_id);
406
407 self.swarm
408 .behaviour_mut()
409 .kademlia
410 .get_closest_peers(random_peer_id);
411 }
412
413 fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
414 trace!(?event, "Peer address removed event.");
415
416 let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
417 .into_iter()
418 .map(|(peer_id, _)| peer_id)
419 .collect::<Vec<_>>();
420
421 if bootstrap_node_ids.contains(&event.peer_id) {
422 debug!(
423 ?event,
424 ?bootstrap_node_ids,
425 "Skipped removing bootstrap node from Kademlia buckets."
426 );
427
428 return;
429 }
430
431 self.swarm.behaviour_mut().kademlia.remove_address(
433 &event.peer_id,
434 &append_p2p_suffix(event.peer_id, event.address.clone()),
435 );
436
437 self.swarm
438 .behaviour_mut()
439 .kademlia
440 .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
441 }
442
443 fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
444 let Some(shared) = self.shared_weak.upgrade() else {
445 return;
446 };
447
448 let peer_id = shared.id;
450 shared.listeners.lock().retain(|old_listener| {
451 !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
452 && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
453 });
454 }
455
/// Dispatches a single swarm event.
///
/// Behaviour events are forwarded to the dedicated `handle_*_event()` methods, listener and
/// connection events update the shared state, the known peers registry and metrics, everything
/// else is only logged.
async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
    #[expect(clippy::ref_patterns, reason = "Much less awkward this way")]
    match swarm_event {
        SwarmEvent::Behaviour(Event::Identify(event)) => {
            self.handle_identify_event(*event);
        }
        SwarmEvent::Behaviour(Event::Kademlia(event)) => {
            self.handle_kademlia_event(event);
        }
        SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
            self.handle_gossipsub_event(event);
        }
        SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
            self.handle_request_response_event(event);
        }
        SwarmEvent::Behaviour(Event::Autonat(event)) => {
            self.handle_autonat_event(event);
        }
        ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
            trace!(?event, "New local listener event.");

            let Some(shared) = self.shared_weak.upgrade() else {
                return;
            };
            // Remember the listener and notify subscribers about it
            shared.listeners.lock().push(address.clone());
            shared.handlers.new_listener.call_simple(address);
        }
        ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
            trace!(?event, "Local listener closed event.");
            self.handle_remove_listeners(addresses);
        }
        ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
            trace!(?event, "Local listener expired event.");
            self.handle_remove_listeners(slice::from_ref(address));
        }
        SwarmEvent::ConnectionEstablished {
            peer_id,
            endpoint,
            num_established,
            ..
        } => {
            // For outgoing connections the dialed address is stored as known peer address,
            // subject to the non-global address filter
            if let ConnectedPoint::Dialer { address, .. } = &endpoint {
                if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
                    self.known_peers_registry
                        .add_known_peer(peer_id, vec![address.clone()])
                        .await;
                }
            }

            let Some(shared) = self.shared_weak.upgrade() else {
                return;
            };

            let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
            debug!(
                %peer_id,
                %is_reserved_peer,
                ?endpoint,
                %num_established,
                "Connection established"
            );

            // First IPv4/IPv6 component of the remote address, if there is one
            let maybe_remote_ip =
                endpoint
                    .get_remote_address()
                    .iter()
                    .find_map(|protocol| match protocol {
                        Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
                        Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
                        _ => None,
                    });
            if let Some(ip) = maybe_remote_ip {
                self.peer_ip_addresses
                    .entry(peer_id)
                    .and_modify(|ips| {
                        ips.insert(ip);
                    })
                    .or_insert(HashSet::from([ip]));
            }

            // `fetch_add()` returns the previous value, hence `+ 1` to get the current one
            let num_established_peer_connections = shared
                .num_established_peer_connections
                .fetch_add(1, Ordering::SeqCst)
                + 1;

            shared
                .handlers
                .num_established_peer_connections_change
                .call_simple(&num_established_peer_connections);

            // Only the first connection to a peer is reported as the peer becoming connected
            if num_established.get() == 1 {
                shared.handlers.connected_peer.call_simple(&peer_id);
            }

            if let Some(metrics) = self.metrics.as_ref() {
                metrics.inc_established_connections();
            }
        }
        SwarmEvent::ConnectionClosed {
            peer_id,
            num_established,
            cause,
            ..
        } => {
            let Some(shared) = self.shared_weak.upgrade() else {
                return;
            };

            debug!(
                %peer_id,
                ?cause,
                %num_established,
                "Connection closed with peer"
            );

            // No connections to the peer remain, forget per-peer connection state
            if num_established == 0 {
                self.peer_ip_addresses.remove(&peer_id);
                self.connected_servers.remove(&peer_id);
            }
            // `fetch_sub()` returns the previous value, hence `- 1` to get the current one
            let num_established_peer_connections = shared
                .num_established_peer_connections
                .fetch_sub(1, Ordering::SeqCst)
                - 1;

            shared
                .handlers
                .num_established_peer_connections_change
                .call_simple(&num_established_peer_connections);

            // Only closing of the last connection is reported as the peer being disconnected
            if num_established == 0 {
                shared.handlers.disconnected_peer.call_simple(&peer_id);
            }

            if let Some(metrics) = self.metrics.as_ref() {
                metrics.dec_established_connections();
            }
        }
        SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
            if let Some(peer_id) = &peer_id {
                let should_ban_temporarily =
                    self.should_temporary_ban_on_dial_error(peer_id, &error);

                trace!(%should_ban_temporarily, "Temporary bans conditions.");

                if should_ban_temporarily {
                    self.temporary_bans.lock().create_or_extend(peer_id);
                    debug!(%peer_id, ?error, "Peer was temporarily banned.");
                }
            }

            debug!(
                ?peer_id,
                ?error,
                "SwarmEvent::OutgoingConnectionError for peer."
            );

            match error {
                DialError::Transport(ref addresses) => {
                    // Every address that failed on the transport level is removed from the known
                    // addresses of the peer (when the peer ID is known)
                    for (addr, _) in addresses {
                        trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
                        if let Some(peer_id) = peer_id {
                            self.known_peers_registry
                                .remove_known_peer_addresses(peer_id, vec![addr.clone()])
                                .await;
                        }
                    }
                }
                DialError::WrongPeerId { obtained, .. } => {
                    trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");

                    // The peer ID that was dialed is removed from Kademlia
                    if let Some(ref peer_id) = peer_id {
                        let kademlia = &mut self.swarm.behaviour_mut().kademlia;
                        let _: Option<_> = kademlia.remove_peer(peer_id);
                    }
                }
                _ => {
                    trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
                }
            }
        }
        SwarmEvent::NewExternalAddrCandidate { address } => {
            trace!(%address, "External address candidate");
        }
        SwarmEvent::ExternalAddrConfirmed { address } => {
            debug!(%address, "Confirmed external address");

            // Push identify information to all connected peers after the change
            let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
            self.swarm.behaviour_mut().identify.push(connected_peers);
        }
        SwarmEvent::ExternalAddrExpired { address } => {
            debug!(%address, "External address expired");

            // Push identify information to all connected peers after the change
            let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
            self.swarm.behaviour_mut().identify.push(connected_peers);
        }
        other => {
            trace!("Other swarm event: {:?}", other);
        }
    }
}
660
661 fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
662 if true {
664 return false;
665 }
666
667 if self.swarm.is_connected(peer_id) {
669 return false;
670 }
671
672 match &error {
673 DialError::Transport(addresses) => {
674 for (_, error) in addresses {
675 match error {
676 TransportError::MultiaddrNotSupported(_) => {
677 return true;
678 }
679 TransportError::Other(_) => {
680 if self.temporary_bans.lock().is_banned(peer_id) {
682 return false;
683 }
684 }
685 }
686 }
687 true
689 }
690 DialError::LocalPeerId { .. } => {
691 debug!("Local peer dial attempt detected.");
693
694 false
695 }
696 DialError::NoAddresses => {
697 true
699 }
700 DialError::DialPeerConditionFalse(_) => {
701 false
703 }
704 DialError::Aborted => {
705 false
707 }
708 DialError::WrongPeerId { .. } => {
709 false
711 }
712 DialError::Denied { .. } => {
713 false
715 }
716 }
717 }
718
/// Processes identify events, only `IdentifyEvent::Received` has any effect.
///
/// Peers with a different protocol version are banned temporarily, disconnected and forgotten.
/// For all other peers a temporary ban (if any) is lifted and, depending on whether the peer
/// supports all local Kademlia protocols, its self-reported addresses are synchronized into
/// Kademlia or the peer is removed from Kademlia.
fn handle_identify_event(&mut self, event: IdentifyEvent) {
    let local_peer_id = *self.swarm.local_peer_id();

    if let IdentifyEvent::Received {
        peer_id, mut info, ..
    } = event
    {
        debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");

        if info.protocol_version != self.protocol_version {
            debug!(
                %local_peer_id,
                %peer_id,
                local_protocol_version = %self.protocol_version,
                peer_protocol_version = %info.protocol_version,
                "Peer has different protocol version, banning temporarily",
            );

            // Ban, disconnect and drop everything known about the peer
            self.temporary_bans.lock().create_or_extend(&peer_id);
            let _: Result<(), ()> = self.swarm.disconnect_peer_id(peer_id);
            self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
            self.known_peers_registry
                .remove_all_known_peer_addresses(peer_id);

            return;
        }

        // Protocol version matches, lift a temporary ban if there is one
        self.temporary_bans.lock().remove(&peer_id);

        // At most 30 self-reported addresses are considered
        if info.listen_addrs.len() > 30 {
            debug!(
                %local_peer_id,
                %peer_id,
                "Node has reported more than 30 addresses; it is identified by {} and {}",
                info.protocol_version, info.agent_version
            );
            info.listen_addrs.truncate(30);
        }

        let kademlia = &mut self.swarm.behaviour_mut().kademlia;
        // The peer has to support every Kademlia protocol name used locally
        let full_kademlia_support = kademlia
            .protocol_names()
            .iter()
            .all(|local_protocol| info.protocols.contains(local_protocol));

        if full_kademlia_support {
            // Apply the non-global address filter to the self-reported addresses
            let received_addresses = info
                .listen_addrs
                .into_iter()
                .filter(|address| {
                    if self.allow_non_global_addresses_in_dht
                        || is_global_address_or_dns(address)
                    {
                        true
                    } else {
                        trace!(
                            %local_peer_id,
                            %peer_id,
                            %address,
                            "Ignoring self-reported non-global address",
                        );

                        false
                    }
                })
                .collect::<Vec<_>>();
            let received_address_strings = received_addresses
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>();
            // Addresses currently stored in the peer's k-bucket entry that are not covered by
            // any received address. Addresses are compared as strings and count as covered when
            // one is a prefix of the other.
            let old_addresses = kademlia
                .kbucket(peer_id)
                .and_then(|peers| {
                    let key = peer_id.into();
                    peers.iter().find_map(|peer| {
                        (peer.node.key == &key).then_some(
                            peer.node
                                .value
                                .iter()
                                .filter(|existing_address| {
                                    let existing_address = existing_address.to_string();

                                    !received_address_strings.iter().any(|received_address| {
                                        received_address.starts_with(&existing_address)
                                            || existing_address.starts_with(received_address)
                                    })
                                })
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .unwrap_or_default();

            // New addresses are added before stale ones are removed
            for address in received_addresses {
                debug!(
                    %local_peer_id,
                    %peer_id,
                    %address,
                    protocol_names = ?kademlia.protocol_names(),
                    "Adding self-reported address to Kademlia DHT",
                );

                kademlia.add_address(&peer_id, address);
            }

            for old_address in old_addresses {
                trace!(
                    %local_peer_id,
                    %peer_id,
                    %old_address,
                    "Removing old self-reported address from Kademlia DHT",
                );

                kademlia.remove_address(&peer_id, &old_address);
            }

            self.connected_servers.insert(peer_id);
        } else {
            debug!(
                %local_peer_id,
                %peer_id,
                peer_protocols = ?info.protocols,
                protocol_names = ?kademlia.protocol_names(),
                "Peer doesn't support our Kademlia DHT protocol",
            );

            kademlia.remove_peer(&peer_id);
            self.connected_servers.remove(&peer_id);
        }
    }
}
854
/// Processes Kademlia events.
///
/// Peer routability events are forwarded to `peer_discovered` handlers of the shared state.
/// Query progress events are forwarded into the result sender registered for the query in
/// `query_id_receivers`; the registration is removed on the last progress step or as soon as the
/// receiving side turns out to be gone (in which case the query is finished early as well).
fn handle_kademlia_event(&mut self, event: KademliaEvent) {
    trace!("Kademlia event: {:?}", event);

    match event {
        KademliaEvent::InboundRequest {
            request: InboundRequest::AddProvider { record, .. },
        } => {
            debug!("Unexpected AddProvider request received: {:?}", record);
        }
        KademliaEvent::UnroutablePeer { peer } => {
            debug!(%peer, "Unroutable peer detected");

            self.swarm.behaviour_mut().kademlia.remove_peer(&peer);

            if let Some(shared) = self.shared_weak.upgrade() {
                shared
                    .handlers
                    .peer_discovered
                    .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
            }
        }
        KademliaEvent::RoutablePeer { peer, address } => {
            debug!(?address, "Routable peer detected: {:?}", peer);

            if let Some(shared) = self.shared_weak.upgrade() {
                shared
                    .handlers
                    .peer_discovered
                    .call_simple(&PeerDiscovered::RoutablePeer {
                        peer_id: peer,
                        address,
                    });
            }
        }
        KademliaEvent::PendingRoutablePeer { peer, address } => {
            debug!(?address, "Pending routable peer detected: {:?}", peer);

            // Reported to handlers the same way as a routable peer
            if let Some(shared) = self.shared_weak.upgrade() {
                shared
                    .handlers
                    .peer_discovered
                    .call_simple(&PeerDiscovered::RoutablePeer {
                        peer_id: peer,
                        address,
                    });
            }
        }
        KademliaEvent::OutboundQueryProgressed {
            step: ProgressStep { last, .. },
            id,
            result: QueryResult::GetClosestPeers(result),
            ..
        } => {
            // Set once sending a result fails because the receiving side is gone
            let mut cancelled = false;
            if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
                self.query_id_receivers.get(&id)
            {
                match result {
                    Ok(GetClosestPeersOk { key, peers }) => {
                        trace!(
                            "Get closest peers query for {} yielded {} results",
                            hex::encode(key),
                            peers.len(),
                        );

                        // An empty result while there are connected peers is worth a note
                        if peers.is_empty()
                            && self.swarm.connected_peers().next().is_some()
                        {
                            debug!("Random Kademlia query has yielded empty list of peers");
                        }

                        for peer in peers {
                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                peer.peer_id,
                                "GetClosestPeersOk",
                                id,
                            ) || cancelled;
                        }
                    }
                    Err(GetClosestPeersError::Timeout { key, peers }) => {
                        debug!(
                            "Get closest peers query for {} timed out with {} results",
                            hex::encode(key),
                            peers.len(),
                        );

                        // Peers found before the timeout are still delivered
                        for peer in peers {
                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                peer.peer_id,
                                "GetClosestPeersError::Timeout",
                                id,
                            ) || cancelled;
                        }
                    }
                }
            }

            // Dropping the entry also drops the sender, which closes the result channel
            if last || cancelled {
                self.query_id_receivers.remove(&id);
            }
        }
        KademliaEvent::OutboundQueryProgressed {
            step: ProgressStep { last, .. },
            id,
            result: QueryResult::GetRecord(result),
            ..
        } => {
            // Set once sending a result fails because the receiving side is gone
            let mut cancelled = false;
            if let Some(QueryResultSender::Value { sender, .. }) =
                self.query_id_receivers.get(&id)
            {
                match result {
                    Ok(GetRecordOk::FoundRecord(rec)) => {
                        trace!(
                            key = hex::encode(&rec.record.key),
                            "Get record query succeeded",
                        );

                        cancelled = Self::unbounded_send_and_cancel_on_error(
                            &mut self.swarm.behaviour_mut().kademlia,
                            sender,
                            rec,
                            "GetRecordOk",
                            id,
                        ) || cancelled;
                    }
                    Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
                        trace!("Get record query yielded no results");
                    }
                    // Errors are only logged, nothing is sent to the receiver
                    Err(error) => match error {
                        GetRecordError::NotFound { key, .. } => {
                            debug!(
                                key = hex::encode(&key),
                                "Get record query failed with no results",
                            );
                        }
                        GetRecordError::Timeout { key } => {
                            debug!(key = hex::encode(&key), "Get record query timed out");
                        }
                    },
                }
            }

            if last || cancelled {
                self.query_id_receivers.remove(&id);
            }
        }
        KademliaEvent::OutboundQueryProgressed {
            step: ProgressStep { last, .. },
            id,
            result: QueryResult::GetProviders(result),
            ..
        } => {
            // Set once sending a result fails because the receiving side is gone
            let mut cancelled = false;
            if let Some(QueryResultSender::Providers { key, sender, .. }) =
                self.query_id_receivers.get(&id)
            {
                match result {
                    Ok(GetProvidersOk::FoundProviders { key, providers }) => {
                        trace!(
                            key = hex::encode(&key),
                            "Get providers query yielded {} results",
                            providers.len(),
                        );

                        for provider in providers {
                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                provider,
                                "GetProvidersOk",
                                id,
                            ) || cancelled;
                        }
                    }
                    Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
                        // This result carries no key, the one stored with the sender is logged
                        trace!(
                            key = hex::encode(key),
                            closest_peers = %closest_peers.len(),
                            "Get providers query yielded no results"
                        );
                    }
                    Err(error) => {
                        let GetProvidersError::Timeout { key, .. } = error;

                        debug!(
                            key = hex::encode(&key),
                            "Get providers query failed with no results",
                        );
                    }
                }
            }

            if last || cancelled {
                self.query_id_receivers.remove(&id);
            }
        }
        KademliaEvent::OutboundQueryProgressed {
            step: ProgressStep { last, .. },
            id,
            result: QueryResult::PutRecord(result),
            ..
        } => {
            // Set once sending a result fails because the receiving side is gone
            let mut cancelled = false;
            if let Some(QueryResultSender::PutValue { sender, .. }) =
                self.query_id_receivers.get(&id)
            {
                match result {
                    Ok(PutRecordOk { key }) => {
                        trace!("Put record query for {} succeeded", hex::encode(&key));

                        cancelled = Self::unbounded_send_and_cancel_on_error(
                            &mut self.swarm.behaviour_mut().kademlia,
                            sender,
                            (),
                            "PutRecordOk",
                            id,
                        ) || cancelled;
                    }
                    // Failure is only logged, nothing is sent to the receiver
                    Err(error) => {
                        debug!(?error, "Put record query failed.");
                    }
                }
            }

            if last || cancelled {
                self.query_id_receivers.remove(&id);
            }
        }
        KademliaEvent::OutboundQueryProgressed {
            step: ProgressStep { last, count },
            id,
            result: QueryResult::Bootstrap(result),
            stats,
        } => {
            debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");

            // Set once sending a result fails because the receiving side is gone
            let mut cancelled = false;
            if let Some(QueryResultSender::Bootstrap { sender }) =
                self.query_id_receivers.get_mut(&id)
            {
                match result {
                    Ok(BootstrapOk {
                        peer,
                        num_remaining,
                    }) => {
                        trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");

                        // One `()` is sent per successful bootstrap step
                        cancelled = Self::unbounded_send_and_cancel_on_error(
                            &mut self.swarm.behaviour_mut().kademlia,
                            sender,
                            (),
                            "Bootstrap",
                            id,
                        ) || cancelled;
                    }
                    Err(error) => {
                        debug!(?error, "Bootstrap query failed.");
                    }
                }
            }

            if last || cancelled {
                self.query_id_receivers.remove(&id);
            }
        }
        // All remaining Kademlia events are ignored
        _ => {}
    }
}
1134
1135 fn unbounded_send_and_cancel_on_error<T>(
1137 kademlia: &mut Kademlia<DummyRecordStore>,
1138 sender: &mpsc::UnboundedSender<T>,
1139 value: T,
1140 channel: &'static str,
1141 id: QueryId,
1142 ) -> bool {
1143 if sender.unbounded_send(value).is_err() {
1144 debug!("{} channel was dropped", channel);
1145
1146 if let Some(mut query) = kademlia.query_mut(&id) {
1148 query.finish();
1149 }
1150 true
1151 } else {
1152 false
1153 }
1154 }
1155
1156 fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1157 if let GossipsubEvent::Message { message, .. } = event
1158 && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
1159 {
1160 let bytes = Bytes::from(message.data);
1161
1162 for sender in senders.values() {
1163 let _: Result<(), _> = sender.unbounded_send(bytes.clone());
1165 }
1166 }
1167 }
1168
/// Request-response events need no processing here, they are only logged at trace level.
fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
    trace!("Request response event: {:?}", event);
}
1173
1174 fn handle_autonat_event(&mut self, event: AutonatEvent) {
1175 trace!(?event, "Autonat event received.");
1176 let autonat = &self.swarm.behaviour().autonat;
1177 debug!(
1178 public_address=?autonat.public_address(),
1179 confidence=%autonat.confidence(),
1180 "Current public address confidence."
1181 );
1182
1183 match event {
1184 AutonatEvent::InboundProbe(_inbound_probe_event) => {
1185 }
1187 AutonatEvent::OutboundProbe(outbound_probe_event) => {
1188 match outbound_probe_event {
1189 OutboundProbeEvent::Request { peer, .. } => {
1190 self.swarm
1193 .behaviour_mut()
1194 .connection_limits
1195 .add_to_incoming_allow_list(
1197 peer,
1198 self.peer_ip_addresses
1199 .get(&peer)
1200 .iter()
1201 .flat_map(|ip_addresses| ip_addresses.iter())
1202 .copied(),
1203 1,
1204 );
1205 }
1206 OutboundProbeEvent::Response { peer, .. } => {
1207 self.swarm
1208 .behaviour_mut()
1209 .connection_limits
1210 .remove_from_incoming_allow_list(&peer, Some(1));
1211 }
1212 OutboundProbeEvent::Error { peer, .. } => {
1213 if let Some(peer) = peer {
1214 self.swarm
1215 .behaviour_mut()
1216 .connection_limits
1217 .remove_from_incoming_allow_list(&peer, Some(1));
1218 }
1219 }
1220 }
1221 }
1222 AutonatEvent::StatusChanged { old, new } => {
1223 debug!(?old, ?new, "Public address status changed.");
1224
1225 if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1227 self.swarm.remove_external_address(&old_address);
1228 debug!(
1229 ?old_address,
1230 new_status = ?new,
1231 "Removing old external address...",
1232 );
1233
1234 self.swarm.behaviour_mut().kademlia.set_mode(None);
1236 }
1237
1238 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1239 self.swarm.behaviour_mut().identify.push(connected_peers);
1240 }
1241 }
1242 }
1243
1244 fn handle_command(&mut self, command: Command) {
1245 match command {
1246 Command::GetValue {
1247 key,
1248 result_sender,
1249 permit,
1250 } => {
1251 let query_id = self
1252 .swarm
1253 .behaviour_mut()
1254 .kademlia
1255 .get_record(key.to_bytes().into());
1256
1257 self.query_id_receivers.insert(
1258 query_id,
1259 QueryResultSender::Value {
1260 sender: result_sender,
1261 _permit: permit,
1262 },
1263 );
1264 }
1265 Command::PutValue {
1266 key,
1267 value,
1268 result_sender,
1269 permit,
1270 } => {
1271 let local_peer_id = *self.swarm.local_peer_id();
1272
1273 let record = Record {
1274 key: key.into(),
1275 value,
1276 publisher: Some(local_peer_id),
1277 expires: None, };
1279 let query_result = self
1280 .swarm
1281 .behaviour_mut()
1282 .kademlia
1283 .put_record(record, Quorum::One);
1284
1285 match query_result {
1286 Ok(query_id) => {
1287 self.query_id_receivers.insert(
1288 query_id,
1289 QueryResultSender::PutValue {
1290 sender: result_sender,
1291 _permit: permit,
1292 },
1293 );
1294 }
1295 Err(err) => {
1296 warn!(?err, "Failed to put value.");
1297 }
1298 }
1299 }
1300 Command::Subscribe {
1301 topic,
1302 result_sender,
1303 } => {
1304 assert!(
1305 self.swarm.behaviour().gossipsub.is_enabled(),
1306 "Gossipsub protocol is disabled."
1307 );
1308
1309 let topic_hash = topic.hash();
1310 let (sender, receiver) = mpsc::unbounded();
1311
1312 let subscription_id = self.next_subscription_id;
1314 self.next_subscription_id += 1;
1315
1316 let created_subscription = CreatedSubscription {
1317 subscription_id,
1318 receiver,
1319 };
1320
1321 match self.topic_subscription_senders.entry(topic_hash) {
1322 Entry::Occupied(mut entry) => {
1323 if result_sender.send(Ok(created_subscription)).is_ok() {
1325 entry.get_mut().insert(subscription_id, sender);
1326 }
1327 }
1328 Entry::Vacant(entry) => {
1329 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1332 match gossipsub.subscribe(&topic) {
1333 Ok(true) => {
1334 if result_sender.send(Ok(created_subscription)).is_ok() {
1335 entry
1336 .insert(IntMap::from_iter([(subscription_id, sender)]));
1337 }
1338 }
1339 Ok(false) => {
1340 panic!(
1341 "Logic error, topic subscription wasn't created, this \
1342 must never happen"
1343 );
1344 }
1345 Err(error) => {
1346 let _: Result<(), _> = result_sender.send(Err(error));
1347 }
1348 }
1349 }
1350 }
1351 }
1352 }
1353 Command::Unsubscribe {
1354 topic,
1355 subscription_id,
1356 } => {
1357 assert!(
1358 self.swarm.behaviour().gossipsub.is_enabled(),
1359 "Gossipsub protocol is disabled."
1360 );
1361
1362 if let Entry::Occupied(mut entry) =
1363 self.topic_subscription_senders.entry(topic.hash())
1364 {
1365 entry.get_mut().remove(&subscription_id);
1366
1367 if entry.get().is_empty() {
1369 entry.remove_entry();
1370
1371 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
1372 && !gossipsub.unsubscribe(&topic)
1373 {
1374 warn!(
1375 "Can't unsubscribe from topic {topic} because subscription doesn't \
1376 exist, this is a logic error in the subspace or swarm libraries"
1377 );
1378 }
1379 }
1380 } else {
1381 error!(
1382 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1383 this is a logic error in the subspace library"
1384 );
1385 }
1386 }
1387 Command::Publish {
1388 topic,
1389 message,
1390 result_sender,
1391 } => {
1392 assert!(
1393 self.swarm.behaviour().gossipsub.is_enabled(),
1394 "Gossipsub protocol is disabled"
1395 );
1396
1397 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1398 let _: Result<(), _> =
1400 result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1401 }
1402 }
1403 Command::GetClosestPeers {
1404 key,
1405 result_sender,
1406 permit,
1407 } => {
1408 let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1409
1410 self.query_id_receivers.insert(
1411 query_id,
1412 QueryResultSender::ClosestPeers {
1413 sender: result_sender,
1414 _permit: permit,
1415 },
1416 );
1417 }
1418 Command::GetClosestLocalPeers {
1419 key,
1420 source,
1421 result_sender,
1422 } => {
1423 let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1424 let result = self
1425 .swarm
1426 .behaviour_mut()
1427 .kademlia
1428 .find_closest_local_peers(&KBucketKey::from(key), &source)
1429 .filter(|peer| !peer.multiaddrs.is_empty())
1430 .map(|peer| (peer.node_id, peer.multiaddrs))
1431 .collect();
1432
1433 let _: Result<(), _> = result_sender.send(result);
1435 }
1436 Command::GenericRequest {
1437 peer_id,
1438 addresses,
1439 protocol_name,
1440 request,
1441 result_sender,
1442 } => {
1443 self.swarm.behaviour_mut().request_response.send_request(
1444 &peer_id,
1445 protocol_name,
1446 request,
1447 result_sender,
1448 IfDisconnected::TryConnect,
1449 addresses,
1450 );
1451 }
1452 Command::GetProviders {
1453 key,
1454 result_sender,
1455 permit,
1456 } => {
1457 let query_id = self
1458 .swarm
1459 .behaviour_mut()
1460 .kademlia
1461 .get_providers(key.clone());
1462
1463 self.query_id_receivers.insert(
1464 query_id,
1465 QueryResultSender::Providers {
1466 key,
1467 sender: result_sender,
1468 _permit: permit,
1469 },
1470 );
1471 }
1472 Command::BanPeer { peer_id } => {
1473 self.ban_peer(peer_id);
1474 }
1475 Command::Dial { address } => {
1476 let _: Result<(), _> = self.swarm.dial(address);
1477 }
1478 Command::ConnectedPeers { result_sender } => {
1479 let connected_peers = self.swarm.connected_peers().copied().collect();
1480
1481 let _: Result<(), _> = result_sender.send(connected_peers);
1482 }
1483 Command::ConnectedServers { result_sender } => {
1484 let connected_servers = self.connected_servers.iter().copied().collect();
1485
1486 let _: Result<(), _> = result_sender.send(connected_servers);
1487 }
1488 Command::Bootstrap { result_sender } => {
1489 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1490
1491 match kademlia.bootstrap() {
1492 Ok(query_id) => {
1493 if let Some(result_sender) = result_sender {
1494 self.query_id_receivers.insert(
1495 query_id,
1496 QueryResultSender::Bootstrap {
1497 sender: result_sender,
1498 },
1499 );
1500 }
1501 }
1502 Err(err) => {
1503 debug!(?err, "Bootstrap error.");
1504 }
1505 }
1506 }
1507 }
1508 }
1509
1510 fn ban_peer(&mut self, peer_id: PeerId) {
1511 self.temporary_bans.lock().remove(&peer_id);
1513
1514 debug!(?peer_id, "Banning peer on network level");
1515
1516 self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1517 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1518 self.known_peers_registry
1519 .remove_all_known_peer_addresses(peer_id);
1520 }
1521
1522 fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1523 if let Some(ref mut metrics) = self.libp2p_metrics {
1524 match swarm_event {
1525 SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1526 metrics.record(ping_event);
1527 }
1528 SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1529 metrics.record(identify_event.as_ref());
1530 }
1531 SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1532 metrics.record(kademlia_event);
1533 }
1534 SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1535 metrics.record(gossipsub_event);
1536 }
1537 swarm_event => {
1542 metrics.record(swarm_event);
1543 }
1544 }
1545 }
1546 }
1547
1548 fn log_kademlia_stats(&mut self) {
1549 let mut peer_counter = 0usize;
1550 let mut peer_with_no_address_counter = 0usize;
1551 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1552 for entry in kbucket.iter() {
1553 peer_counter += 1;
1554 if entry.node.value.len() == 0 {
1555 peer_with_no_address_counter += 1;
1556 }
1557 }
1558 }
1559
1560 debug!(
1561 peers = %peer_counter,
1562 peers_with_no_address = %peer_with_no_address_counter,
1563 "Kademlia stats"
1564 );
1565 }
1566}