1use crate::behavior::persistent_parameters::{
2 KnownPeersRegistry, PeerAddressRemovedEvent, append_p2p_suffix, remove_p2p_suffix,
3};
4use crate::behavior::{Behavior, Event};
5use crate::constructor::DummyRecordStore;
6use crate::constructor::temporary_bans::TemporaryBans;
7use crate::protocols::request_response::request_response_factory::{
8 Event as RequestResponseEvent, IfDisconnected,
9};
10use crate::shared::{Command, CreatedSubscription, PeerDiscovered, Shared};
11use crate::utils::{SubspaceMetrics, is_global_address_or_dns, strip_peer_id};
12use async_lock::Mutex as AsyncMutex;
13use bytes::Bytes;
14use event_listener_primitives::HandlerId;
15use futures::channel::mpsc;
16use futures::future::Fuse;
17use futures::{FutureExt, StreamExt};
18use libp2p::autonat::{Event as AutonatEvent, NatStatus, OutboundProbeEvent};
19use libp2p::core::ConnectedPoint;
20use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
21use libp2p::identify::Event as IdentifyEvent;
22use libp2p::kad::{
23 Behaviour as Kademlia, BootstrapOk, Event as KademliaEvent, GetClosestPeersError,
24 GetClosestPeersOk, GetProvidersError, GetProvidersOk, GetRecordError, GetRecordOk,
25 InboundRequest, KBucketKey, PeerRecord, ProgressStep, PutRecordOk, QueryId, QueryResult,
26 Quorum, Record, RecordKey,
27};
28use libp2p::metrics::{Metrics, Recorder};
29use libp2p::multiaddr::Protocol;
30use libp2p::swarm::dial_opts::DialOpts;
31use libp2p::swarm::{DialError, SwarmEvent};
32use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
33use nohash_hasher::IntMap;
34use parking_lot::Mutex;
35use std::collections::hash_map::Entry;
36use std::collections::{HashMap, HashSet};
37use std::net::IpAddr;
38use std::pin::Pin;
39use std::sync::atomic::Ordering;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42use std::{fmt, slice};
43use tokio::sync::OwnedSemaphorePermit;
44use tokio::task::yield_now;
45use tokio::time::Sleep;
46use tracing::{debug, error, trace, warn};
47
/// Where to deliver the results of an in-flight Kademlia query.
///
/// One value is stored per [`QueryId`] in `NodeRunner::query_id_receivers` and is looked up
/// again when the matching `OutboundQueryProgressed` event arrives.
enum QueryResultSender {
    /// Record lookup: every found [`PeerRecord`] is forwarded to `sender`.
    Value {
        sender: mpsc::UnboundedSender<PeerRecord>,
        // Never read; held so that the permit is dropped only together with this entry.
        _permit: OwnedSemaphorePermit,
    },
    /// Closest-peers lookup: the ID of every returned peer is forwarded to `sender`.
    ClosestPeers {
        sender: mpsc::UnboundedSender<PeerId>,
        // Never read; when present it is dropped only together with this entry.
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Providers lookup: every found provider is forwarded to `sender`.
    Providers {
        // Key that was queried; in the visible code it is only used for logging.
        key: RecordKey,
        sender: mpsc::UnboundedSender<PeerId>,
        // Never read; when present it is dropped only together with this entry.
        _permit: Option<OwnedSemaphorePermit>,
    },
    /// Record store: `()` is sent to `sender` when the put succeeds.
    PutValue {
        sender: mpsc::UnboundedSender<()>,
        // Never read; held so that the permit is dropped only together with this entry.
        _permit: OwnedSemaphorePermit,
    },
    /// Kademlia bootstrap: `()` is sent to `sender` for every successful bootstrap step.
    Bootstrap {
        sender: mpsc::UnboundedSender<()>,
    },
}
74
/// Progress of the bootstrap command issued by `NodeRunner::bootstrap`.
#[derive(Debug, Default)]
enum BootstrapCommandState {
    /// No bootstrap command has been issued yet.
    #[default]
    NotStarted,
    /// A bootstrap command was issued; the receiver yields `()` for every reported step.
    InProgress(mpsc::UnboundedReceiver<()>),
    /// Bootstrapping completed (or was abandoned); nothing more to wait for.
    Finished,
}
82
/// Event loop of a networking node.
///
/// Owns the libp2p swarm and processes swarm events, commands, timers and address-removal
/// notifications once [`NodeRunner::run`] is awaited.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner {
    /// When `false`, only addresses accepted by `is_global_address_or_dns` are recorded for
    /// peers (known peers registry and Kademlia).
    allow_non_global_addresses_in_dht: bool,
    /// When `true`, `run` first waits until the swarm reports at least one listener.
    is_listening: bool,
    /// Incoming commands; the main loop stops when this channel ends.
    command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm driven by this runner.
    swarm: Swarm<Behavior>,
    /// Weak handle to the shared node state; upgraded on demand and skipped if already dropped.
    shared_weak: Weak<Shared>,
    /// Delay used for the next random Kademlia query; doubled after each query, capped at one
    /// minute.
    next_random_query_interval: Duration,
    /// Result destinations of in-flight Kademlia queries, keyed by query ID.
    query_id_receivers: HashMap<QueryId, QueryResultSender>,
    /// Counter used to hand out subscription IDs.
    next_subscription_id: usize,
    /// Per gossipsub topic: subscription ID → sender that receives the message payloads.
    topic_subscription_senders: HashMap<TopicHash, IntMap<usize, mpsc::UnboundedSender<Bytes>>>,
    /// Timer that triggers the next random Kademlia query.
    random_query_timeout: Pin<Box<Fuse<Sleep>>>,
    /// Timer that triggers the periodical tasks (re-armed for 5 seconds after each run).
    periodical_tasks_interval: Pin<Box<Fuse<Sleep>>>,
    /// Registry of known peers and their addresses.
    known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Connected peers that advertised support for all of our Kademlia protocols via identify.
    connected_servers: HashSet<PeerId>,
    /// Reserved peers and their addresses.
    // NOTE(review): in the visible code this is only consulted for logging — confirm other uses.
    reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporary bans shared with other components.
    temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Optional libp2p metrics.
    // NOTE(review): presumably fed by `register_event_metrics`, which is not visible here.
    libp2p_metrics: Option<Metrics>,
    /// Optional node metrics; the established-connections gauge is updated on connect/close.
    metrics: Option<SubspaceMetrics>,
    /// Remote IP addresses seen on established connections, per peer; cleared when the last
    /// connection to the peer closes.
    peer_ip_addresses: HashMap<PeerId, HashSet<IpAddr>>,
    /// Protocol version a peer must report via identify to stay connected.
    protocol_version: String,
    /// Addresses of bootstrap nodes; added to Kademlia on bootstrap and never removed from it by
    /// address-removal events.
    bootstrap_addresses: Vec<Multiaddr>,
    /// State of the bootstrap command issued by `bootstrap`.
    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
    /// Receives address-removal events forwarded from the known peers registry callback.
    removed_addresses_rx: mpsc::UnboundedReceiver<PeerAddressRemovedEvent>,
    /// Handle of the callback registered with the known peers registry; never read.
    // NOTE(review): presumably dropping it would unregister the callback — confirm.
    _address_removal_task_handler_id: Option<HandlerId>,
}
130
131impl fmt::Debug for NodeRunner {
132 #[inline]
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 f.debug_struct("NodeRunner").finish_non_exhaustive()
135 }
136}
137
/// Everything needed to construct a [`NodeRunner`]; each field is moved into the runner's field
/// of the same name.
pub(crate) struct NodeRunnerConfig {
    /// When `false`, only addresses accepted by `is_global_address_or_dns` are recorded for
    /// peers.
    pub(crate) allow_non_global_addresses_in_dht: bool,
    /// Whether the runner should wait for a listener before bootstrapping.
    pub(crate) is_listening: bool,
    /// Channel on which the runner receives commands.
    pub(crate) command_receiver: mpsc::Receiver<Command>,
    /// The libp2p swarm to drive.
    pub(crate) swarm: Swarm<Behavior>,
    /// Weak handle to the shared node state.
    pub(crate) shared_weak: Weak<Shared>,
    /// Initial delay between random Kademlia queries.
    pub(crate) next_random_query_interval: Duration,
    /// Registry of known peers and their addresses.
    pub(crate) known_peers_registry: Box<dyn KnownPeersRegistry>,
    /// Reserved peers and their addresses.
    pub(crate) reserved_peers: HashMap<PeerId, Multiaddr>,
    /// Temporary bans shared with other components.
    pub(crate) temporary_bans: Arc<Mutex<TemporaryBans>>,
    /// Optional libp2p metrics.
    pub(crate) libp2p_metrics: Option<Metrics>,
    /// Optional node metrics.
    pub(crate) metrics: Option<SubspaceMetrics>,
    /// Protocol version peers are expected to report via identify.
    pub(crate) protocol_version: String,
    /// Addresses of bootstrap nodes.
    pub(crate) bootstrap_addresses: Vec<Multiaddr>,
}
155
156impl NodeRunner {
157 pub(crate) fn new(
158 NodeRunnerConfig {
159 allow_non_global_addresses_in_dht,
160 is_listening,
161 command_receiver,
162 swarm,
163 shared_weak,
164 next_random_query_interval,
165 mut known_peers_registry,
166 reserved_peers,
167 temporary_bans,
168 libp2p_metrics,
169 metrics,
170 protocol_version,
171 bootstrap_addresses,
172 }: NodeRunnerConfig,
173 ) -> Self {
174 let (removed_addresses_tx, removed_addresses_rx) = mpsc::unbounded();
176 let mut address_removal_task_handler_id = None;
177 if let Some(handler_id) = known_peers_registry.on_unreachable_address({
178 Arc::new(move |event| {
179 if let Err(error) = removed_addresses_tx.unbounded_send(event.clone()) {
180 debug!(?error, ?event, "Cannot send PeerAddressRemovedEvent");
181 }
182 })
183 }) {
184 address_removal_task_handler_id.replace(handler_id);
185 }
186
187 Self {
188 allow_non_global_addresses_in_dht,
189 is_listening,
190 command_receiver,
191 swarm,
192 shared_weak,
193 next_random_query_interval,
194 query_id_receivers: HashMap::default(),
195 next_subscription_id: 0,
196 topic_subscription_senders: HashMap::default(),
197 random_query_timeout: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
199 periodical_tasks_interval: Box::pin(tokio::time::sleep(Duration::from_secs(0)).fuse()),
201 known_peers_registry,
202 connected_servers: HashSet::new(),
203 reserved_peers,
204 temporary_bans,
205 libp2p_metrics,
206 metrics,
207 peer_ip_addresses: HashMap::new(),
208 protocol_version,
209 bootstrap_addresses,
210 bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
211 removed_addresses_rx,
212 _address_removal_task_handler_id: address_removal_task_handler_id,
213 }
214 }
215
216 pub async fn run(&mut self) {
218 if self.is_listening {
219 loop {
222 if self.swarm.listeners().next().is_some() {
223 break;
224 }
225
226 if let Some(swarm_event) = self.swarm.next().await {
227 self.register_event_metrics(&swarm_event);
228 self.handle_swarm_event(swarm_event).await;
229 } else {
230 break;
231 }
232 }
233 }
234
235 self.bootstrap().await;
236
237 loop {
238 futures::select! {
239 _ = &mut self.random_query_timeout => {
240 self.handle_random_query_interval();
241 self.random_query_timeout =
243 Box::pin(tokio::time::sleep(self.next_random_query_interval).fuse());
244 self.next_random_query_interval =
245 (self.next_random_query_interval * 2).min(Duration::from_mins(1));
246 },
247 swarm_event = self.swarm.next() => {
248 if let Some(swarm_event) = swarm_event {
249 self.register_event_metrics(&swarm_event);
250 self.handle_swarm_event(swarm_event).await;
251 } else {
252 break;
253 }
254 },
255 command = self.command_receiver.next() => {
256 if let Some(command) = command {
257 self.handle_command(command);
258 } else {
259 break;
260 }
261 },
262 _ = self.known_peers_registry.run().fuse() => {
263 trace!("Network parameters registry runner exited");
264 },
265 _ = &mut self.periodical_tasks_interval => {
266 self.handle_periodical_tasks().await;
267
268 self.periodical_tasks_interval =
269 Box::pin(tokio::time::sleep(Duration::from_secs(5)).fuse());
270 },
271 event = self.removed_addresses_rx.select_next_some() => {
272 self.handle_removed_address_event(event);
273 },
274 }
275
276 yield_now().await;
278 }
279 }
280
    /// Bootstraps the Kademlia DHT.
    ///
    /// Bootstrap node addresses are always added to Kademlia first. After that:
    /// * if the known peers registry has any peers, their addresses are added to Kademlia, each
    ///   peer is dialed, a bootstrap command without a result channel is issued and the method
    ///   returns without waiting for it;
    /// * otherwise a bootstrap command with a result channel is issued (unless one is already in
    ///   progress or finished) and the swarm is driven here until that channel closes or the
    ///   swarm event stream ends.
    async fn bootstrap(&mut self) {
        // Bootstrap nodes go into the routing table unconditionally.
        for (peer_id, address) in strip_peer_id(self.bootstrap_addresses.clone()) {
            self.swarm
                .behaviour_mut()
                .kademlia
                .add_address(&peer_id, address);
        }

        let known_peers = self.known_peers_registry.all_known_peers().await;

        if !known_peers.is_empty() {
            for (peer_id, addresses) in known_peers {
                for address in addresses.clone() {
                    let address = match address.with_p2p(peer_id) {
                        Ok(address) => address,
                        Err(address) => {
                            warn!(%peer_id, %address, "Failed to add peer ID to known peer address");
                            // NOTE(review): `break` also skips the remaining addresses of this
                            // peer, not just the offending one — confirm this is intended.
                            break;
                        }
                    };
                    self.swarm
                        .behaviour_mut()
                        .kademlia
                        .add_address(&peer_id, address);
                }

                // The peer is dialed with the full, unmodified address list.
                if let Err(error) = self
                    .swarm
                    .dial(DialOpts::peer_id(peer_id).addresses(addresses).build())
                {
                    warn!(%peer_id, %error, "Failed to dial peer during bootstrapping");
                }
            }

            // Fire-and-forget: nobody listens for the progress of this bootstrap.
            self.handle_command(Command::Bootstrap {
                result_sender: None,
            });
            return;
        }

        // Clone the `Arc` so the guard does not borrow `self`, which is used mutably below.
        let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
            BootstrapCommandState::NotStarted => {
                debug!("Bootstrap started.");

                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();

                self.handle_command(Command::Bootstrap {
                    result_sender: Some(bootstrap_command_sender),
                });

                // Store the receiver in the state and borrow it back from there.
                *bootstrap_command_state =
                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
                match &mut *bootstrap_command_state {
                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                        bootstrap_command_receiver
                    }
                    _ => {
                        unreachable!("Was just set to that exact value");
                    }
                }
            }
            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
                bootstrap_command_receiver
            }
            BootstrapCommandState::Finished => {
                return;
            }
        };

        let mut bootstrap_step = 0usize;
        loop {
            futures::select! {
                // The swarm has to be polled here, otherwise the bootstrap query cannot progress.
                swarm_event = self.swarm.next() => {
                    if let Some(swarm_event) = swarm_event {
                        self.register_event_metrics(&swarm_event);
                        self.handle_swarm_event(swarm_event).await;
                    } else {
                        break;
                    }
                },
                // One message per reported step; a closed channel ends the wait.
                result = bootstrap_command_receiver.next() => {
                    if result.is_some() {
                        debug!(%bootstrap_step, "Kademlia bootstrapping...");
                        bootstrap_step += 1;
                    } else {
                        break;
                    }
                }
            }
        }

        debug!("Bootstrap finished.");
        *bootstrap_command_state = BootstrapCommandState::Finished;
    }
380
381 async fn handle_periodical_tasks(&mut self) {
383 let network_info = self.swarm.network_info();
385 let connections = network_info.connection_counters();
386
387 debug!(?connections, "Current connections and limits.");
388
389 let mut external_addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
391
392 if let Some(shared) = self.shared_weak.upgrade() {
393 debug!(?external_addresses, "Renew external addresses.");
394 let mut addresses = shared.external_addresses.lock();
395 addresses.clear();
396 addresses.append(&mut external_addresses);
397 }
398
399 self.log_kademlia_stats();
400 }
401
402 fn handle_random_query_interval(&mut self) {
403 let random_peer_id = PeerId::random();
404
405 trace!("Starting random Kademlia query for {}", random_peer_id);
406
407 self.swarm
408 .behaviour_mut()
409 .kademlia
410 .get_closest_peers(random_peer_id);
411 }
412
413 fn handle_removed_address_event(&mut self, event: PeerAddressRemovedEvent) {
414 trace!(?event, "Peer address removed event.");
415
416 let bootstrap_node_ids = strip_peer_id(self.bootstrap_addresses.clone())
417 .into_iter()
418 .map(|(peer_id, _)| peer_id)
419 .collect::<Vec<_>>();
420
421 if bootstrap_node_ids.contains(&event.peer_id) {
422 debug!(
423 ?event,
424 ?bootstrap_node_ids,
425 "Skipped removing bootstrap node from Kademlia buckets."
426 );
427
428 return;
429 }
430
431 self.swarm.behaviour_mut().kademlia.remove_address(
433 &event.peer_id,
434 &append_p2p_suffix(event.peer_id, event.address.clone()),
435 );
436
437 self.swarm
438 .behaviour_mut()
439 .kademlia
440 .remove_address(&event.peer_id, &remove_p2p_suffix(event.address));
441 }
442
443 fn handle_remove_listeners(&mut self, removed_listeners: &[Multiaddr]) {
444 let Some(shared) = self.shared_weak.upgrade() else {
445 return;
446 };
447
448 let peer_id = shared.id;
450 shared.listeners.lock().retain(|old_listener| {
451 !removed_listeners.contains(&append_p2p_suffix(peer_id, old_listener.clone()))
452 && !removed_listeners.contains(&remove_p2p_suffix(old_listener.clone()))
453 });
454 }
455
    /// Processes a single swarm event.
    ///
    /// Behaviour events are forwarded to the protocol-specific handlers. Listener events update
    /// the shared listener list, connection events update counters, handlers and metrics, and
    /// outgoing connection errors feed temporary bans and the known peers registry. Everything
    /// else is only logged.
    async fn handle_swarm_event(&mut self, swarm_event: SwarmEvent<Event>) {
        match swarm_event {
            SwarmEvent::Behaviour(Event::Identify(event)) => {
                self.handle_identify_event(event);
            }
            SwarmEvent::Behaviour(Event::Kademlia(event)) => {
                self.handle_kademlia_event(event);
            }
            SwarmEvent::Behaviour(Event::Gossipsub(event)) => {
                self.handle_gossipsub_event(event);
            }
            SwarmEvent::Behaviour(Event::RequestResponse(event)) => {
                self.handle_request_response_event(event);
            }
            SwarmEvent::Behaviour(Event::Autonat(event)) => {
                self.handle_autonat_event(event);
            }
            ref event @ SwarmEvent::NewListenAddr { ref address, .. } => {
                trace!(?event, "New local listener event.");

                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };
                // Record the listener and notify subscribers.
                shared.listeners.lock().push(address.clone());
                shared.handlers.new_listener.call_simple(address);
            }
            ref event @ SwarmEvent::ListenerClosed { ref addresses, .. } => {
                trace!(?event, "Local listener closed event.");
                self.handle_remove_listeners(addresses);
            }
            ref event @ SwarmEvent::ExpiredListenAddr { ref address, .. } => {
                trace!(?event, "Local listener expired event.");
                self.handle_remove_listeners(slice::from_ref(address));
            }
            SwarmEvent::ConnectionEstablished {
                peer_id,
                endpoint,
                num_established,
                ..
            } => {
                // Only addresses we dialed ourselves are remembered as known peer addresses.
                if let ConnectedPoint::Dialer { address, .. } = &endpoint {
                    if self.allow_non_global_addresses_in_dht || is_global_address_or_dns(address) {
                        self.known_peers_registry
                            .add_known_peer(peer_id, vec![address.clone()])
                            .await;
                    }
                }

                // Everything below needs the shared state; skip it if the node is already gone.
                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };

                let is_reserved_peer = self.reserved_peers.contains_key(&peer_id);
                debug!(
                    %peer_id,
                    %is_reserved_peer,
                    ?endpoint,
                    %num_established,
                    "Connection established"
                );

                // Remember the first IPv4/IPv6 component of the remote address, if any.
                let maybe_remote_ip =
                    endpoint
                        .get_remote_address()
                        .iter()
                        .find_map(|protocol| match protocol {
                            Protocol::Ip4(ip) => Some(IpAddr::V4(ip)),
                            Protocol::Ip6(ip) => Some(IpAddr::V6(ip)),
                            _ => None,
                        });
                if let Some(ip) = maybe_remote_ip {
                    self.peer_ip_addresses
                        .entry(peer_id)
                        .and_modify(|ips| {
                            ips.insert(ip);
                        })
                        .or_insert(HashSet::from([ip]));
                }

                // `fetch_add` returns the previous value, hence the `+ 1`.
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_add(1, Ordering::SeqCst)
                    + 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // Report the peer as connected only for its first connection.
                if num_established.get() == 1 {
                    shared.handlers.connected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.inc_established_connections();
                }
            }
            SwarmEvent::ConnectionClosed {
                peer_id,
                num_established,
                cause,
                ..
            } => {
                let Some(shared) = self.shared_weak.upgrade() else {
                    return;
                };

                debug!(
                    %peer_id,
                    ?cause,
                    %num_established,
                    "Connection closed with peer"
                );

                // Last connection to the peer is gone: forget its per-peer state.
                if num_established == 0 {
                    self.peer_ip_addresses.remove(&peer_id);
                    self.connected_servers.remove(&peer_id);
                }
                // `fetch_sub` returns the previous value, hence the `- 1`.
                let num_established_peer_connections = shared
                    .num_established_peer_connections
                    .fetch_sub(1, Ordering::SeqCst)
                    - 1;

                shared
                    .handlers
                    .num_established_peer_connections_change
                    .call_simple(&num_established_peer_connections);

                // Report the peer as disconnected only once no connections remain.
                if num_established == 0 {
                    shared.handlers.disconnected_peer.call_simple(&peer_id);
                }

                if let Some(metrics) = self.metrics.as_ref() {
                    metrics.dec_established_connections();
                }
            }
            SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
                if let Some(peer_id) = &peer_id {
                    let should_ban_temporarily =
                        self.should_temporary_ban_on_dial_error(peer_id, &error);

                    trace!(%should_ban_temporarily, "Temporary bans conditions.");

                    if should_ban_temporarily {
                        self.temporary_bans.lock().create_or_extend(peer_id);
                        debug!(%peer_id, ?error, "Peer was temporarily banned.");
                    }
                }

                debug!(
                    ?peer_id,
                    ?error,
                    "SwarmEvent::OutgoingConnectionError for peer."
                );

                match error {
                    // Every address that failed at the transport level is dropped from the
                    // known peers registry.
                    DialError::Transport(ref addresses) => {
                        for (addr, _) in addresses {
                            trace!(?error, ?peer_id, %addr, "SwarmEvent::OutgoingConnectionError (DialError::Transport) for peer.");
                            if let Some(peer_id) = peer_id {
                                self.known_peers_registry
                                    .remove_known_peer_addresses(peer_id, vec![addr.clone()])
                                    .await;
                            }
                        }
                    }
                    // The remote turned out to be a different peer than the one dialed: remove
                    // the dialed peer from the Kademlia routing table.
                    DialError::WrongPeerId { obtained, .. } => {
                        trace!(?error, ?peer_id, obtained_peer_id=?obtained, "SwarmEvent::WrongPeerId (DialError::WrongPeerId) for peer.");

                        if let Some(ref peer_id) = peer_id {
                            let kademlia = &mut self.swarm.behaviour_mut().kademlia;
                            let _: Option<_> = kademlia.remove_peer(peer_id);
                        }
                    }
                    _ => {
                        trace!(?error, ?peer_id, "SwarmEvent::OutgoingConnectionError");
                    }
                }
            }
            SwarmEvent::NewExternalAddrCandidate { address } => {
                trace!(%address, "External address candidate");
            }
            SwarmEvent::ExternalAddrConfirmed { address } => {
                debug!(%address, "Confirmed external address");

                // Push updated identify information to every connected peer.
                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            SwarmEvent::ExternalAddrExpired { address } => {
                debug!(%address, "External address expired");

                // Push updated identify information to every connected peer.
                let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
                self.swarm.behaviour_mut().identify.push(connected_peers);
            }
            other => {
                trace!("Other swarm event: {:?}", other);
            }
        }
    }
659
660 fn should_temporary_ban_on_dial_error(&self, peer_id: &PeerId, error: &DialError) -> bool {
661 if true {
663 return false;
664 }
665
666 if self.swarm.is_connected(peer_id) {
668 return false;
669 }
670
671 match &error {
672 DialError::Transport(addresses) => {
673 for (_, error) in addresses {
674 match error {
675 TransportError::MultiaddrNotSupported(_) => {
676 return true;
677 }
678 TransportError::Other(_) => {
679 if self.temporary_bans.lock().is_banned(peer_id) {
681 return false;
682 }
683 }
684 }
685 }
686 true
688 }
689 DialError::LocalPeerId { .. } => {
690 debug!("Local peer dial attempt detected.");
692
693 false
694 }
695 DialError::NoAddresses => {
696 true
698 }
699 DialError::DialPeerConditionFalse(_) => {
700 false
702 }
703 DialError::Aborted => {
704 false
706 }
707 DialError::WrongPeerId { .. } => {
708 false
710 }
711 DialError::Denied { .. } => {
712 false
714 }
715 }
716 }
717
718 fn handle_identify_event(&mut self, event: IdentifyEvent) {
719 let local_peer_id = *self.swarm.local_peer_id();
720
721 if let IdentifyEvent::Received {
722 peer_id, mut info, ..
723 } = event
724 {
725 debug!(?peer_id, protocols = ?info.protocols, "IdentifyEvent::Received");
726
727 if info.protocol_version != self.protocol_version {
729 debug!(
730 %local_peer_id,
731 %peer_id,
732 local_protocol_version = %self.protocol_version,
733 peer_protocol_version = %info.protocol_version,
734 "Peer has different protocol version, banning temporarily",
735 );
736
737 self.temporary_bans.lock().create_or_extend(&peer_id);
738 let _: Result<(), ()> = self.swarm.disconnect_peer_id(peer_id);
740 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
741 self.known_peers_registry
742 .remove_all_known_peer_addresses(peer_id);
743
744 return;
745 }
746
747 self.temporary_bans.lock().remove(&peer_id);
749
750 if info.listen_addrs.len() > 30 {
751 debug!(
752 %local_peer_id,
753 %peer_id,
754 "Node has reported more than 30 addresses; it is identified by {} and {}",
755 info.protocol_version, info.agent_version
756 );
757 info.listen_addrs.truncate(30);
758 }
759
760 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
761 let full_kademlia_support = kademlia
762 .protocol_names()
763 .iter()
764 .all(|local_protocol| info.protocols.contains(local_protocol));
765
766 if full_kademlia_support {
767 let received_addresses = info
768 .listen_addrs
769 .into_iter()
770 .filter(|address| {
771 if self.allow_non_global_addresses_in_dht
772 || is_global_address_or_dns(address)
773 {
774 true
775 } else {
776 trace!(
777 %local_peer_id,
778 %peer_id,
779 %address,
780 "Ignoring self-reported non-global address",
781 );
782
783 false
784 }
785 })
786 .collect::<Vec<_>>();
787 let received_address_strings = received_addresses
788 .iter()
789 .map(ToString::to_string)
790 .collect::<Vec<_>>();
791 let old_addresses = kademlia
792 .kbucket(peer_id)
793 .and_then(|peers| {
794 let key = peer_id.into();
795 peers.iter().find_map(|peer| {
796 (peer.node.key == &key).then_some(
797 peer.node
798 .value
799 .iter()
800 .filter(|existing_address| {
801 let existing_address = existing_address.to_string();
802
803 !received_address_strings.iter().any(|received_address| {
804 received_address.starts_with(&existing_address)
805 || existing_address.starts_with(received_address)
806 })
807 })
808 .cloned()
809 .collect::<Vec<_>>(),
810 )
811 })
812 })
813 .unwrap_or_default();
814
815 for address in received_addresses {
816 debug!(
817 %local_peer_id,
818 %peer_id,
819 %address,
820 protocol_names = ?kademlia.protocol_names(),
821 "Adding self-reported address to Kademlia DHT",
822 );
823
824 kademlia.add_address(&peer_id, address);
825 }
826
827 for old_address in old_addresses {
828 trace!(
829 %local_peer_id,
830 %peer_id,
831 %old_address,
832 "Removing old self-reported address from Kademlia DHT",
833 );
834
835 kademlia.remove_address(&peer_id, &old_address);
836 }
837
838 self.connected_servers.insert(peer_id);
839 } else {
840 debug!(
841 %local_peer_id,
842 %peer_id,
843 peer_protocols = ?info.protocols,
844 protocol_names = ?kademlia.protocol_names(),
845 "Peer doesn't support our Kademlia DHT protocol",
846 );
847
848 kademlia.remove_peer(&peer_id);
849 self.connected_servers.remove(&peer_id);
850 }
851 }
852 }
853
    /// Processes a Kademlia event.
    ///
    /// Routing-table notifications are forwarded to the `peer_discovered` handlers. Query
    /// progress events are matched against `query_id_receivers` and their results are forwarded
    /// to the stored sender. The entry for a query is removed once its last step arrived or the
    /// receiving side went away (in which case the query is also finished early).
    fn handle_kademlia_event(&mut self, event: KademliaEvent) {
        trace!("Kademlia event: {:?}", event);

        match event {
            KademliaEvent::InboundRequest {
                request: InboundRequest::AddProvider { record, .. },
            } => {
                debug!("Unexpected AddProvider request received: {:?}", record);
            }
            KademliaEvent::UnroutablePeer { peer } => {
                debug!(%peer, "Unroutable peer detected");

                self.swarm.behaviour_mut().kademlia.remove_peer(&peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::UnroutablePeer { peer_id: peer });
                }
            }
            KademliaEvent::RoutablePeer { peer, address } => {
                debug!(?address, "Routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            // Pending routable peers are reported to handlers the same way as routable ones.
            KademliaEvent::PendingRoutablePeer { peer, address } => {
                debug!(?address, "Pending routable peer detected: {:?}", peer);

                if let Some(shared) = self.shared_weak.upgrade() {
                    shared
                        .handlers
                        .peer_discovered
                        .call_simple(&PeerDiscovered::RoutablePeer {
                            peer_id: peer,
                            address,
                        });
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetClosestPeers(result),
                ..
            } => {
                // Set when the receiver is gone and the query was finished early.
                let mut cancelled = false;
                if let Some(QueryResultSender::ClosestPeers { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetClosestPeersOk { key, peers }) => {
                            trace!(
                                "Get closest peers query for {} yielded {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            if peers.is_empty()
                                && self.swarm.connected_peers().next().is_some()
                            {
                                debug!("Random Kademlia query has yielded empty list of peers");
                            }

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        // A timed out query may still carry partial results; forward them too.
                        Err(GetClosestPeersError::Timeout { key, peers }) => {
                            debug!(
                                "Get closest peers query for {} timed out with {} results",
                                hex::encode(key),
                                peers.len(),
                            );

                            for peer in peers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    peer.peer_id,
                                    "GetClosestPeersError::Timeout",
                                    &id,
                                ) || cancelled;
                            }
                        }
                    }
                }

                if last || cancelled {
                    // Dropping the entry also drops its sender (and permit, if any).
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Value { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(GetRecordOk::FoundRecord(rec)) => {
                            trace!(
                                key = hex::encode(&rec.record.key),
                                "Get record query succeeded",
                            );

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                rec,
                                "GetRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {
                            trace!("Get record query yielded no results");
                        }
                        // Failures are only logged; the receiver just sees the channel close.
                        Err(error) => match error {
                            GetRecordError::NotFound { key, .. } => {
                                debug!(
                                    key = hex::encode(&key),
                                    "Get record query failed with no results",
                                );
                            }
                            GetRecordError::Timeout { key } => {
                                debug!(key = hex::encode(&key), "Get record query timed out");
                            }
                        },
                    }
                }

                if last || cancelled {
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::GetProviders(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::Providers { key, sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        // `key` from the result shadows the stored `key` within this arm.
                        Ok(GetProvidersOk::FoundProviders { key, providers }) => {
                            trace!(
                                key = hex::encode(&key),
                                "Get providers query yielded {} results",
                                providers.len(),
                            );

                            for provider in providers {
                                cancelled = Self::unbounded_send_and_cancel_on_error(
                                    &mut self.swarm.behaviour_mut().kademlia,
                                    sender,
                                    provider,
                                    "GetProvidersOk",
                                    &id,
                                ) || cancelled;
                            }
                        }
                        // This result carries no key, so the stored one is used for logging.
                        Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { closest_peers }) => {
                            trace!(
                                key = hex::encode(key),
                                closest_peers = %closest_peers.len(),
                                "Get providers query yielded no results"
                            );
                        }
                        Err(error) => {
                            let GetProvidersError::Timeout { key, .. } = error;

                            debug!(
                                key = hex::encode(&key),
                                "Get providers query failed with no results",
                            );
                        }
                    }
                }

                if last || cancelled {
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, .. },
                id,
                result: QueryResult::PutRecord(result),
                ..
            } => {
                let mut cancelled = false;
                if let Some(QueryResultSender::PutValue { sender, .. }) =
                    self.query_id_receivers.get(&id)
                {
                    match result {
                        Ok(PutRecordOk { key }) => {
                            trace!("Put record query for {} succeeded", hex::encode(&key));

                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "PutRecordOk",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Put record query failed.");
                        }
                    }
                }

                if last || cancelled {
                    self.query_id_receivers.remove(&id);
                }
            }
            KademliaEvent::OutboundQueryProgressed {
                step: ProgressStep { last, count },
                id,
                result: QueryResult::Bootstrap(result),
                stats,
            } => {
                debug!(?stats, %last, %count, ?id, ?result, "Bootstrap OutboundQueryProgressed step.");

                let mut cancelled = false;
                if let Some(QueryResultSender::Bootstrap { sender }) =
                    self.query_id_receivers.get_mut(&id)
                {
                    match result {
                        Ok(BootstrapOk {
                            peer,
                            num_remaining,
                        }) => {
                            trace!(%peer, %num_remaining, %last, "Bootstrap query step succeeded");

                            // One `()` per successful step lets the waiter track progress.
                            cancelled = Self::unbounded_send_and_cancel_on_error(
                                &mut self.swarm.behaviour_mut().kademlia,
                                sender,
                                (),
                                "Bootstrap",
                                &id,
                            ) || cancelled;
                        }
                        Err(error) => {
                            debug!(?error, "Bootstrap query failed.");
                        }
                    }
                }

                if last || cancelled {
                    self.query_id_receivers.remove(&id);
                }
            }
            // All other Kademlia events are ignored (they were already traced above).
            _ => {}
        }
    }
1133
1134 fn unbounded_send_and_cancel_on_error<T>(
1136 kademlia: &mut Kademlia<DummyRecordStore>,
1137 sender: &mpsc::UnboundedSender<T>,
1138 value: T,
1139 channel: &'static str,
1140 id: &QueryId,
1141 ) -> bool {
1142 if sender.unbounded_send(value).is_err() {
1143 debug!("{} channel was dropped", channel);
1144
1145 if let Some(mut query) = kademlia.query_mut(id) {
1147 query.finish();
1148 }
1149 true
1150 } else {
1151 false
1152 }
1153 }
1154
1155 fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
1156 if let GossipsubEvent::Message { message, .. } = event
1157 && let Some(senders) = self.topic_subscription_senders.get(&message.topic)
1158 {
1159 let bytes = Bytes::from(message.data);
1160
1161 for sender in senders.values() {
1162 let _: Result<(), _> = sender.unbounded_send(bytes.clone());
1164 }
1165 }
1166 }
1167
    /// Request-response events need no handling here; they are only traced.
    fn handle_request_response_event(&mut self, event: RequestResponseEvent) {
        trace!("Request response event: {:?}", event);
    }
1172
1173 fn handle_autonat_event(&mut self, event: AutonatEvent) {
1174 trace!(?event, "Autonat event received.");
1175 let autonat = &self.swarm.behaviour().autonat;
1176 debug!(
1177 public_address=?autonat.public_address(),
1178 confidence=%autonat.confidence(),
1179 "Current public address confidence."
1180 );
1181
1182 match event {
1183 AutonatEvent::InboundProbe(_inbound_probe_event) => {
1184 }
1186 AutonatEvent::OutboundProbe(outbound_probe_event) => {
1187 match outbound_probe_event {
1188 OutboundProbeEvent::Request { peer, .. } => {
1189 self.swarm
1192 .behaviour_mut()
1193 .connection_limits
1194 .add_to_incoming_allow_list(
1196 peer,
1197 self.peer_ip_addresses
1198 .get(&peer)
1199 .iter()
1200 .flat_map(|ip_addresses| ip_addresses.iter())
1201 .copied(),
1202 1,
1203 );
1204 }
1205 OutboundProbeEvent::Response { peer, .. } => {
1206 self.swarm
1207 .behaviour_mut()
1208 .connection_limits
1209 .remove_from_incoming_allow_list(&peer, Some(1));
1210 }
1211 OutboundProbeEvent::Error { peer, .. } => {
1212 if let Some(peer) = peer {
1213 self.swarm
1214 .behaviour_mut()
1215 .connection_limits
1216 .remove_from_incoming_allow_list(&peer, Some(1));
1217 }
1218 }
1219 }
1220 }
1221 AutonatEvent::StatusChanged { old, new } => {
1222 debug!(?old, ?new, "Public address status changed.");
1223
1224 if let (NatStatus::Public(old_address), NatStatus::Private) = (old, new.clone()) {
1226 self.swarm.remove_external_address(&old_address);
1227 debug!(
1228 ?old_address,
1229 new_status = ?new,
1230 "Removing old external address...",
1231 );
1232
1233 self.swarm.behaviour_mut().kademlia.set_mode(None);
1235 }
1236
1237 let connected_peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
1238 self.swarm.behaviour_mut().identify.push(connected_peers);
1239 }
1240 }
1241 }
1242
1243 fn handle_command(&mut self, command: Command) {
1244 match command {
1245 Command::GetValue {
1246 key,
1247 result_sender,
1248 permit,
1249 } => {
1250 let query_id = self
1251 .swarm
1252 .behaviour_mut()
1253 .kademlia
1254 .get_record(key.to_bytes().into());
1255
1256 self.query_id_receivers.insert(
1257 query_id,
1258 QueryResultSender::Value {
1259 sender: result_sender,
1260 _permit: permit,
1261 },
1262 );
1263 }
1264 Command::PutValue {
1265 key,
1266 value,
1267 result_sender,
1268 permit,
1269 } => {
1270 let local_peer_id = *self.swarm.local_peer_id();
1271
1272 let record = Record {
1273 key: key.into(),
1274 value,
1275 publisher: Some(local_peer_id),
1276 expires: None, };
1278 let query_result = self
1279 .swarm
1280 .behaviour_mut()
1281 .kademlia
1282 .put_record(record, Quorum::One);
1283
1284 match query_result {
1285 Ok(query_id) => {
1286 self.query_id_receivers.insert(
1287 query_id,
1288 QueryResultSender::PutValue {
1289 sender: result_sender,
1290 _permit: permit,
1291 },
1292 );
1293 }
1294 Err(err) => {
1295 warn!(?err, "Failed to put value.");
1296 }
1297 }
1298 }
1299 Command::Subscribe {
1300 topic,
1301 result_sender,
1302 } => {
1303 assert!(
1304 self.swarm.behaviour().gossipsub.is_enabled(),
1305 "Gossipsub protocol is disabled."
1306 );
1307
1308 let topic_hash = topic.hash();
1309 let (sender, receiver) = mpsc::unbounded();
1310
1311 let subscription_id = self.next_subscription_id;
1313 self.next_subscription_id += 1;
1314
1315 let created_subscription = CreatedSubscription {
1316 subscription_id,
1317 receiver,
1318 };
1319
1320 match self.topic_subscription_senders.entry(topic_hash) {
1321 Entry::Occupied(mut entry) => {
1322 if result_sender.send(Ok(created_subscription)).is_ok() {
1324 entry.get_mut().insert(subscription_id, sender);
1325 }
1326 }
1327 Entry::Vacant(entry) => {
1328 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1331 match gossipsub.subscribe(&topic) {
1332 Ok(true) => {
1333 if result_sender.send(Ok(created_subscription)).is_ok() {
1334 entry
1335 .insert(IntMap::from_iter([(subscription_id, sender)]));
1336 }
1337 }
1338 Ok(false) => {
1339 panic!(
1340 "Logic error, topic subscription wasn't created, this \
1341 must never happen"
1342 );
1343 }
1344 Err(error) => {
1345 let _: Result<(), _> = result_sender.send(Err(error));
1346 }
1347 }
1348 }
1349 }
1350 }
1351 }
1352 Command::Unsubscribe {
1353 topic,
1354 subscription_id,
1355 } => {
1356 assert!(
1357 self.swarm.behaviour().gossipsub.is_enabled(),
1358 "Gossipsub protocol is disabled."
1359 );
1360
1361 if let Entry::Occupied(mut entry) =
1362 self.topic_subscription_senders.entry(topic.hash())
1363 {
1364 entry.get_mut().remove(&subscription_id);
1365
1366 if entry.get().is_empty() {
1368 entry.remove_entry();
1369
1370 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut()
1371 && !gossipsub.unsubscribe(&topic)
1372 {
1373 warn!(
1374 "Can't unsubscribe from topic {topic} because subscription doesn't \
1375 exist, this is a logic error in the subspace or swarm libraries"
1376 );
1377 }
1378 }
1379 } else {
1380 error!(
1381 "Can't unsubscribe from topic {topic} because subscription doesn't exist, \
1382 this is a logic error in the subspace library"
1383 );
1384 }
1385 }
1386 Command::Publish {
1387 topic,
1388 message,
1389 result_sender,
1390 } => {
1391 assert!(
1392 self.swarm.behaviour().gossipsub.is_enabled(),
1393 "Gossipsub protocol is disabled"
1394 );
1395
1396 if let Some(gossipsub) = self.swarm.behaviour_mut().gossipsub.as_mut() {
1397 let _: Result<(), _> =
1399 result_sender.send(gossipsub.publish(topic, message).map(|_message_id| ()));
1400 }
1401 }
1402 Command::GetClosestPeers {
1403 key,
1404 result_sender,
1405 permit,
1406 } => {
1407 let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(key);
1408
1409 self.query_id_receivers.insert(
1410 query_id,
1411 QueryResultSender::ClosestPeers {
1412 sender: result_sender,
1413 _permit: permit,
1414 },
1415 );
1416 }
1417 Command::GetClosestLocalPeers {
1418 key,
1419 source,
1420 result_sender,
1421 } => {
1422 let source = source.unwrap_or_else(|| *self.swarm.local_peer_id());
1423 let result = self
1424 .swarm
1425 .behaviour_mut()
1426 .kademlia
1427 .find_closest_local_peers(&KBucketKey::from(key), &source)
1428 .filter(|peer| !peer.multiaddrs.is_empty())
1429 .map(|peer| (peer.node_id, peer.multiaddrs))
1430 .collect();
1431
1432 let _: Result<(), _> = result_sender.send(result);
1434 }
1435 Command::GenericRequest {
1436 peer_id,
1437 addresses,
1438 protocol_name,
1439 request,
1440 result_sender,
1441 } => {
1442 self.swarm.behaviour_mut().request_response.send_request(
1443 &peer_id,
1444 protocol_name,
1445 request,
1446 result_sender,
1447 IfDisconnected::TryConnect,
1448 addresses,
1449 );
1450 }
1451 Command::GetProviders {
1452 key,
1453 result_sender,
1454 permit,
1455 } => {
1456 let query_id = self
1457 .swarm
1458 .behaviour_mut()
1459 .kademlia
1460 .get_providers(key.clone());
1461
1462 self.query_id_receivers.insert(
1463 query_id,
1464 QueryResultSender::Providers {
1465 key,
1466 sender: result_sender,
1467 _permit: permit,
1468 },
1469 );
1470 }
1471 Command::BanPeer { peer_id } => {
1472 self.ban_peer(peer_id);
1473 }
1474 Command::Dial { address } => {
1475 let _: Result<(), _> = self.swarm.dial(address);
1476 }
1477 Command::ConnectedPeers { result_sender } => {
1478 let connected_peers = self.swarm.connected_peers().copied().collect();
1479
1480 let _: Result<(), _> = result_sender.send(connected_peers);
1481 }
1482 Command::ConnectedServers { result_sender } => {
1483 let connected_servers = self.connected_servers.iter().copied().collect();
1484
1485 let _: Result<(), _> = result_sender.send(connected_servers);
1486 }
1487 Command::Bootstrap { result_sender } => {
1488 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
1489
1490 match kademlia.bootstrap() {
1491 Ok(query_id) => {
1492 if let Some(result_sender) = result_sender {
1493 self.query_id_receivers.insert(
1494 query_id,
1495 QueryResultSender::Bootstrap {
1496 sender: result_sender,
1497 },
1498 );
1499 }
1500 }
1501 Err(err) => {
1502 debug!(?err, "Bootstrap error.");
1503 }
1504 }
1505 }
1506 }
1507 }
1508
1509 fn ban_peer(&mut self, peer_id: PeerId) {
1510 self.temporary_bans.lock().remove(&peer_id);
1512
1513 debug!(?peer_id, "Banning peer on network level");
1514
1515 self.swarm.behaviour_mut().block_list.block_peer(peer_id);
1516 self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
1517 self.known_peers_registry
1518 .remove_all_known_peer_addresses(peer_id);
1519 }
1520
1521 fn register_event_metrics(&mut self, swarm_event: &SwarmEvent<Event>) {
1522 if let Some(ref mut metrics) = self.libp2p_metrics {
1523 match swarm_event {
1524 SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
1525 metrics.record(ping_event);
1526 }
1527 SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
1528 metrics.record(identify_event);
1529 }
1530 SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
1531 metrics.record(kademlia_event);
1532 }
1533 SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
1534 metrics.record(gossipsub_event);
1535 }
1536 swarm_event => {
1541 metrics.record(swarm_event);
1542 }
1543 }
1544 }
1545 }
1546
1547 fn log_kademlia_stats(&mut self) {
1548 let mut peer_counter = 0usize;
1549 let mut peer_with_no_address_counter = 0usize;
1550 for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
1551 for entry in kbucket.iter() {
1552 peer_counter += 1;
1553 if entry.node.value.len() == 0 {
1554 peer_with_no_address_counter += 1;
1555 }
1556 }
1557 }
1558
1559 debug!(
1560 peers = %peer_counter,
1561 peers_with_no_address = %peer_with_no_address_counter,
1562 "Kademlia stats"
1563 );
1564 }
1565}