ab_networking/behavior/
persistent_parameters.rs

1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use ab_core_primitives::hashes::Blake3Hash;
3use async_trait::async_trait;
4use event_listener_primitives::HandlerId;
5use fs2::FileExt;
6use futures::FutureExt;
7use futures::future::{Fuse, pending};
8use libp2p::multiaddr::Protocol;
9use libp2p::{Multiaddr, PeerId};
10use memmap2::{MmapMut, MmapOptions};
11use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
12use parking_lot::Mutex;
13use schnellru::{ByLength, LruMap};
14use std::collections::HashSet;
15use std::fs::OpenOptions;
16use std::io::{Read, Seek, SeekFrom};
17use std::path::Path;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::{Duration, SystemTime};
22use std::{io, mem};
23use thiserror::Error;
24use tokio::time::{Sleep, sleep};
25use tracing::{debug, error, trace, warn};
26
/// Time of the first recorded dial failure for an address, `None` if no failure is recorded
type FailureTime = Option<SystemTime>;

/// Default size of the LRU cache for peers.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Size of the LRU cache for addresses of a single peer ID.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause duration (in seconds) between network parameters saves.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default expiration period for a failing address before it is removed from the cache.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_secs(24 * 3600);
/// Default expiration period for a failing address before it is reported for removal from
/// Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_secs(3600);
/// Default amount of time after which known peers stored on disk are assumed to be stale.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
42
/// Event describing a peer address that was failing for longer than the configured Kademlia
/// removal interval (see [`KnownPeersRegistry::on_unreachable_address()`]).
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer ID
    pub peer_id: PeerId,
    /// Peer address (stored without the P2P suffix)
    pub address: Multiaddr,
}
51
/// SCALE-encodable representation of a single known peer address
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Multiaddress in its binary form
    multiaddr: Vec<u8>,
    /// Failure time as Unix timestamp in seconds
    failure_time: Option<u64>,
}
58
/// SCALE-encodable snapshot of the known peers cache that is written to disk
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeers {
    /// Capacity of the peers LRU cache that is created when this snapshot is decoded
    cache_size: u32,
    /// Unix timestamp (in seconds) of the moment this snapshot was created
    timestamp: u64,
    // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time
    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
}
66
67impl EncodableKnownPeers {
68    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71        // Sort peers with the oldest expiration date first
72        self.known_peers
73            .sort_by_cached_key(|(_peer_id, addresses)| {
74                addresses.iter().fold(0u64, |acc, address| {
75                    acc.max(address.failure_time.unwrap_or(u64::MAX))
76                })
77            });
78
79        // Iterate over known peers with most recent failure time (or no failire time) first
80        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81            let mut peer_cache =
82                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84            let peer_id = match PeerId::from_bytes(&peer_id) {
85                Ok(peer_id) => peer_id,
86                Err(error) => {
87                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88                    continue;
89                }
90            };
91            for address in addresses {
92                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93                    Ok(multiaddr) => multiaddr,
94                    Err(error) => {
95                        debug!(
96                            %error,
97                            "Failed to decode known peer multiaddress, skipping peer entry"
98                        );
99                        continue 'peers;
100                    }
101                };
102
103                peer_cache.insert(
104                    multiaddr,
105                    address.failure_time.map(|failure_time| {
106                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107                    }),
108                );
109            }
110
111            peers_cache.insert(peer_id, peer_cache);
112        }
113
114        peers_cache
115    }
116
117    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118        let single_peer_encoded_address_size =
119            KnownPeersManager::single_peer_encoded_address_size();
120        Self {
121            cache_size,
122            timestamp: SystemTime::now()
123                .duration_since(SystemTime::UNIX_EPOCH)
124                .expect("Never before Unix epoch; qed")
125                .as_secs(),
126            known_peers: cache
127                .iter()
128                .map(|(peer_id, addresses)| {
129                    (
130                        peer_id.to_bytes(),
131                        addresses
132                            .iter()
133                            .filter_map(|(multiaddr, failure_time)| {
134                                let multiaddr_bytes = multiaddr.to_vec();
135
136                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137                                {
138                                    // Skip unexpectedly large multiaddresses
139                                    debug!(
140                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141                                        limit = %single_peer_encoded_address_size,
142                                        ?multiaddr,
143                                        "Unexpectedly large multiaddress"
144                                    );
145                                    return None;
146                                }
147
148                                Some(EncodableKnownPeerAddress {
149                                    multiaddr: multiaddr_bytes,
150                                    failure_time: failure_time.map(|failure_time| {
151                                        failure_time
152                                            .duration_since(SystemTime::UNIX_EPOCH)
153                                            .expect("Never before Unix epoch; qed")
154                                            .as_secs()
155                                    }),
156                                })
157                            })
158                            .collect(),
159                    )
160                })
161                .collect(),
162        }
163    }
164}
165
/// A/b slots with known peers where we write serialized known peers in one after another
#[derive(Debug)]
struct KnownPeersSlots {
    /// Slot that the next write goes to
    a: MmapMut,
    /// Slot that was written most recently (slots are swapped after each write)
    b: MmapMut,
}
172
173impl KnownPeersSlots {
174    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
175        let known_peers_bytes = encodable_known_peers.encode();
176        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
177        encoded_bytes.copy_from_slice(&known_peers_bytes);
178        // Write checksum
179        remaining_bytes[..Blake3Hash::SIZE]
180            .copy_from_slice(blake3::hash(&known_peers_bytes).as_bytes());
181        if let Err(error) = self.a.flush() {
182            warn!(%error, "Failed to flush known peers to disk");
183        }
184
185        // Swap slots such that we write into the opposite each time
186        mem::swap(&mut self.a, &mut self.b);
187    }
188}
189
/// Defines operations with the networking parameters.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers a peer ID and associated addresses
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters the given addresses for peer ID.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters all addresses for peer ID.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns all known peers and their addresses without P2P suffix at the end
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Drive async work in the persistence provider
    async fn run(&mut self);

    /// Registers a handler that is called with [`PeerAddressRemovedEvent`] for unreachable
    /// addresses. Returns optional event `HandlerId`, `Option` enables stub implementation. One
    /// of the usages is to notify Kademlia about the expired (unreachable) address when it checks
    /// for how long the address was unreachable.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
217
/// No-op implementation of [`KnownPeersRegistry`] that stores nothing and returns no peers.
#[derive(Clone, Default)]
pub(crate) struct StubNetworkingParametersManager;
221
222impl StubNetworkingParametersManager {
223    /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
224    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
225        Box::new(self)
226    }
227}
228
229#[async_trait]
230impl KnownPeersRegistry for StubNetworkingParametersManager {
231    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
232
233    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
234
235    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
236
237    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
238        Vec::new()
239    }
240
241    async fn run(&mut self) {
242        // Never resolves
243        futures::future::pending().await
244    }
245
246    fn on_unreachable_address(
247        &mut self,
248        _handler: HandlerFn<PeerAddressRemovedEvent>,
249    ) -> Option<HandlerId> {
250        None
251    }
252}
253
/// Configuration for [`KnownPeersManager`].
#[derive(Debug, Clone)]
pub struct KnownPeersManagerConfig {
    /// Defines whether we return known peers in [`KnownPeersRegistry::all_known_peers()`]
    pub enable_known_peers_source: bool,
    /// Defines cache size (max number of known peers).
    pub cache_size: u32,
    /// Peer ID list to filter on address adding.
    pub ignore_peer_list: HashSet<PeerId>,
    /// Path to the file used for cache persistence, persistence is disabled when `None`.
    pub path: Option<Box<Path>>,
    /// Defines for how long an address has to be failing before it is removed from the cache.
    pub failed_address_cache_removal_interval: Duration,
    /// Defines for how long an address has to be failing before [`PeerAddressRemovedEvent`] is
    /// triggered for it.
    pub failed_address_kademlia_removal_interval: Duration,
    /// Amount of time after which stored known peers contents is assumed to be stale.
    pub stale_known_peers_timeout: Duration,
}
272
273impl Default for KnownPeersManagerConfig {
274    fn default() -> Self {
275        Self {
276            enable_known_peers_source: true,
277            cache_size: KNOWN_PEERS_CACHE_SIZE,
278            ignore_peer_list: Default::default(),
279            path: None,
280            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
281            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
282            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
283        }
284    }
285}
286
/// Networking parameters persistence errors.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// I/O error (opening, reading, resizing, mapping or flushing the known peers file).
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Can't preallocate known peers file, probably not enough space on disk
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
297
/// Handles networking parameters. It manages network parameters set and its persistence.
#[derive(Debug)]
pub struct KnownPeersManager {
    /// Defines whether the cache was modified and requires saving to the backing file
    cache_need_saving: bool,
    /// LRU cache for the known peers and their addresses
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Period between networking parameters saves.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Slots backed by file that store known peers, `None` when persistence is disabled
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Event handler triggered when we decide to remove address from the storage.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Defines configuration.
    config: KnownPeersManagerConfig,
}
314
315impl Drop for KnownPeersManager {
316    fn drop(&mut self) {
317        if self.cache_need_saving
318            && let Some(known_peers_slots) = &self.known_peers_slots
319        {
320            known_peers_slots
321                .lock()
322                .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
323                    &self.known_peers,
324                    self.config.cache_size,
325                ));
326        }
327    }
328}
329
330impl KnownPeersManager {
331    fn init_file(
332        path: &Path,
333        cache_size: u32,
334    ) -> Result<
335        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
336        KnownPeersManagerPersistenceError,
337    > {
338        let mut file = OpenOptions::new()
339            .read(true)
340            .write(true)
341            .create(true)
342            .truncate(false)
343            .open(path)?;
344
345        let known_addresses_size = Self::known_addresses_size(cache_size);
346        let file_size = Self::file_size(cache_size);
347        // Try reading existing encoded known peers from file
348        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
349
350        {
351            let mut file_contents = Vec::with_capacity(file_size);
352            file.read_to_end(&mut file_contents)?;
353            if !file_contents.is_empty() {
354                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
355                    let known_addresses =
356                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
357                            Ok(known_addresses) => known_addresses,
358                            Err(error) => {
359                                debug!(%error, "Failed to decode encodable known peers");
360                                continue;
361                            }
362                        };
363
364                    let (encoded_bytes, remaining_bytes) =
365                        known_addresses_bytes.split_at(known_addresses.encoded_size());
366                    if remaining_bytes.len() < Blake3Hash::SIZE {
367                        debug!(
368                            remaining_bytes = %remaining_bytes.len(),
369                            "Not enough bytes to decode checksum, file was likely corrupted"
370                        );
371                        continue;
372                    }
373
374                    // Verify checksum
375                    let actual_checksum = *blake3::hash(encoded_bytes).as_bytes();
376                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
377                    if actual_checksum != expected_checksum {
378                        debug!(
379                            encoded_bytes_len = %encoded_bytes.len(),
380                            actual_checksum = %hex::encode(actual_checksum),
381                            expected_checksum = %hex::encode(expected_checksum),
382                            "Hash doesn't match, possible disk corruption or file was just \
383                            created, ignoring"
384                        );
385                        continue;
386                    }
387
388                    match &mut maybe_newest_known_addresses {
389                        Some(newest_known_addresses) => {
390                            if newest_known_addresses.timestamp < known_addresses.timestamp {
391                                *newest_known_addresses = known_addresses;
392                            }
393                        }
394                        None => {
395                            maybe_newest_known_addresses.replace(known_addresses);
396                        }
397                    }
398                }
399            }
400        }
401
402        // *2 because we have a/b parts of the file
403        let file_resized = if file.seek(SeekFrom::End(0))? != file_size as u64 {
404            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
405            // writes to fail later)
406            file.allocate(file_size as u64)
407                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
408            // Truncating file (if necessary)
409            file.set_len(file_size as u64)?;
410            true
411        } else {
412            false
413        };
414
415        // SAFETY: Mapping disjoint sections of the file only for non-conflicting writes
416        let mut a_mmap = unsafe {
417            MmapOptions::new()
418                .len(known_addresses_size)
419                .map_mut(&file)?
420        };
421        // SAFETY: Mapping disjoint sections of the file only for non-conflicting writes
422        let mut b_mmap = unsafe {
423            MmapOptions::new()
424                .offset(known_addresses_size as u64)
425                .len(known_addresses_size)
426                .map_mut(&file)?
427        };
428
429        if file_resized {
430            // File might have been resized, write current known addresses into it
431            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
432                let bytes = newest_known_addresses.encode();
433                a_mmap[..bytes.len()].copy_from_slice(&bytes);
434                a_mmap.flush()?;
435                b_mmap[..bytes.len()].copy_from_slice(&bytes);
436                b_mmap.flush()?;
437            }
438        }
439
440        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
441            a: a_mmap,
442            b: b_mmap,
443        }));
444
445        Ok((maybe_newest_known_addresses, known_peers_slots))
446    }
447
448    /// Object constructor.
449    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
450        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
451            Self::init_file(path, config.cache_size)
452                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
453        } else {
454            (None, None)
455        };
456
457        let known_peers = maybe_newest_known_addresses
458            .filter(|newest_known_addresses| {
459                let time_since_unix_epoch = SystemTime::now()
460                    .duration_since(SystemTime::UNIX_EPOCH)
461                    .expect("Never before Unix epoch; qed");
462                let known_peers_age = time_since_unix_epoch
463                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
464
465                known_peers_age <= config.stale_known_peers_timeout
466            })
467            .map(EncodableKnownPeers::into_cache)
468            .unwrap_or_else(|| LruMap::new(ByLength::new(config.cache_size)));
469
470        Ok(Self {
471            cache_need_saving: false,
472            known_peers,
473            networking_parameters_save_delay: Self::default_delay(),
474            known_peers_slots,
475            address_removed: Default::default(),
476            config,
477        })
478    }
479
480    /// Size of the backing file on disk
481    pub fn file_size(cache_size: u32) -> usize {
482        // *2 because we have a/b parts of the file
483        Self::known_addresses_size(cache_size) * 2
484    }
485
486    /// Creates a reference to the `NetworkingParametersRegistry` trait implementation.
487    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
488        Box::new(self)
489    }
490
491    // Create default delay for networking parameters.
492    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
493        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
494    }
495
496    fn single_peer_encoded_address_size() -> usize {
497        let multiaddr = Multiaddr::from_str(
498            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
499        )
500        .expect("Valid multiaddr; qed");
501        // Use multiaddr size that is 3x larger than typical, should be enough for most practical
502        // cases
503        multiaddr.to_vec().encoded_size() * 3
504    }
505
506    /// Size of single peer known addresses, this is an estimate and in some pathological cases peer
507    /// will have to be rejected if encoding exceeds this length.
508    fn single_peer_encoded_size() -> usize {
509        // Peer ID encoding + compact encoding of the length of list of addresses + (length of a
510        // single peer address entry + optional failure time) * number of entries
511        PeerId::random().to_bytes().encoded_size()
512            + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
513            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
514                * ADDRESSES_CACHE_SIZE as usize
515    }
516
517    /// Size of known addresses and accompanying metadata.
518    ///
519    /// NOTE: This is max size that needs to be allocated on disk for successful write of a single
520    /// `known_addresses` copy, the actual written data can occupy only a part of this size
521    fn known_addresses_size(cache_size: u32) -> usize {
522        // Timestamp (when was written) + compact encoding of the length of peer records + peer
523        // records + checksum
524        mem::size_of::<u64>()
525            + Compact::compact_len(&(cache_size))
526            + Self::single_peer_encoded_size() * cache_size as usize
527            + Blake3Hash::SIZE
528    }
529
530    fn persistent_enabled(&self) -> bool {
531        self.config.path.is_some()
532    }
533
534    #[cfg(test)]
535    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
536        self.known_peers
537            .peek(peer_id)
538            .map(|addresses| addresses.peek(address).is_some())
539            .unwrap_or_default()
540    }
541}
542
543#[async_trait]
544impl KnownPeersRegistry for KnownPeersManager {
545    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
546        if self.config.ignore_peer_list.contains(&peer_id) {
547            debug!(
548                %peer_id,
549                addr_num=addresses.len(),
550                "Adding new peer addresses canceled (ignore list): {:?}",
551                addresses
552            );
553
554            return;
555        }
556
557        debug!(
558            %peer_id,
559            addr_num=addresses.len(),
560            "Add new peer addresses to the networking parameters registry: {:?}",
561            addresses
562        );
563
564        addresses
565            .iter()
566            .filter(|addr| {
567                // filter Memory addresses
568                !addr
569                    .into_iter()
570                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
571            })
572            .cloned()
573            .map(remove_p2p_suffix)
574            .for_each(|addr| {
575                // Add new address cache if it doesn't exist previously.
576                self.known_peers
577                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
578
579                if let Some(addresses) = self.known_peers.get(&peer_id) {
580                    let previous_entry = addresses.peek(&addr).cloned().flatten();
581                    addresses.insert(addr, None);
582                    if let Some(previous_entry) = previous_entry {
583                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
584                    }
585                }
586            });
587
588        self.cache_need_saving = true;
589    }
590
591    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
592        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
593
594        let removed_addresses = remove_known_peer_addresses_internal(
595            &mut self.known_peers,
596            peer_id,
597            addresses,
598            self.config.failed_address_cache_removal_interval,
599            self.config.failed_address_kademlia_removal_interval,
600        );
601
602        for event in removed_addresses {
603            self.address_removed.call_simple(&event);
604        }
605
606        self.cache_need_saving = true;
607    }
608
609    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
610        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
611
612        self.known_peers.remove(&peer_id);
613
614        self.cache_need_saving = true;
615    }
616
617    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
618        if !self.config.enable_known_peers_source {
619            return Vec::new();
620        }
621
622        self.known_peers
623            .iter()
624            .map(|(&peer_id, addresses)| {
625                (
626                    peer_id,
627                    addresses
628                        .iter()
629                        .map(|(addr, _failure_time)| addr.clone())
630                        .collect(),
631                )
632            })
633            .collect()
634    }
635
636    async fn run(&mut self) {
637        if !self.persistent_enabled() {
638            pending().await
639        }
640
641        loop {
642            (&mut self.networking_parameters_save_delay).await;
643
644            if let Some(known_peers_slots) = &self.known_peers_slots
645                && self.cache_need_saving
646            {
647                let known_peers =
648                    EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
649                let known_peers_slots = Arc::clone(known_peers_slots);
650                let write_known_peers_fut =
651                    AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
652                        known_peers_slots
653                            .lock()
654                            .write_to_inactive_slot(&known_peers);
655                    }));
656
657                if let Err(error) = write_known_peers_fut.await {
658                    error!(%error, "Failed to write known peers");
659                }
660
661                self.cache_need_saving = false;
662            }
663            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
664        }
665    }
666
667    fn on_unreachable_address(
668        &mut self,
669        handler: HandlerFn<PeerAddressRemovedEvent>,
670    ) -> Option<HandlerId> {
671        let handler_id = self.address_removed.add(handler);
672
673        Some(handler_id)
674    }
675}
676
677/// Removes a P2p protocol suffix from the multiaddress if any.
678pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
679    let last_protocol = address.pop();
680
681    if let Some(Protocol::P2p(_)) = &last_protocol {
682        return address;
683    }
684
685    if let Some(protocol) = last_protocol {
686        address.push(protocol)
687    }
688
689    address
690}
691
692/// Appends a P2p protocol suffix to the multiaddress if required.
693pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
694    let last_protocol = address.pop();
695
696    if let Some(protocol) = last_protocol
697        && !matches!(protocol, Protocol::P2p(..))
698    {
699        address.push(protocol)
700    }
701    address.push(Protocol::P2p(peer_id));
702
703    address
704}
705
706// Testable implementation of the `remove_known_peer_addresses`
707pub(super) fn remove_known_peer_addresses_internal(
708    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
709    peer_id: PeerId,
710    addresses: Vec<Multiaddr>,
711    expired_address_duration_persistent_storage: Duration,
712    expired_address_duration_kademlia: Duration,
713) -> Vec<PeerAddressRemovedEvent> {
714    let mut address_removed_events = Vec::new();
715    let now = SystemTime::now();
716
717    addresses
718        .into_iter()
719        .map(remove_p2p_suffix)
720        .for_each(|addr| {
721            // if peer_id is present in the cache
722            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
723                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
724                // Get mutable reference to first_failed_time for the address without updating
725                // the item's position in the cache
726                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
727                    // if we failed previously with this address
728                    if let Some(time) = first_failed_time {
729                        // if we failed first time more than an hour ago (for Kademlia)
730                        if *time + expired_address_duration_kademlia < now {
731                            let address_removed = PeerAddressRemovedEvent {
732                                peer_id,
733                                address: addr.clone(),
734                            };
735
736                            address_removed_events.push(address_removed);
737
738                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
739                        }
740
741                        // if we failed first time more than a day ago (for persistent cache)
742                        if *time + expired_address_duration_persistent_storage < now {
743                            // Remove a failed address
744                            addresses.remove(&addr);
745
746                            // If the last address for peer
747                            if last_address {
748                                known_peers.remove(&peer_id);
749
750                                trace!(%peer_id, "Peer removed from the cache");
751                            }
752
753                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
754                        } else {
755                            trace!(
756                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
757                                addr
758                            );
759                        }
760                    } else {
761                        // Set failure time
762                        first_failed_time.replace(now);
763
764                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
765                    }
766                }
767            }
768        });
769
770    address_removed_events
771}