Skip to main content

ab_networking/protocols/request_response/handlers/
cached_piece_by_index.rs

1//! Helper for incoming cached piece requests.
2//!
3//! Request handler can be created with [`CachedPieceByIndexRequestHandler`].
4
5#[cfg(test)]
6mod tests;
7
8use crate::protocols::request_response::handlers::generic_request_handler::{
9    GenericRequest, GenericRequestHandler,
10};
11use ab_core_primitives::pieces::{Piece, PieceIndex};
12use derive_more::{Deref, DerefMut, From, Into};
13use libp2p::kad::K_VALUE;
14use libp2p::multiaddr::Protocol;
15use libp2p::{Multiaddr, PeerId};
16use multihash::Multihash;
17use parity_scale_codec::{Compact, CompactLen, Decode, Encode, EncodeLike, Input, Output};
18use std::sync::Arc;
19
20/// Cached-piece-by-index request.
21///
22/// This is similar to `PieceByIndexRequest`, but will only respond with cached pieces.
23#[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
24pub struct CachedPieceByIndexRequest {
25    /// Request key - piece index
26    pub piece_index: PieceIndex,
27    /// Additional pieces that requester is interested in if they are cached locally
28    // TODO: Use `Arc<[PieceIndex]>` once
29    //  https://github.com/paritytech/parity-scale-codec/issues/633 is resolved
30    pub cached_pieces: Arc<Vec<PieceIndex>>,
31}
32
33impl GenericRequest for CachedPieceByIndexRequest {
34    const PROTOCOL_NAME: &'static str = "/subspace/cached-piece-by-index/0.1.0";
35    const LOG_TARGET: &'static str = "cached-piece-by-index-request-response-handler";
36    type Response = CachedPieceByIndexResponse;
37}
38
39impl CachedPieceByIndexRequest {
40    /// Max number of cached piece indexes to accept per request, equals to the number of source
41    /// shards in a sector and fits nicely into a single TCP packet
42    pub const RECOMMENDED_LIMIT: usize = 128;
43}
44
45/// Closest peers
46#[derive(Debug, Default, PartialEq, Eq, Clone, From, Into, Deref, DerefMut)]
47pub struct ClosestPeers(Vec<(PeerId, Vec<Multiaddr>)>);
48
49impl Encode for ClosestPeers {
50    fn size_hint(&self) -> usize {
51        let mut size = Compact::compact_len(&(self.0.len() as u32));
52
53        for (peer_id, addresses) in &self.0 {
54            size += peer_id.as_ref().encoded_size();
55            size += Compact::compact_len(&(addresses.len() as u32));
56
57            for address in addresses {
58                size += address.as_ref().encoded_size();
59            }
60        }
61
62        size
63    }
64
65    fn encode_to<T>(&self, dest: &mut T)
66    where
67        T: Output + ?Sized,
68    {
69        Compact::from(self.0.len() as u32).encode_to(dest);
70
71        for (peer_id, addresses) in &self.0 {
72            peer_id.as_ref().encode_to(dest);
73            Compact::from(addresses.len() as u32).encode_to(dest);
74
75            for address in addresses {
76                address.as_ref().encode_to(dest);
77            }
78        }
79    }
80}
81
82impl EncodeLike for ClosestPeers {}
83
84impl Decode for ClosestPeers {
85    fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
86    where
87        I: Input,
88    {
89        let mut closest_peers = Vec::with_capacity(K_VALUE.get());
90
91        let closest_peers_count = Compact::<u32>::decode(input)?.0 as usize;
92        for _ in 0..closest_peers_count {
93            let peer_id =
94                PeerId::from_multihash(Multihash::decode(input)?).map_err(|multihash| {
95                    parity_scale_codec::Error::from("Can't create `PeerId` from `Multihash`")
96                        .chain(format!("Code: {}", multihash.code()))
97                })?;
98            let p2p = Multiaddr::from(Protocol::P2p(peer_id));
99            let mut addresses = Vec::new();
100
101            let addresses_count = Compact::<u32>::decode(input)?.0 as usize;
102
103            if addresses_count == 0 {
104                return Err(parity_scale_codec::Error::from(
105                    "List of addresses must not be empty",
106                ));
107            }
108
109            for _ in 0..addresses_count {
110                let address = Multiaddr::try_from(Vec::<u8>::decode(input)?).map_err(|error| {
111                    parity_scale_codec::Error::from("Failed to decode `Multiaddr`")
112                        .chain(error.to_string())
113                })?;
114
115                if !address.ends_with(&p2p) {
116                    return Err(parity_scale_codec::Error::from(
117                        "`Multiaddr` doesn't end with correct p2p suffix",
118                    )
119                    .chain(format!("Address {address}, PeerId {p2p}")));
120                }
121
122                addresses.push(address);
123            }
124
125            closest_peers.push((peer_id, addresses));
126        }
127
128        Ok(Self(closest_peers))
129    }
130}
131
132/// Piece result contains either piece itself or the closest known peers to the piece index
133#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
134pub enum PieceResult {
135    /// Piece was cached locally
136    Piece(Piece),
137    /// Piece was not cached locally, but these are the closest known peers to the piece index
138    ClosestPeers(ClosestPeers),
139}
140
141/// Cached-piece-by-index response, may be cached piece or stored in one of the farms
142#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
143pub struct CachedPieceByIndexResponse {
144    /// Piece result
145    pub result: PieceResult,
146    /// Additional pieces that requester is interested in and are cached locally, order from
147    /// request is not preserved
148    pub cached_pieces: Vec<PieceIndex>,
149}
150
151/// Cached-piece-by-index request handler
152pub type CachedPieceByIndexRequestHandler = GenericRequestHandler<CachedPieceByIndexRequest>;