1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use ab_core_primitives::hashes::Blake3Hash;
3use async_trait::async_trait;
4use event_listener_primitives::HandlerId;
5use fs2::FileExt;
6use futures::FutureExt;
7use futures::future::{Fuse, pending};
8use libp2p::multiaddr::Protocol;
9use libp2p::{Multiaddr, PeerId};
10use memmap2::{MmapMut, MmapOptions};
11use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
12use parking_lot::Mutex;
13use schnellru::{ByLength, LruMap};
14use std::collections::HashSet;
15use std::fs::OpenOptions;
16use std::io::{Read, Seek, SeekFrom};
17use std::path::Path;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::{Duration, SystemTime};
22use std::{io, mem};
23use thiserror::Error;
24use tokio::time::{Sleep, sleep};
25use tracing::{debug, error, trace, warn};
26
/// Time at which an address was first recorded as failing; `None` means no failure is recorded.
type FailureTime = Option<SystemTime>;

/// Default number of peers kept in the known-peers cache.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Number of addresses kept for a single peer.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause, in seconds, between two checks for unsaved cache changes.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default time a failing address stays in the cache before it is dropped (24 hours).
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(86_400);
/// Default time a failing address may fail before its removal is reported to handlers (1 hour).
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3_600);
/// Default maximum age of persisted known peers that are still loaded on start (24 hours).
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(86_400);
42
43#[derive(Debug, Clone)]
45pub struct PeerAddressRemovedEvent {
46 pub peer_id: PeerId,
48 pub address: Multiaddr,
50}
51
52#[derive(Debug, Encode, Decode)]
53struct EncodableKnownPeerAddress {
54 multiaddr: Vec<u8>,
55 failure_time: Option<u64>,
57}
58
59#[derive(Debug, Encode, Decode)]
60struct EncodableKnownPeers {
61 cache_size: u32,
62 timestamp: u64,
63 known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
65}
66
67impl EncodableKnownPeers {
68 fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69 let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71 self.known_peers
73 .sort_by_cached_key(|(_peer_id, addresses)| {
74 addresses.iter().fold(0u64, |acc, address| {
75 acc.max(address.failure_time.unwrap_or(u64::MAX))
76 })
77 });
78
79 'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81 let mut peer_cache =
82 LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84 let peer_id = match PeerId::from_bytes(&peer_id) {
85 Ok(peer_id) => peer_id,
86 Err(error) => {
87 debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88 continue;
89 }
90 };
91 for address in addresses {
92 let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93 Ok(multiaddr) => multiaddr,
94 Err(error) => {
95 debug!(
96 %error,
97 "Failed to decode known peer multiaddress, skipping peer entry"
98 );
99 continue 'peers;
100 }
101 };
102
103 peer_cache.insert(
104 multiaddr,
105 address.failure_time.map(|failure_time| {
106 SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107 }),
108 );
109 }
110
111 peers_cache.insert(peer_id, peer_cache);
112 }
113
114 peers_cache
115 }
116
117 fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118 let single_peer_encoded_address_size =
119 KnownPeersManager::single_peer_encoded_address_size();
120 Self {
121 cache_size,
122 timestamp: SystemTime::now()
123 .duration_since(SystemTime::UNIX_EPOCH)
124 .expect("Never before Unix epoch; qed")
125 .as_secs(),
126 known_peers: cache
127 .iter()
128 .map(|(peer_id, addresses)| {
129 (
130 peer_id.to_bytes(),
131 addresses
132 .iter()
133 .filter_map(|(multiaddr, failure_time)| {
134 let multiaddr_bytes = multiaddr.to_vec();
135
136 if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137 {
138 debug!(
140 encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141 limit = %single_peer_encoded_address_size,
142 ?multiaddr,
143 "Unexpectedly large multiaddress"
144 );
145 return None;
146 }
147
148 Some(EncodableKnownPeerAddress {
149 multiaddr: multiaddr_bytes,
150 failure_time: failure_time.map(|failure_time| {
151 failure_time
152 .duration_since(SystemTime::UNIX_EPOCH)
153 .expect("Never before Unix epoch; qed")
154 .as_secs()
155 }),
156 })
157 })
158 .collect(),
159 )
160 })
161 .collect(),
162 }
163 }
164}
165
166#[derive(Debug)]
168struct KnownPeersSlots {
169 a: MmapMut,
170 b: MmapMut,
171}
172
173impl KnownPeersSlots {
174 fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
175 let known_peers_bytes = encodable_known_peers.encode();
176 let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
177 encoded_bytes.copy_from_slice(&known_peers_bytes);
178 remaining_bytes[..Blake3Hash::SIZE]
180 .copy_from_slice(blake3::hash(&known_peers_bytes).as_bytes());
181 if let Err(error) = self.a.flush() {
182 warn!(%error, "Failed to flush known peers to disk");
183 }
184
185 mem::swap(&mut self.a, &mut self.b);
187 }
188}
189
190#[async_trait]
192pub trait KnownPeersRegistry: Send + Sync {
193 async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
195
196 async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
198
199 fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
201
202 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;
204
205 async fn run(&mut self);
207
208 fn on_unreachable_address(
213 &mut self,
214 handler: HandlerFn<PeerAddressRemovedEvent>,
215 ) -> Option<HandlerId>;
216}
217
218#[derive(Clone, Default)]
220pub(crate) struct StubNetworkingParametersManager;
221
222impl StubNetworkingParametersManager {
223 pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
225 Box::new(self)
226 }
227}
228
229#[async_trait]
230impl KnownPeersRegistry for StubNetworkingParametersManager {
231 async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
232
233 async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
234
235 fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
236
237 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
238 Vec::new()
239 }
240
241 async fn run(&mut self) {
242 futures::future::pending().await
244 }
245
246 fn on_unreachable_address(
247 &mut self,
248 _handler: HandlerFn<PeerAddressRemovedEvent>,
249 ) -> Option<HandlerId> {
250 None
251 }
252}
253
254#[derive(Debug, Clone)]
256pub struct KnownPeersManagerConfig {
257 pub enable_known_peers_source: bool,
259 pub cache_size: u32,
261 pub ignore_peer_list: HashSet<PeerId>,
263 pub path: Option<Box<Path>>,
265 pub failed_address_cache_removal_interval: Duration,
267 pub failed_address_kademlia_removal_interval: Duration,
269 pub stale_known_peers_timeout: Duration,
271}
272
273impl Default for KnownPeersManagerConfig {
274 fn default() -> Self {
275 Self {
276 enable_known_peers_source: true,
277 cache_size: KNOWN_PEERS_CACHE_SIZE,
278 ignore_peer_list: Default::default(),
279 path: None,
280 failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
281 failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
282 stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
283 }
284 }
285}
286
287#[derive(Debug, Error)]
289pub enum KnownPeersManagerPersistenceError {
290 #[error("I/O error: {0}")]
292 Io(#[from] io::Error),
293 #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
295 CantPreallocateKnownPeersFile(io::Error),
296}
297
298#[derive(Debug)]
300pub struct KnownPeersManager {
301 cache_need_saving: bool,
303 known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
305 networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
307 known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
309 address_removed: Handler<PeerAddressRemovedEvent>,
311 config: KnownPeersManagerConfig,
313}
314
315impl Drop for KnownPeersManager {
316 fn drop(&mut self) {
317 if self.cache_need_saving
318 && let Some(known_peers_slots) = &self.known_peers_slots
319 {
320 known_peers_slots
321 .lock()
322 .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
323 &self.known_peers,
324 self.config.cache_size,
325 ));
326 }
327 }
328}
329
330impl KnownPeersManager {
331 fn init_file(
332 path: &Path,
333 cache_size: u32,
334 ) -> Result<
335 (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
336 KnownPeersManagerPersistenceError,
337 > {
338 let mut file = OpenOptions::new()
339 .read(true)
340 .write(true)
341 .create(true)
342 .truncate(false)
343 .open(path)?;
344
345 let known_addresses_size = Self::known_addresses_size(cache_size);
346 let file_size = Self::file_size(cache_size);
347 let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
349
350 {
351 let mut file_contents = Vec::with_capacity(file_size);
352 file.read_to_end(&mut file_contents)?;
353 if !file_contents.is_empty() {
354 for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
355 let known_addresses =
356 match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
357 Ok(known_addresses) => known_addresses,
358 Err(error) => {
359 debug!(%error, "Failed to decode encodable known peers");
360 continue;
361 }
362 };
363
364 let (encoded_bytes, remaining_bytes) =
365 known_addresses_bytes.split_at(known_addresses.encoded_size());
366 if remaining_bytes.len() < Blake3Hash::SIZE {
367 debug!(
368 remaining_bytes = %remaining_bytes.len(),
369 "Not enough bytes to decode checksum, file was likely corrupted"
370 );
371 continue;
372 }
373
374 let actual_checksum = *blake3::hash(encoded_bytes).as_bytes();
376 let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
377 if actual_checksum != expected_checksum {
378 debug!(
379 encoded_bytes_len = %encoded_bytes.len(),
380 actual_checksum = %hex::encode(actual_checksum),
381 expected_checksum = %hex::encode(expected_checksum),
382 "Hash doesn't match, possible disk corruption or file was just \
383 created, ignoring"
384 );
385 continue;
386 }
387
388 match &mut maybe_newest_known_addresses {
389 Some(newest_known_addresses) => {
390 if newest_known_addresses.timestamp < known_addresses.timestamp {
391 *newest_known_addresses = known_addresses;
392 }
393 }
394 None => {
395 maybe_newest_known_addresses.replace(known_addresses);
396 }
397 }
398 }
399 }
400 }
401
402 let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
404 file.allocate(file_size as u64)
407 .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
408 file.set_len(file_size as u64)?;
410 true
411 } else {
412 false
413 };
414
415 let mut a_mmap = unsafe {
417 MmapOptions::new()
418 .len(known_addresses_size)
419 .map_mut(&file)?
420 };
421 let mut b_mmap = unsafe {
423 MmapOptions::new()
424 .offset(known_addresses_size as u64)
425 .len(known_addresses_size)
426 .map_mut(&file)?
427 };
428
429 if file_resized {
430 if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
432 let bytes = newest_known_addresses.encode();
433 a_mmap[..bytes.len()].copy_from_slice(&bytes);
434 a_mmap.flush()?;
435 b_mmap[..bytes.len()].copy_from_slice(&bytes);
436 b_mmap.flush()?;
437 }
438 }
439
440 let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
441 a: a_mmap,
442 b: b_mmap,
443 }));
444
445 Ok((maybe_newest_known_addresses, known_peers_slots))
446 }
447
448 pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
450 let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
451 Self::init_file(path, config.cache_size)
452 .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
453 } else {
454 (None, None)
455 };
456
457 let known_peers = maybe_newest_known_addresses
458 .filter(|newest_known_addresses| {
459 let time_since_unix_epoch = SystemTime::now()
460 .duration_since(SystemTime::UNIX_EPOCH)
461 .expect("Never before Unix epoch; qed");
462 let known_peers_age = time_since_unix_epoch
463 .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
464
465 known_peers_age <= config.stale_known_peers_timeout
466 })
467 .map(EncodableKnownPeers::into_cache)
468 .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
469
470 Ok(Self {
471 cache_need_saving: false,
472 known_peers,
473 networking_parameters_save_delay: Self::default_delay(),
474 known_peers_slots,
475 address_removed: Default::default(),
476 config,
477 })
478 }
479
480 pub fn file_size(cache_size: u32) -> usize {
482 Self::known_addresses_size(cache_size) * 2
484 }
485
486 pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
488 Box::new(self)
489 }
490
491 fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
493 Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
494 }
495
496 fn single_peer_encoded_address_size() -> usize {
497 let multiaddr = Multiaddr::from_str(
498 "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
499 )
500 .expect("Valid multiaddr; qed");
501 multiaddr.to_vec().encoded_size() * 3
504 }
505
506 fn single_peer_encoded_size() -> usize {
509 PeerId::random().to_bytes().encoded_size()
512 + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
513 + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
514 * ADDRESSES_CACHE_SIZE as usize
515 }
516
517 fn known_addresses_size(cache_size: u32) -> usize {
522 mem::size_of::<u64>()
525 + Compact::compact_len(&(cache_size))
526 + Self::single_peer_encoded_size() * cache_size as usize
527 + Blake3Hash::SIZE
528 }
529
530 fn persistent_enabled(&self) -> bool {
531 self.config.path.is_some()
532 }
533
534 #[cfg(test)]
535 pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
536 self.known_peers
537 .peek(peer_id)
538 .map(|addresses| addresses.peek(address).is_some())
539 .unwrap_or_default()
540 }
541}
542
543#[async_trait]
544impl KnownPeersRegistry for KnownPeersManager {
545 async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
546 if self.config.ignore_peer_list.contains(&peer_id) {
547 debug!(
548 %peer_id,
549 addr_num=addresses.len(),
550 "Adding new peer addresses canceled (ignore list): {:?}",
551 addresses
552 );
553
554 return;
555 }
556
557 debug!(
558 %peer_id,
559 addr_num=addresses.len(),
560 "Add new peer addresses to the networking parameters registry: {:?}",
561 addresses
562 );
563
564 addresses
565 .iter()
566 .filter(|addr| {
567 !addr
569 .into_iter()
570 .any(|protocol| matches!(protocol, Protocol::Memory(..)))
571 })
572 .cloned()
573 .map(remove_p2p_suffix)
574 .for_each(|addr| {
575 self.known_peers
577 .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
578
579 if let Some(addresses) = self.known_peers.get(&peer_id) {
580 let previous_entry = addresses.peek(&addr).cloned().flatten();
581 addresses.insert(addr, None);
582 if let Some(previous_entry) = previous_entry {
583 trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
584 }
585 }
586 });
587
588 self.cache_need_saving = true;
589 }
590
591 async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
592 trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
593
594 let removed_addresses = remove_known_peer_addresses_internal(
595 &mut self.known_peers,
596 peer_id,
597 addresses,
598 self.config.failed_address_cache_removal_interval,
599 self.config.failed_address_kademlia_removal_interval,
600 );
601
602 for event in removed_addresses {
603 self.address_removed.call_simple(&event);
604 }
605
606 self.cache_need_saving = true;
607 }
608
609 fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
610 trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
611
612 self.known_peers.remove(&peer_id);
613
614 self.cache_need_saving = true;
615 }
616
617 async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
618 if !self.config.enable_known_peers_source {
619 return Vec::new();
620 }
621
622 self.known_peers
623 .iter()
624 .map(|(&peer_id, addresses)| {
625 (
626 peer_id,
627 addresses
628 .iter()
629 .map(|(addr, _failure_time)| addr.clone())
630 .collect(),
631 )
632 })
633 .collect()
634 }
635
636 async fn run(&mut self) {
637 if !self.persistent_enabled() {
638 pending().await
639 }
640
641 loop {
642 (&mut self.networking_parameters_save_delay).await;
643
644 if let Some(known_peers_slots) = &self.known_peers_slots
645 && self.cache_need_saving
646 {
647 let known_peers =
648 EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
649 let known_peers_slots = Arc::clone(known_peers_slots);
650 let write_known_peers_fut =
651 AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
652 known_peers_slots
653 .lock()
654 .write_to_inactive_slot(&known_peers);
655 }));
656
657 if let Err(error) = write_known_peers_fut.await {
658 error!(%error, "Failed to write known peers");
659 }
660
661 self.cache_need_saving = false;
662 }
663 self.networking_parameters_save_delay = KnownPeersManager::default_delay();
664 }
665 }
666
667 fn on_unreachable_address(
668 &mut self,
669 handler: HandlerFn<PeerAddressRemovedEvent>,
670 ) -> Option<HandlerId> {
671 let handler_id = self.address_removed.add(handler);
672
673 Some(handler_id)
674 }
675}
676
677pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
679 let last_protocol = address.pop();
680
681 if let Some(Protocol::P2p(_)) = &last_protocol {
682 return address;
683 }
684
685 if let Some(protocol) = last_protocol {
686 address.push(protocol)
687 }
688
689 address
690}
691
692pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
694 let last_protocol = address.pop();
695
696 if let Some(protocol) = last_protocol
697 && !matches!(protocol, Protocol::P2p(..))
698 {
699 address.push(protocol)
700 }
701 address.push(Protocol::P2p(peer_id));
702
703 address
704}
705
706pub(super) fn remove_known_peer_addresses_internal(
708 known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
709 peer_id: PeerId,
710 addresses: Vec<Multiaddr>,
711 expired_address_duration_persistent_storage: Duration,
712 expired_address_duration_kademlia: Duration,
713) -> Vec<PeerAddressRemovedEvent> {
714 let mut address_removed_events = Vec::new();
715 let now = SystemTime::now();
716
717 addresses
718 .into_iter()
719 .map(remove_p2p_suffix)
720 .for_each(|addr| {
721 if let Some(addresses) = known_peers.peek_mut(&peer_id) {
723 let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
724 if let Some(first_failed_time) = addresses.peek_mut(&addr) {
727 if let Some(time) = first_failed_time {
729 if *time + expired_address_duration_kademlia < now {
731 let address_removed = PeerAddressRemovedEvent {
732 peer_id,
733 address: addr.clone(),
734 };
735
736 address_removed_events.push(address_removed);
737
738 trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
739 }
740
741 if *time + expired_address_duration_persistent_storage < now {
743 addresses.remove(&addr);
745
746 if last_address {
748 known_peers.remove(&peer_id);
749
750 trace!(%peer_id, "Peer removed from the cache");
751 }
752
753 trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
754 } else {
755 trace!(
756 %peer_id, "Saving failed connection attempt to a peer: {:?}",
757 addr
758 );
759 }
760 } else {
761 first_failed_time.replace(now);
763
764 trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
765 }
766 }
767 }
768 });
769
770 address_removed_events
771}