Skip to main content

ab_networking/behavior/
persistent_parameters.rs

1use crate::utils::{AsyncJoinOnDrop, Handler, HandlerFn};
2use ab_core_primitives::hashes::Blake3Hash;
3use async_trait::async_trait;
4use event_listener_primitives::HandlerId;
5use fs4::FileExt;
6use futures::FutureExt;
7use futures::future::{Fuse, pending};
8use libp2p::multiaddr::Protocol;
9use libp2p::{Multiaddr, PeerId};
10use memmap2::{MmapMut, MmapOptions};
11use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
12use parking_lot::Mutex;
13use schnellru::{ByLength, LruMap};
14use std::collections::HashSet;
15use std::fs::OpenOptions;
16use std::io::{Read, Seek, SeekFrom};
17use std::path::Path;
18use std::pin::Pin;
19use std::str::FromStr;
20use std::sync::Arc;
21use std::time::{Duration, SystemTime};
22use std::{io, mem};
23use thiserror::Error;
24use tokio::time::{Sleep, sleep};
25use tracing::{debug, error, trace, warn};
26
/// Time at which dialing an address was first recorded as failed, `None` when no failure is
/// recorded for the address.
type FailureTime = Option<SystemTime>;

/// Default size of the LRU cache for peers.
const KNOWN_PEERS_CACHE_SIZE: u32 = 100;
/// Size of the LRU cache for addresses of a single peer ID.
const ADDRESSES_CACHE_SIZE: u32 = 30;
/// Pause duration (in seconds) between network parameters saves.
const DATA_FLUSH_DURATION_SECS: u64 = 5;
/// Default period after the first recorded failure before a failing address is removed from the
/// cache.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD: Duration = Duration::from_days(1);
/// Default period after the first recorded failure before a failing address is reported for
/// removal from Kademlia DHT.
const REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA: Duration = Duration::from_hours(1);
/// Default amount of time after which stored known peers contents is assumed to be stale.
const STALE_KNOWN_PEERS_TIMEOUT: Duration = Duration::from_days(1);
42
/// Event describing a peer address that kept failing for longer than the configured Kademlia
/// removal interval.
///
/// Handlers are registered with [`KnownPeersRegistry::on_unreachable_address()`].
#[derive(Debug, Clone)]
pub struct PeerAddressRemovedEvent {
    /// Peer ID
    pub peer_id: PeerId,
    /// Peer address (stored without the trailing P2P protocol component)
    pub address: Multiaddr,
}
51
/// SCALE-encodable form of a single known peer address.
#[derive(Debug, Encode, Decode)]
struct EncodableKnownPeerAddress {
    /// Multiaddress as raw bytes
    multiaddr: Vec<u8>,
    /// Failure time as Unix timestamp in seconds
    failure_time: Option<u64>,
}
58
59#[derive(Debug, Encode, Decode)]
60struct EncodableKnownPeers {
61    cache_size: u32,
62    timestamp: u64,
63    // Each entry is a tuple of peer ID + list of multiaddresses with corresponding failure time
64    known_peers: Vec<(Vec<u8>, Vec<EncodableKnownPeerAddress>)>,
65}
66
67impl EncodableKnownPeers {
68    fn into_cache(mut self) -> LruMap<PeerId, LruMap<Multiaddr, FailureTime>> {
69        let mut peers_cache = LruMap::new(ByLength::new(self.cache_size));
70
71        // Sort peers with the oldest expiration date first
72        self.known_peers
73            .sort_by_cached_key(|(_peer_id, addresses)| {
74                addresses.iter().fold(0u64, |acc, address| {
75                    acc.max(address.failure_time.unwrap_or(u64::MAX))
76                })
77            });
78
79        // Iterate over known peers with most recent failure time (or no failire time) first
80        'peers: for (peer_id, addresses) in self.known_peers.into_iter().rev() {
81            let mut peer_cache =
82                LruMap::<Multiaddr, FailureTime>::new(ByLength::new(ADDRESSES_CACHE_SIZE));
83
84            let peer_id = match PeerId::from_bytes(&peer_id) {
85                Ok(peer_id) => peer_id,
86                Err(error) => {
87                    debug!(%error, "Failed to decode known peer ID, skipping peer entry");
88                    continue;
89                }
90            };
91            for address in addresses {
92                let multiaddr = match Multiaddr::try_from(address.multiaddr) {
93                    Ok(multiaddr) => multiaddr,
94                    Err(error) => {
95                        debug!(
96                            %error,
97                            "Failed to decode known peer multiaddress, skipping peer entry"
98                        );
99                        continue 'peers;
100                    }
101                };
102
103                peer_cache.insert(
104                    multiaddr,
105                    address.failure_time.map(|failure_time| {
106                        SystemTime::UNIX_EPOCH + Duration::from_secs(failure_time)
107                    }),
108                );
109            }
110
111            peers_cache.insert(peer_id, peer_cache);
112        }
113
114        peers_cache
115    }
116
117    fn from_cache(cache: &LruMap<PeerId, LruMap<Multiaddr, FailureTime>>, cache_size: u32) -> Self {
118        let single_peer_encoded_address_size =
119            KnownPeersManager::single_peer_encoded_address_size();
120        Self {
121            cache_size,
122            timestamp: SystemTime::now()
123                .duration_since(SystemTime::UNIX_EPOCH)
124                .expect("Never before Unix epoch; qed")
125                .as_secs(),
126            known_peers: cache
127                .iter()
128                .map(|(peer_id, addresses)| {
129                    (
130                        peer_id.to_bytes(),
131                        addresses
132                            .iter()
133                            .filter_map(|(multiaddr, failure_time)| {
134                                let multiaddr_bytes = multiaddr.to_vec();
135
136                                if multiaddr_bytes.encoded_size() > single_peer_encoded_address_size
137                                {
138                                    // Skip unexpectedly large multiaddresses
139                                    debug!(
140                                        encoded_multiaddress_size = %multiaddr_bytes.encoded_size(),
141                                        limit = %single_peer_encoded_address_size,
142                                        ?multiaddr,
143                                        "Unexpectedly large multiaddress"
144                                    );
145                                    return None;
146                                }
147
148                                Some(EncodableKnownPeerAddress {
149                                    multiaddr: multiaddr_bytes,
150                                    failure_time: failure_time.map(|failure_time| {
151                                        failure_time
152                                            .duration_since(SystemTime::UNIX_EPOCH)
153                                            .expect("Never before Unix epoch; qed")
154                                            .as_secs()
155                                    }),
156                                })
157                            })
158                            .collect(),
159                    )
160                })
161                .collect(),
162        }
163    }
164}
165
166/// A/b slots with known peers where we write serialized known peers in one after another
167#[derive(Debug)]
168struct KnownPeersSlots {
169    a: MmapMut,
170    b: MmapMut,
171}
172
173impl KnownPeersSlots {
174    fn write_to_inactive_slot(&mut self, encodable_known_peers: &EncodableKnownPeers) {
175        let known_peers_bytes = encodable_known_peers.encode();
176        let (encoded_bytes, remaining_bytes) = self.a.split_at_mut(known_peers_bytes.len());
177        encoded_bytes.copy_from_slice(&known_peers_bytes);
178        // Write checksum
179        remaining_bytes[..Blake3Hash::SIZE]
180            .copy_from_slice(blake3::hash(&known_peers_bytes).as_bytes());
181        if let Err(error) = self.a.flush() {
182            warn!(%error, "Failed to flush known peers to disk");
183        }
184
185        // Swap slots such that we write into the opposite each time
186        mem::swap(&mut self.a, &mut self.b);
187    }
188}
189
/// Defines operations with the networking parameters.
///
/// Implemented in this file by [`KnownPeersManager`] and by the no-op
/// `StubNetworkingParametersManager`.
#[async_trait]
pub trait KnownPeersRegistry: Send + Sync {
    /// Registers a peer ID and associated addresses
    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters the given addresses for peer ID.
    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

    /// Unregisters all addresses for peer ID.
    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

    /// Returns all known peers and their addresses without P2P suffix at the end
    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)>;

    /// Drive async work in the persistence provider
    async fn run(&mut self);

    /// Registers a handler for [`PeerAddressRemovedEvent`]. Returns optional event `HandlerId`,
    /// `Option` enables stub implementation. One of the usages is to notify Kademlia about the
    /// expired (unreachable) address when it checks for how long the address was unreachable.
    fn on_unreachable_address(
        &mut self,
        handler: HandlerFn<PeerAddressRemovedEvent>,
    ) -> Option<HandlerId>;
}
217
218/// Networking manager implementation with NOOP implementation.
219#[derive(Clone, Default)]
220pub(crate) struct StubNetworkingParametersManager;
221
222impl StubNetworkingParametersManager {
223    /// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
224    pub(crate) fn boxed(self) -> Box<dyn KnownPeersRegistry> {
225        Box::new(self)
226    }
227}
228
229#[async_trait]
230impl KnownPeersRegistry for StubNetworkingParametersManager {
231    async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}
232
233    async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
234
235    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
236
237    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
238        Vec::new()
239    }
240
241    async fn run(&mut self) {
242        // Never resolves
243        futures::future::pending::<()>().await;
244    }
245
246    fn on_unreachable_address(
247        &mut self,
248        _handler: HandlerFn<PeerAddressRemovedEvent>,
249    ) -> Option<HandlerId> {
250        None
251    }
252}
253
254/// Configuration for [`KnownPeersManager`].
255#[derive(Debug, Clone)]
256pub struct KnownPeersManagerConfig {
257    /// Defines whether we return known peers in [`KnownPeersRegistry::all_known_peers()`]
258    pub enable_known_peers_source: bool,
259    /// Defines cache size.
260    pub cache_size: u32,
261    /// Peer ID list to filter on address adding.
262    pub ignore_peer_list: HashSet<PeerId>,
263    /// Defines whether we enable cache persistence.
264    pub path: Option<Box<Path>>,
265    /// Defines interval before the next peer address removes entry from the cache.
266    pub failed_address_cache_removal_interval: Duration,
267    /// Defines interval before the next peer address removal triggers [`PeerAddressRemovedEvent`].
268    pub failed_address_kademlia_removal_interval: Duration,
269    /// Amount of time after which stored known peers contents is assumed to be stale.
270    pub stale_known_peers_timeout: Duration,
271}
272
273impl Default for KnownPeersManagerConfig {
274    fn default() -> Self {
275        Self {
276            enable_known_peers_source: true,
277            cache_size: KNOWN_PEERS_CACHE_SIZE,
278            ignore_peer_list: Default::default(),
279            path: None,
280            failed_address_cache_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD,
281            failed_address_kademlia_removal_interval: REMOVE_KNOWN_PEERS_GRACE_PERIOD_FOR_KADEMLIA,
282            stale_known_peers_timeout: STALE_KNOWN_PEERS_TIMEOUT,
283        }
284    }
285}
286
/// Networking parameters persistence errors.
///
/// Returned by [`KnownPeersManager::new()`] when the backing file can't be prepared.
#[derive(Debug, Error)]
pub enum KnownPeersManagerPersistenceError {
    /// I/O error.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Can't preallocate known peers file, probably not enough space on disk
    #[error("Can't preallocate known peers file, probably not enough space on disk: {0}")]
    CantPreallocateKnownPeersFile(io::Error),
}
297
/// Handles networking parameters. It manages network parameters set and its persistence.
///
/// Known peers are kept in an in-memory LRU cache and, when a file path is configured, are
/// periodically written to that file by [`KnownPeersRegistry::run()`] and once more on drop.
#[derive(Debug)]
pub struct KnownPeersManager {
    /// Defines whether the cache was modified and needs to be written to the backing file
    cache_need_saving: bool,
    /// LRU cache for the known peers and their addresses
    known_peers: LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
    /// Period between networking parameters saves.
    networking_parameters_save_delay: Pin<Box<Fuse<Sleep>>>,
    /// Slots backed by file that store known peers, `None` when persistence is disabled
    known_peers_slots: Option<Arc<Mutex<KnownPeersSlots>>>,
    /// Event handler triggered when we decide to remove address from the storage.
    address_removed: Handler<PeerAddressRemovedEvent>,
    /// Defines configuration.
    config: KnownPeersManagerConfig,
}
314
315impl Drop for KnownPeersManager {
316    fn drop(&mut self) {
317        if self.cache_need_saving
318            && let Some(known_peers_slots) = &self.known_peers_slots
319        {
320            known_peers_slots
321                .lock()
322                .write_to_inactive_slot(&EncodableKnownPeers::from_cache(
323                    &self.known_peers,
324                    self.config.cache_size,
325                ));
326        }
327    }
328}
329
330impl KnownPeersManager {
331    fn init_file(
332        path: &Path,
333        cache_size: u32,
334    ) -> Result<
335        (Option<EncodableKnownPeers>, Arc<Mutex<KnownPeersSlots>>),
336        KnownPeersManagerPersistenceError,
337    > {
338        let mut file = OpenOptions::new()
339            .read(true)
340            .write(true)
341            .create(true)
342            .truncate(false)
343            .open(path)?;
344
345        let known_addresses_size = Self::known_addresses_size(cache_size);
346        let file_size = Self::file_size(cache_size);
347        // Try reading existing encoded known peers from file
348        let mut maybe_newest_known_addresses = None::<EncodableKnownPeers>;
349
350        {
351            let mut file_contents = Vec::with_capacity(file_size);
352            file.read_to_end(&mut file_contents)?;
353            if !file_contents.is_empty() {
354                for known_addresses_bytes in file_contents.chunks_exact(file_contents.len() / 2) {
355                    let known_addresses =
356                        match EncodableKnownPeers::decode(&mut &*known_addresses_bytes) {
357                            Ok(known_addresses) => known_addresses,
358                            Err(error) => {
359                                debug!(%error, "Failed to decode encodable known peers");
360                                continue;
361                            }
362                        };
363
364                    let (encoded_bytes, remaining_bytes) =
365                        known_addresses_bytes.split_at(known_addresses.encoded_size());
366                    if remaining_bytes.len() < Blake3Hash::SIZE {
367                        debug!(
368                            remaining_bytes = %remaining_bytes.len(),
369                            "Not enough bytes to decode checksum, file was likely corrupted"
370                        );
371                        continue;
372                    }
373
374                    // Verify checksum
375                    let actual_checksum = *blake3::hash(encoded_bytes).as_bytes();
376                    let expected_checksum = &remaining_bytes[..Blake3Hash::SIZE];
377                    if actual_checksum != expected_checksum {
378                        debug!(
379                            encoded_bytes_len = %encoded_bytes.len(),
380                            actual_checksum = %hex::encode(actual_checksum),
381                            expected_checksum = %hex::encode(expected_checksum),
382                            "Hash doesn't match, possible disk corruption or file was just \
383                            created, ignoring"
384                        );
385                        continue;
386                    }
387
388                    match &mut maybe_newest_known_addresses {
389                        Some(newest_known_addresses) => {
390                            if newest_known_addresses.timestamp < known_addresses.timestamp {
391                                *newest_known_addresses = known_addresses;
392                            }
393                        }
394                        None => {
395                            maybe_newest_known_addresses.replace(known_addresses);
396                        }
397                    }
398                }
399            }
400        }
401
402        // *2 because we have a/b parts of the file
403        let file_resized = if file.seek(SeekFrom::End(0))? == file_size as u64 {
404            false
405        } else {
406            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
407            // writes to fail later)
408            file.allocate(file_size as u64)
409                .map_err(KnownPeersManagerPersistenceError::CantPreallocateKnownPeersFile)?;
410            // Truncating file (if necessary)
411            file.set_len(file_size as u64)?;
412            true
413        };
414
415        // SAFETY: Mapping disjoint sections of the file only for non-conflicting writes
416        let mut a_mmap = unsafe {
417            MmapOptions::new()
418                .len(known_addresses_size)
419                .map_mut(&file)?
420        };
421        // SAFETY: Mapping disjoint sections of the file only for non-conflicting writes
422        let mut b_mmap = unsafe {
423            MmapOptions::new()
424                .offset(known_addresses_size as u64)
425                .len(known_addresses_size)
426                .map_mut(&file)?
427        };
428
429        if file_resized {
430            // File might have been resized, write current known addresses into it
431            if let Some(newest_known_addresses) = &maybe_newest_known_addresses {
432                let bytes = newest_known_addresses.encode();
433                a_mmap[..bytes.len()].copy_from_slice(&bytes);
434                a_mmap.flush()?;
435                b_mmap[..bytes.len()].copy_from_slice(&bytes);
436                b_mmap.flush()?;
437            }
438        }
439
440        let known_peers_slots = Arc::new(Mutex::new(KnownPeersSlots {
441            a: a_mmap,
442            b: b_mmap,
443        }));
444
445        Ok((maybe_newest_known_addresses, known_peers_slots))
446    }
447
448    /// Object constructor.
449    pub fn new(config: KnownPeersManagerConfig) -> Result<Self, KnownPeersManagerPersistenceError> {
450        let (maybe_newest_known_addresses, known_peers_slots) = if let Some(path) = &config.path {
451            Self::init_file(path, config.cache_size)
452                .map(|(known_addresses, slots)| (known_addresses, Some(slots)))?
453        } else {
454            (None, None)
455        };
456
457        let known_peers = maybe_newest_known_addresses
458            .filter(|newest_known_addresses| {
459                let time_since_unix_epoch = SystemTime::now()
460                    .duration_since(SystemTime::UNIX_EPOCH)
461                    .expect("Never before Unix epoch; qed");
462                let known_peers_age = time_since_unix_epoch
463                    .saturating_sub(Duration::from_secs(newest_known_addresses.timestamp));
464
465                known_peers_age <= config.stale_known_peers_timeout
466            })
467            .map_or_else(
468                || LruMap::new(ByLength::new(config.cache_size)),
469                EncodableKnownPeers::into_cache,
470            );
471
472        Ok(Self {
473            cache_need_saving: false,
474            known_peers,
475            networking_parameters_save_delay: Self::default_delay(),
476            known_peers_slots,
477            address_removed: Default::default(),
478            config,
479        })
480    }
481
482    /// Size of the backing file on disk
483    pub fn file_size(cache_size: u32) -> usize {
484        // *2 because we have a/b parts of the file
485        Self::known_addresses_size(cache_size) * 2
486    }
487
    /// Wraps the manager into a boxed [`KnownPeersRegistry`] trait object.
    pub fn boxed(self) -> Box<dyn KnownPeersRegistry> {
        Box::new(self)
    }
492
493    // Create default delay for networking parameters.
494    fn default_delay() -> Pin<Box<Fuse<Sleep>>> {
495        Box::pin(sleep(Duration::from_secs(DATA_FLUSH_DURATION_SECS)).fuse())
496    }
497
498    fn single_peer_encoded_address_size() -> usize {
499        let multiaddr = Multiaddr::from_str(
500            "/ip4/127.0.0.1/tcp/1234/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
501        )
502        .expect("Valid multiaddr; qed");
503        // Use multiaddr size that is 3x larger than typical, should be enough for most practical
504        // cases
505        multiaddr.to_vec().encoded_size() * 3
506    }
507
508    /// Size of single peer known addresses, this is an estimate and in some pathological cases peer
509    /// will have to be rejected if encoding exceeds this length.
510    fn single_peer_encoded_size() -> usize {
511        // Peer ID encoding + compact encoding of the length of list of addresses + (length of a
512        // single peer address entry + optional failure time) * number of entries
513        PeerId::random().to_bytes().encoded_size()
514            + Compact::compact_len(&(ADDRESSES_CACHE_SIZE))
515            + (Self::single_peer_encoded_address_size() + Some(0u64).encoded_size())
516                * ADDRESSES_CACHE_SIZE as usize
517    }
518
519    /// Size of known addresses and accompanying metadata.
520    ///
521    /// NOTE: This is max size that needs to be allocated on disk for successful write of a single
522    /// `known_addresses` copy, the actual written data can occupy only a part of this size
523    fn known_addresses_size(cache_size: u32) -> usize {
524        // Timestamp (when was written) + compact encoding of the length of peer records + peer
525        // records + checksum
526        size_of::<u64>()
527            + Compact::compact_len(&(cache_size))
528            + Self::single_peer_encoded_size() * cache_size as usize
529            + Blake3Hash::SIZE
530    }
531
    /// Returns `true` when a backing file path is configured.
    fn persistent_enabled(&self) -> bool {
        self.config.path.is_some()
    }
535
536    #[cfg(all(test, not(miri)))]
537    pub(crate) fn contains_address(&self, peer_id: &PeerId, address: &Multiaddr) -> bool {
538        self.known_peers
539            .peek(peer_id)
540            .is_some_and(|addresses| addresses.peek(address).is_some())
541    }
542}
543
544#[async_trait]
545impl KnownPeersRegistry for KnownPeersManager {
546    async fn add_known_peer(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
547        if self.config.ignore_peer_list.contains(&peer_id) {
548            debug!(
549                %peer_id,
550                addr_num=addresses.len(),
551                "Adding new peer addresses canceled (ignore list): {:?}",
552                addresses
553            );
554
555            return;
556        }
557
558        debug!(
559            %peer_id,
560            addr_num=addresses.len(),
561            "Add new peer addresses to the networking parameters registry: {:?}",
562            addresses
563        );
564
565        addresses
566            .iter()
567            .filter(|addr| {
568                // filter Memory addresses
569                !addr
570                    .into_iter()
571                    .any(|protocol| matches!(protocol, Protocol::Memory(..)))
572            })
573            .cloned()
574            .map(remove_p2p_suffix)
575            .for_each(|addr| {
576                // Add new address cache if it doesn't exist previously.
577                self.known_peers
578                    .get_or_insert(peer_id, || LruMap::new(ByLength::new(ADDRESSES_CACHE_SIZE)));
579
580                if let Some(addresses) = self.known_peers.get(&peer_id) {
581                    let previous_entry = addresses.peek(&addr).copied().flatten();
582                    addresses.insert(addr, None);
583                    if let Some(previous_entry) = previous_entry {
584                        trace!(%peer_id, "Address cache entry replaced: {:?}", previous_entry);
585                    }
586                }
587            });
588
589        self.cache_need_saving = true;
590    }
591
592    async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>) {
593        trace!(%peer_id, "Remove peer addresses from the networking parameters registry: {:?}", addresses);
594
595        let removed_addresses = remove_known_peer_addresses_internal(
596            &mut self.known_peers,
597            peer_id,
598            addresses,
599            self.config.failed_address_cache_removal_interval,
600            self.config.failed_address_kademlia_removal_interval,
601        );
602
603        for event in removed_addresses {
604            self.address_removed.call_simple(&event);
605        }
606
607        self.cache_need_saving = true;
608    }
609
610    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
611        trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
612
613        self.known_peers.remove(&peer_id);
614
615        self.cache_need_saving = true;
616    }
617
618    async fn all_known_peers(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
619        if !self.config.enable_known_peers_source {
620            return Vec::new();
621        }
622
623        self.known_peers
624            .iter()
625            .map(|(&peer_id, addresses)| {
626                (
627                    peer_id,
628                    addresses
629                        .iter()
630                        .map(|(addr, _failure_time)| addr.clone())
631                        .collect(),
632                )
633            })
634            .collect()
635    }
636
637    async fn run(&mut self) {
638        if !self.persistent_enabled() {
639            pending::<()>().await;
640        }
641
642        loop {
643            (&mut self.networking_parameters_save_delay).await;
644
645            if let Some(known_peers_slots) = &self.known_peers_slots
646                && self.cache_need_saving
647            {
648                let known_peers =
649                    EncodableKnownPeers::from_cache(&self.known_peers, self.config.cache_size);
650                let known_peers_slots = Arc::clone(known_peers_slots);
651                let write_known_peers_fut =
652                    AsyncJoinOnDrop::new(tokio::task::spawn_blocking(move || {
653                        known_peers_slots
654                            .lock()
655                            .write_to_inactive_slot(&known_peers);
656                    }));
657
658                if let Err(error) = write_known_peers_fut.await {
659                    error!(%error, "Failed to write known peers");
660                }
661
662                self.cache_need_saving = false;
663            }
664            self.networking_parameters_save_delay = KnownPeersManager::default_delay();
665        }
666    }
667
668    fn on_unreachable_address(
669        &mut self,
670        handler: HandlerFn<PeerAddressRemovedEvent>,
671    ) -> Option<HandlerId> {
672        let handler_id = self.address_removed.add(handler);
673
674        Some(handler_id)
675    }
676}
677
678/// Removes a P2p protocol suffix from the multiaddress if any.
679pub(crate) fn remove_p2p_suffix(mut address: Multiaddr) -> Multiaddr {
680    let last_protocol = address.pop();
681
682    if let Some(Protocol::P2p(_)) = &last_protocol {
683        return address;
684    }
685
686    if let Some(protocol) = last_protocol {
687        address.push(protocol);
688    }
689
690    address
691}
692
693/// Appends a P2p protocol suffix to the multiaddress if required.
694pub(crate) fn append_p2p_suffix(peer_id: PeerId, mut address: Multiaddr) -> Multiaddr {
695    let last_protocol = address.pop();
696
697    if let Some(protocol) = last_protocol
698        && !matches!(protocol, Protocol::P2p(..))
699    {
700        address.push(protocol);
701    }
702    address.push(Protocol::P2p(peer_id));
703
704    address
705}
706
707// Testable implementation of the `remove_known_peer_addresses`
708pub(super) fn remove_known_peer_addresses_internal(
709    known_peers: &mut LruMap<PeerId, LruMap<Multiaddr, FailureTime>>,
710    peer_id: PeerId,
711    addresses: Vec<Multiaddr>,
712    expired_address_duration_persistent_storage: Duration,
713    expired_address_duration_kademlia: Duration,
714) -> Vec<PeerAddressRemovedEvent> {
715    let mut address_removed_events = Vec::new();
716    let now = SystemTime::now();
717
718    addresses
719        .into_iter()
720        .map(remove_p2p_suffix)
721        .for_each(|addr| {
722            // if peer_id is present in the cache
723            if let Some(addresses) = known_peers.peek_mut(&peer_id) {
724                let last_address = addresses.peek(&addr).is_some() && addresses.len() == 1;
725                // Get mutable reference to first_failed_time for the address without updating
726                // the item's position in the cache
727                if let Some(first_failed_time) = addresses.peek_mut(&addr) {
728                    // if we failed previously with this address
729                    if let Some(time) = first_failed_time {
730                        // if we failed first time more than an hour ago (for Kademlia)
731                        if *time + expired_address_duration_kademlia < now {
732                            let address_removed = PeerAddressRemovedEvent {
733                                peer_id,
734                                address: addr.clone(),
735                            };
736
737                            address_removed_events.push(address_removed);
738
739                            trace!(%peer_id, "Address was marked for removal from Kademlia: {:?}", addr);
740                        }
741
742                        // if we failed first time more than a day ago (for persistent cache)
743                        if *time + expired_address_duration_persistent_storage < now {
744                            // Remove a failed address
745                            addresses.remove(&addr);
746
747                            // If the last address for peer
748                            if last_address {
749                                known_peers.remove(&peer_id);
750
751                                trace!(%peer_id, "Peer removed from the cache");
752                            }
753
754                            trace!(%peer_id, "Address removed from the persistent cache: {:?}", addr);
755                        } else {
756                            trace!(
757                                %peer_id, "Saving failed connection attempt to a peer: {:?}",
758                                addr
759                            );
760                        }
761                    } else {
762                        // Set failure time
763                        first_failed_time.replace(now);
764
765                        trace!(%peer_id, "Address marked for removal from the cache: {:?}", addr);
766                    }
767                }
768            }
769        });
770
771    address_removed_events
772}