Skip to main content

ab_networking/protocols/request_response/
request_response_factory.rs

1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
24//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
25//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
26//!   with the request itself. The remote then sends the size of the response as a LEB128 number,
27//!   followed with the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it takes
30//!   to send/receive the request and response.
31//!
32//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel is used to
33//!   handle incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(all(test, not(miri)))]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48    Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49    Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50    OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56    ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use std::borrow::Cow;
59use std::collections::HashMap;
60use std::collections::hash_map::Entry;
61use std::pin::Pin;
62use std::task::{Context, Poll};
63use std::time::{Duration, Instant};
64use std::{fmt, io, iter};
65use tracing::{debug, error, warn};
66
67const LOG_TARGET: &str = "request-response-protocols";
68
69/// Defines a handler for the request-response protocol factory.
70#[async_trait]
71pub trait RequestHandler: Send {
72    /// Runs the underlying protocol handler.
73    async fn run(&mut self);
74
75    /// Returns a config for the request-response protocol factory.
76    fn protocol_config(&self) -> ProtocolConfig;
77
78    /// Returns a protocol name.
79    fn protocol_name(&self) -> &'static str;
80
81    /// Clone boxed value.
82    fn clone_box(&self) -> Box<dyn RequestHandler>;
83}
84
85impl Clone for Box<dyn RequestHandler> {
86    fn clone(&self) -> Self {
87        self.clone_box()
88    }
89}
90
91/// Configuration for a single request-response protocol.
92#[derive(Debug, Clone)]
93pub struct ProtocolConfig {
94    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
95    pub name: &'static str,
96
97    /// Maximum allowed size, in bytes, of a request.
98    ///
99    /// Any request larger than this value will be declined as a way to avoid allocating too
100    /// much memory for it.
101    pub max_request_size: u64,
102
103    /// Maximum allowed size, in bytes, of a response.
104    ///
105    /// Any response larger than this value will be declined as a way to avoid allocating too
106    /// much memory for it.
107    pub max_response_size: u64,
108
109    /// Duration after which emitted requests are considered timed out.
110    ///
111    /// If you expect the response to come back quickly, you should set this to a smaller duration.
112    pub request_timeout: Duration,
113
114    /// Channel on which the networking service will send incoming requests.
115    ///
116    /// Every time a peer sends a request to the local node using this protocol, the networking
117    /// service will push an element on this channel. The receiving side of this channel then has
118    /// to pull this element, process the request, and send back the response to send back to the
119    /// peer.
120    ///
121    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
122    /// service will discard the incoming request send back an error to the peer. Consequently,
123    /// the channel being full is an indicator that the node is overloaded.
124    ///
125    /// You can typically set the size of the channel to `T / d`, where `T` is the
126    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
127    /// build a response.
128    ///
129    /// Can be `None` if the local node does not support answering incoming requests.
130    /// If this is `None`, then the local node will not advertise support for this protocol towards
131    /// other peers. If this is `Some` but the channel is closed, then the local node will
132    /// advertise support for this protocol, but any incoming request will lead to an error being
133    /// sent back.
134    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
135}
136
137impl ProtocolConfig {
138    /// Creates request-response protocol config.
139    pub fn new(protocol_name: &'static str) -> ProtocolConfig {
140        ProtocolConfig {
141            name: protocol_name,
142            max_request_size: 1024 * 1024,
143            max_response_size: 16 * 1024 * 1024,
144            request_timeout: Duration::from_secs(20),
145            inbound_queue: None,
146        }
147    }
148}
149
150/// A single request received by a peer on a request-response protocol.
151#[derive(Debug)]
152pub struct IncomingRequest {
153    /// Who sent the request.
154    pub peer: PeerId,
155
156    /// Request sent by the remote. Will always be smaller than
157    /// [`ProtocolConfig::max_request_size`].
158    pub payload: Vec<u8>,
159
160    /// Channel to send back the response.
161    ///
162    /// There are two ways to indicate that handling the request failed:
163    ///
164    /// 1. Drop `pending_response` and thus not changing the reputation of the peer.
165    ///
166    /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
167    ///    the given peer.
168    pub pending_response: oneshot::Sender<OutgoingResponse>,
169}
170
171/// Response for an incoming request to be send by a request protocol handler.
172#[derive(Debug)]
173pub struct OutgoingResponse {
174    /// The payload of the response.
175    ///
176    /// `Err(())` if none is available e.g. due an error while handling the request.
177    pub result: Result<Vec<u8>, ()>,
178
179    /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
180    /// peer.
181    ///
182    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
183    /// outgoing data for each TCP socket, and it is not possible for a user
184    /// application to inspect this buffer. This channel here is not actually notified
185    /// when the response has been fully sent out, but rather when it has fully been
186    /// written to the buffer managed by the operating system.
187    pub sent_feedback: Option<oneshot::Sender<()>>,
188}
189
190/// Event generated by the [`RequestResponseFactoryBehaviour`].
191#[derive(Debug)]
192// We are not reading these events in a meaningful way right now, but the fields in there are still
193// potentially useful
194pub enum Event {
195    /// A remote sent a request and either we have successfully answered it or an error happened.
196    ///
197    /// This event is generated for statistics purposes.
198    InboundRequest {
199        /// Peer which has emitted the request.
200        peer: PeerId,
201        /// Name of the protocol in question.
202        protocol: Cow<'static, str>,
203        /// Whether handling the request was successful or unsuccessful.
204        ///
205        /// When successful contains the time elapsed between when we received the request and when
206        /// we sent back the response. When unsuccessful contains the failure reason.
207        result: Result<(), ResponseFailure>,
208    },
209
210    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
211    /// failed.
212    ///
213    /// This event is generated for statistics purposes.
214    RequestFinished {
215        /// Peer that we sent the request to, if one was chosen.
216        peer: Option<PeerId>,
217        /// Name of the protocol in question.
218        protocol: Cow<'static, str>,
219        /// Duration the request took.
220        duration: Duration,
221        /// Result of the request.
222        result: Result<(), String>,
223    },
224}
225
226/// Combination of a protocol name and a request id.
227///
228/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
229/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
230/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
231/// [`ProtocolRequestId`]s.
232#[derive(Debug, Clone, PartialEq, Eq, Hash)]
233struct ProtocolRequestId {
234    protocol: Cow<'static, str>,
235    request_id: OutboundRequestId,
236}
237
238impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
239    #[inline]
240    fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
241        Self {
242            protocol,
243            request_id,
244        }
245    }
246}
247
248/// When sending a request, what to do on a disconnected recipient
249#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
250pub enum IfDisconnected {
251    /// Try to connect to the peer
252    TryConnect,
253    /// Just fail if the destination is not yet connected
254    ImmediateError,
255}
256
257/// Convenience functions for `IfDisconnected`.
258impl IfDisconnected {
259    /// Shall we connect to a disconnected peer?
260    pub fn should_connect(self) -> bool {
261        match self {
262            Self::TryConnect => true,
263            Self::ImmediateError => false,
264        }
265    }
266}
267
268/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
269/// protocols.
270#[expect(
271    clippy::type_complexity,
272    reason = "To preserve compatibility with copied implementation"
273)]
274pub struct RequestResponseFactoryBehaviour {
275    /// The multiple sub-protocols, by name.
276    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
277    /// "response builder" used to build responses for incoming requests.
278    protocols: HashMap<
279        Cow<'static, str>,
280        (
281            RequestResponse<GenericCodec>,
282            Option<mpsc::Sender<IncomingRequest>>,
283        ),
284    >,
285
286    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
287    pending_requests:
288        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
289
290    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
291    /// start time and the response to send back to the remote.
292    pending_responses: stream::FuturesUnordered<
293        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
294    >,
295
296    /// Pending message request, holds `MessageRequest` as a Future state to poll it
297    /// until we get a response from `Peerset`
298    message_request: Option<MessageRequest>,
299
300    /// Request handlers future collection.
301    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
302}
303
304impl fmt::Debug for RequestResponseFactoryBehaviour {
305    #[inline]
306    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307        f.debug_struct("RequestResponseFactoryBehaviour")
308            .finish_non_exhaustive()
309    }
310}
311
312// This is a state of processing incoming request Message.
313struct MessageRequest {
314    peer: PeerId,
315    request_id: InboundRequestId,
316    request: Vec<u8>,
317    channel: ResponseChannel<Result<Vec<u8>, ()>>,
318    protocol: String,
319    response_builder: Option<mpsc::Sender<IncomingRequest>>,
320}
321
322/// Generated by the response builder and waiting to be processed.
323struct RequestProcessingOutcome {
324    request_id: InboundRequestId,
325    protocol: Cow<'static, str>,
326    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
327    response: OutgoingResponse,
328}
329
330impl RequestResponseFactoryBehaviour {
331    /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
332    /// the same protocol is passed twice.
333    pub fn new(
334        list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
335        max_concurrent_streams: usize,
336    ) -> Result<Self, RegisterError> {
337        let mut protocols = HashMap::new();
338        let mut request_handlers = Vec::new();
339        for mut handler in list {
340            let config = handler.protocol_config();
341
342            let protocol_support = if config.inbound_queue.is_some() {
343                ProtocolSupport::Full
344            } else {
345                ProtocolSupport::Outbound
346            };
347
348            let rq_rp = RequestResponse::with_codec(
349                GenericCodec {
350                    max_request_size: config.max_request_size,
351                    max_response_size: config.max_response_size,
352                },
353                iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
354                RequestResponseConfig::default()
355                    .with_request_timeout(config.request_timeout)
356                    .with_max_concurrent_streams(max_concurrent_streams),
357            );
358
359            match protocols.entry(Cow::Borrowed(config.name)) {
360                Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
361                Entry::Occupied(e) => {
362                    return Err(RegisterError::DuplicateProtocol(e.key().clone()));
363                }
364            };
365
366            let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
367                Box::pin(async move { handler.run().await }.fuse());
368
369            request_handlers.push(request_handler_run);
370        }
371
372        Ok(Self {
373            protocols,
374            pending_requests: Default::default(),
375            pending_responses: Default::default(),
376            message_request: None,
377            request_handlers,
378        })
379    }
380
381    /// Initiates sending a request.
382    ///
383    /// If there is no established connection to the target peer, the behavior is determined by the
384    /// choice of `connect`.
385    ///
386    /// An error is returned if the protocol doesn't match one that has been registered.
387    pub fn send_request(
388        &mut self,
389        target: &PeerId,
390        protocol_name: &str,
391        request: Vec<u8>,
392        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
393        connect: IfDisconnected,
394        addresses: Vec<Multiaddr>,
395    ) {
396        if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
397            if protocol.is_connected(target) || connect.should_connect() {
398                let request_id = protocol.send_request_with_addresses(target, request, addresses);
399                let prev_req_id = self.pending_requests.insert(
400                    (protocol_name.to_string().into(), request_id).into(),
401                    (Instant::now(), pending_response),
402                );
403                debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
404            } else if pending_response
405                .send(Err(RequestFailure::NotConnected))
406                .is_err()
407            {
408                debug!(
409                    target: LOG_TARGET,
410                    "Not connected to peer {:?}. At the same time local \
411                     node is no longer interested in the result.",
412                    target,
413                );
414            }
415        } else if pending_response
416            .send(Err(RequestFailure::UnknownProtocol))
417            .is_err()
418        {
419            debug!(
420                target: LOG_TARGET,
421                "Unknown protocol {:?}. At the same time local \
422                 node is no longer interested in the result.",
423                protocol_name,
424            );
425        }
426    }
427}
428
429impl NetworkBehaviour for RequestResponseFactoryBehaviour {
430    type ConnectionHandler = MultiHandler<
431        String,
432        <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
433    >;
434    type ToSwarm = Event;
435
436    fn handle_established_inbound_connection(
437        &mut self,
438        connection_id: ConnectionId,
439        peer: PeerId,
440        local_addr: &Multiaddr,
441        remote_addr: &Multiaddr,
442    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
443        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
444            (
445                p.to_string(),
446                r.handle_established_inbound_connection(
447                    connection_id,
448                    peer,
449                    local_addr,
450                    remote_addr,
451                )
452                .expect(
453                    "Behaviours return handlers in these methods with the exception of \
454                    'connection management' behaviours like connection-limits or allow-black list. \
455                    So, inner request-response behaviour always returns Ok(handler).",
456                ),
457            )
458        });
459
460        let handler = MultiHandler::try_from_iter(iter).expect(
461            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
462			 which is the only possible error; qed",
463        );
464
465        Ok(handler)
466    }
467
468    fn handle_established_outbound_connection(
469        &mut self,
470        connection_id: ConnectionId,
471        peer: PeerId,
472        addr: &Multiaddr,
473        role_override: Endpoint,
474        port_use: PortUse,
475    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
476        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
477            (
478                p.to_string(),
479                r.handle_established_outbound_connection(
480                    connection_id,
481                    peer,
482                    addr,
483                    role_override,
484                    port_use,
485                )
486                .expect(
487                    "Behaviours return handlers in these methods with the exception of \
488                        'connection management' behaviours like connection-limits or allow-black \
489                        list. So, inner request-response behaviour always returns Ok(handler).",
490                ),
491            )
492        });
493
494        let handler = MultiHandler::try_from_iter(iter).expect(
495            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
496            which is the only possible error; qed",
497        );
498
499        Ok(handler)
500    }
501
502    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
503    fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
504        match event {
505            FromSwarm::ConnectionEstablished(inner) => {
506                for (protocol, _) in self.protocols.values_mut() {
507                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
508                }
509            }
510            FromSwarm::ConnectionClosed(inner) => {
511                for (protocol, _) in self.protocols.values_mut() {
512                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
513                        peer_id: inner.peer_id,
514                        connection_id: inner.connection_id,
515                        endpoint: inner.endpoint,
516                        cause: inner.cause,
517                        remaining_established: inner.remaining_established,
518                    }));
519                }
520            }
521            FromSwarm::AddressChange(inner) => {
522                for (protocol, _) in self.protocols.values_mut() {
523                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
524                }
525            }
526            FromSwarm::DialFailure(inner) => {
527                for (protocol, _) in self.protocols.values_mut() {
528                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
529                        peer_id: inner.peer_id,
530                        error: inner.error,
531                        connection_id: inner.connection_id,
532                    }));
533                }
534            }
535            FromSwarm::ListenFailure(inner) => {
536                for (protocol, _) in self.protocols.values_mut() {
537                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
538                        local_addr: inner.local_addr,
539                        send_back_addr: inner.send_back_addr,
540                        error: inner.error,
541                        connection_id: inner.connection_id,
542                        peer_id: inner.peer_id,
543                    }));
544                }
545            }
546            FromSwarm::NewListener(inner) => {
547                for (protocol, _) in self.protocols.values_mut() {
548                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
549                }
550            }
551            FromSwarm::NewListenAddr(inner) => {
552                for (protocol, _) in self.protocols.values_mut() {
553                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
554                }
555            }
556            FromSwarm::ExpiredListenAddr(inner) => {
557                for (protocol, _) in self.protocols.values_mut() {
558                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
559                }
560            }
561            FromSwarm::ListenerError(inner) => {
562                for (protocol, _) in self.protocols.values_mut() {
563                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
564                }
565            }
566            FromSwarm::ListenerClosed(inner) => {
567                for (protocol, _) in self.protocols.values_mut() {
568                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
569                }
570            }
571            FromSwarm::NewExternalAddrCandidate(inner) => {
572                for (protocol, _) in self.protocols.values_mut() {
573                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
574                }
575            }
576            FromSwarm::ExternalAddrConfirmed(inner) => {
577                for (protocol, _) in self.protocols.values_mut() {
578                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
579                }
580            }
581            FromSwarm::ExternalAddrExpired(inner) => {
582                for (protocol, _) in self.protocols.values_mut() {
583                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
584                }
585            }
586            FromSwarm::NewExternalAddrOfPeer(inner) => {
587                for (protocol, _) in self.protocols.values_mut() {
588                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
589                }
590            }
591            event => {
592                warn!(
593                    ?event,
594                    "New event must be forwarded to request response protocols"
595                );
596            }
597        }
598    }
599
600    fn on_connection_handler_event(
601        &mut self,
602        peer_id: PeerId,
603        connection: ConnectionId,
604        event: THandlerOutEvent<Self>,
605    ) {
606        let p_name = event.0;
607        if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
608            return proto.on_connection_handler_event(peer_id, connection, event.1);
609        }
610
611        warn!(
612            target: LOG_TARGET,
613            "inject_node_event: no request-response instance registered for protocol {:?}", p_name
614        );
615    }
616
617    fn poll(
618        &mut self,
619        cx: &mut Context<'_>,
620    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
621        'poll_all: loop {
622            if let Some(message_request) = self.message_request.take() {
623                let MessageRequest {
624                    peer,
625                    request_id,
626                    request,
627                    channel,
628                    protocol,
629                    response_builder,
630                } = message_request;
631
632                let (tx, rx) = oneshot::channel();
633
634                // Submit the request to the "response builder" passed by the user at
635                // initialization.
636                if let Some(mut response_builder) = response_builder {
637                    // If the response builder is too busy, silently drop `tx`. This
638                    // will be reported by the corresponding `RequestResponse` through
639                    // an `InboundFailure::Omission` event.
640                    let _: Result<(), _> = response_builder.try_send(IncomingRequest {
641                        peer,
642                        payload: request,
643                        pending_response: tx,
644                    });
645                } else {
646                    debug_assert!(false, "Received message on outbound-only protocol.");
647                }
648
649                self.pending_responses.push(Box::pin(async move {
650                    // The `tx` created above can be dropped if we are not capable of
651                    // processing this request, which is reflected as a
652                    // `InboundFailure::Omission` event.
653                    if let Ok(response) = rx.await {
654                        Some(RequestProcessingOutcome {
655                            request_id,
656                            protocol: Cow::from(protocol),
657                            inner_channel: channel,
658                            response,
659                        })
660                    } else {
661                        None
662                    }
663                }));
664
665                // This `continue` makes sure that `pending_responses` gets polled
666                // after we have added the new element.
667                continue 'poll_all;
668            }
669            // Poll to see if any response is ready to be sent back.
670            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
671                let Some(RequestProcessingOutcome {
672                    request_id,
673                    protocol: protocol_name,
674                    inner_channel,
675                    response: OutgoingResponse { result, .. },
676                }) = outcome
677                else {
678                    // The response builder was too busy, or handling the request failed. This is
679                    // later on reported as `InboundFailure::Omission`.
680                    continue;
681                };
682
683                if let Ok(payload) = result
684                    && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
685                    && protocol.send_response(inner_channel, Ok(payload)).is_err()
686                {
687                    // Note: Failure is handled further below when receiving
688                    // `InboundFailure` event from `RequestResponse` behaviour.
689                    debug!(
690                        target: LOG_TARGET,
691                        %request_id,
692                        "Failed to send response for request on protocol {} due to a \
693                        timeout or due to the connection to the peer being closed. \
694                        Dropping response",
695                        protocol_name,
696                    );
697                }
698            }
699
700            for rq_rs_runner in &mut self.request_handlers {
701                // Future.Output == (), so we don't need a result here
702                let _: Poll<()> = rq_rs_runner.poll_unpin(cx);
703            }
704
705            // Poll request-responses protocols.
706            for (protocol, (behaviour, response_builder)) in &mut self.protocols {
707                while let Poll::Ready(event) = behaviour.poll(cx) {
708                    let event = match event {
709                        // Main events we are interested in.
710                        ToSwarm::GenerateEvent(event) => event,
711
712                        // Other events generated by the underlying behaviour are transparently
713                        // passed through.
714                        ToSwarm::Dial { opts } => {
715                            if opts.get_peer_id().is_none() {
716                                error!(
717                                    "The request-response isn't supposed to start dialing \
718                                    addresses"
719                                );
720                            }
721                            return Poll::Ready(ToSwarm::Dial { opts });
722                        }
723                        ToSwarm::NotifyHandler {
724                            peer_id,
725                            handler,
726                            event,
727                        } => {
728                            return Poll::Ready(ToSwarm::NotifyHandler {
729                                peer_id,
730                                handler,
731                                event: ((*protocol).to_string(), event),
732                            });
733                        }
734                        ToSwarm::CloseConnection {
735                            peer_id,
736                            connection,
737                        } => {
738                            return Poll::Ready(ToSwarm::CloseConnection {
739                                peer_id,
740                                connection,
741                            });
742                        }
743                        ToSwarm::NewExternalAddrCandidate(observed) => {
744                            return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
745                        }
746                        ToSwarm::ExternalAddrConfirmed(addr) => {
747                            return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
748                        }
749                        ToSwarm::ExternalAddrExpired(addr) => {
750                            return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
751                        }
752                        ToSwarm::ListenOn { opts } => {
753                            return Poll::Ready(ToSwarm::ListenOn { opts });
754                        }
755                        ToSwarm::RemoveListener { id } => {
756                            return Poll::Ready(ToSwarm::RemoveListener { id });
757                        }
758                        event => {
759                            warn!(
760                                ?event,
761                                "New event from request response protocol must be send up"
762                            );
763
764                            continue;
765                        }
766                    };
767
768                    match event {
769                        // Received a request from a remote.
770                        RequestResponseEvent::Message {
771                            peer,
772                            message:
773                                RequestResponseMessage::Request {
774                                    request_id,
775                                    request,
776                                    channel,
777                                },
778                            ..
779                        } => {
780                            self.message_request = Some(MessageRequest {
781                                peer,
782                                request_id,
783                                request,
784                                channel,
785                                protocol: protocol.to_string(),
786                                response_builder: response_builder.clone(),
787                            });
788
789                            // This `continue` makes sure that `message_request` gets polled
790                            // after we have added the new element.
791                            continue 'poll_all;
792                        }
793
794                        // Received a response from a remote to one of our requests.
795                        RequestResponseEvent::Message {
796                            peer,
797                            message:
798                                RequestResponseMessage::Response {
799                                    request_id,
800                                    response,
801                                },
802                            ..
803                        } => {
804                            let (started, delivered) = if let Some((started, pending_response)) =
805                                self.pending_requests
806                                    .remove(&(protocol.clone(), request_id).into())
807                            {
808                                let delivered = pending_response
809                                    .send(response.map_err(|()| RequestFailure::Refused))
810                                    .map_err(|_error_result| RequestFailure::Obsolete.to_string());
811                                (started, delivered)
812                            } else {
813                                warn!(
814                                    target: LOG_TARGET,
815                                    "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
816                                    request_id,
817                                );
818                                debug_assert!(false);
819                                continue;
820                            };
821
822                            let out = Event::RequestFinished {
823                                peer: Some(peer),
824                                protocol: protocol.clone(),
825                                duration: started.elapsed(),
826                                result: delivered,
827                            };
828
829                            return Poll::Ready(ToSwarm::GenerateEvent(out));
830                        }
831
832                        // One of our requests has failed.
833                        RequestResponseEvent::OutboundFailure {
834                            peer,
835                            request_id,
836                            error,
837                            ..
838                        } => {
839                            let error_string = error.to_string();
840                            let started = if let Some((started, pending_response)) = self
841                                .pending_requests
842                                .remove(&(protocol.clone(), request_id).into())
843                            {
844                                if pending_response
845                                    .send(Err(RequestFailure::Network(error)))
846                                    .is_err()
847                                {
848                                    debug!(
849                                        target: LOG_TARGET,
850                                        %request_id,
851                                        "Request failed. At the same time local node is no longer interested in \
852                                        the result",
853                                    );
854                                }
855                                started
856                            } else {
857                                warn!(
858                                    target: LOG_TARGET,
859                                    %request_id,
860                                    "Received `RequestResponseEvent::Message` with unexpected request",
861                                );
862                                debug_assert!(false);
863                                continue;
864                            };
865
866                            let out = Event::RequestFinished {
867                                peer: Some(peer),
868                                protocol: protocol.clone(),
869                                duration: started.elapsed(),
870                                result: Err(error_string),
871                            };
872
873                            return Poll::Ready(ToSwarm::GenerateEvent(out));
874                        }
875
876                        // An inbound request failed, either while reading the request or due to
877                        // failing to send a response.
878                        RequestResponseEvent::InboundFailure { peer, error, .. } => {
879                            debug!(?error, %peer, "Inbound request failed.");
880
881                            let out = Event::InboundRequest {
882                                peer,
883                                protocol: protocol.clone(),
884                                result: Err(ResponseFailure::Network(error)),
885                            };
886                            return Poll::Ready(ToSwarm::GenerateEvent(out));
887                        }
888
889                        // A response to an inbound request has been sent.
890                        RequestResponseEvent::ResponseSent { peer, .. } => {
891                            let out = Event::InboundRequest {
892                                peer,
893                                protocol: protocol.clone(),
894                                result: Ok(()),
895                            };
896
897                            return Poll::Ready(ToSwarm::GenerateEvent(out));
898                        }
899                    };
900                }
901            }
902
903            break Poll::Pending;
904        }
905    }
906}
907
908/// Error when registering a protocol.
909#[derive(Debug, thiserror::Error)]
910pub enum RegisterError {
911    /// A protocol has been specified multiple times.
912    #[error("{0}")]
913    DuplicateProtocol(Cow<'static, str>),
914}
915
916/// Error in a request
917#[derive(Debug, thiserror::Error)]
918pub enum RequestFailure {
919    /// We are not currently connected to the requested peer
920    #[error("We are not currently connected to the requested peer")]
921    NotConnected,
922    /// Given protocol hasn't been registered
923    #[error("Given protocol hasn't been registered")]
924    UnknownProtocol,
925    /// Remote has closed the substream before answering, thereby signaling that it considers the
926    /// request as valid, but refused to answer it
927    #[error(
928        "Remote has closed the substream before answering, thereby signaling that it considers the \
929        request as valid, but refused to answer it"
930    )]
931    Refused,
932    /// The remote replied, but the local node is no longer interested in the response
933    #[error("The remote replied, but the local node is no longer interested in the response")]
934    Obsolete,
935    /// Problem on the network
936    #[error("Problem on the network: {0}")]
937    Network(OutboundFailure),
938}
939
940/// Error when processing a request sent by a remote.
941#[derive(Debug, thiserror::Error)]
942pub enum ResponseFailure {
943    /// Problem on the network.
944    #[error("Problem on the network: {0}")]
945    Network(InboundFailure),
946}
947
948/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned
949/// into requests and responses and vice-versa.
950#[derive(Debug, Clone)]
951#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
952pub struct GenericCodec {
953    max_request_size: u64,
954    max_response_size: u64,
955}
956
957impl RequestResponseCodec for GenericCodec {
958    type Protocol = StreamProtocol;
959    type Request = Vec<u8>;
960    type Response = Result<Vec<u8>, ()>;
961
962    async fn read_request<T>(
963        &mut self,
964        _: &Self::Protocol,
965        mut io: &mut T,
966    ) -> io::Result<Self::Request>
967    where
968        T: AsyncRead + Unpin + Send,
969    {
970        // Read the length.
971        let length = unsigned_varint::aio::read_usize(&mut io)
972            .await
973            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
974        if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
975            return Err(io::Error::new(
976                io::ErrorKind::InvalidInput,
977                format!(
978                    "Request size exceeds limit: {} > {}",
979                    length, self.max_request_size
980                ),
981            ));
982        }
983
984        // Read the payload.
985        let mut buffer = vec![0; length];
986        io.read_exact(&mut buffer).await?;
987        Ok(buffer)
988    }
989
990    async fn read_response<T>(
991        &mut self,
992        _: &Self::Protocol,
993        mut io: &mut T,
994    ) -> io::Result<Self::Response>
995    where
996        T: AsyncRead + Unpin + Send,
997    {
998        // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
999        // considered as a protocol error and will result in the entire connection being closed.
1000        // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1001        // that this response is an error.
1002
1003        // Read the length.
1004        let length = match unsigned_varint::aio::read_usize(&mut io).await {
1005            Ok(l) => l,
1006            Err(unsigned_varint::io::ReadError::Io(err))
1007                if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1008            {
1009                return Ok(Err(()));
1010            }
1011            Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1012        };
1013
1014        if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1015            return Err(io::Error::new(
1016                io::ErrorKind::InvalidInput,
1017                format!(
1018                    "Response size exceeds limit: {} > {}",
1019                    length, self.max_response_size
1020                ),
1021            ));
1022        }
1023
1024        // Read the payload.
1025        let mut buffer = vec![0; length];
1026        io.read_exact(&mut buffer).await?;
1027        Ok(Ok(buffer))
1028    }
1029
1030    async fn write_request<T>(
1031        &mut self,
1032        _: &Self::Protocol,
1033        io: &mut T,
1034        req: Self::Request,
1035    ) -> io::Result<()>
1036    where
1037        T: AsyncWrite + Unpin + Send,
1038    {
1039        // Write the length.
1040        {
1041            let mut buffer = unsigned_varint::encode::usize_buffer();
1042            io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1043                .await?;
1044        }
1045
1046        // Write the payload.
1047        io.write_all(&req).await?;
1048
1049        io.close().await?;
1050        Ok(())
1051    }
1052
1053    async fn write_response<T>(
1054        &mut self,
1055        _: &Self::Protocol,
1056        io: &mut T,
1057        res: Self::Response,
1058    ) -> io::Result<()>
1059    where
1060        T: AsyncWrite + Unpin + Send,
1061    {
1062        // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1063        if let Ok(res) = res {
1064            // Write the length.
1065            {
1066                let mut buffer = unsigned_varint::encode::usize_buffer();
1067                io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1068                    .await?;
1069            }
1070
1071            // Write the payload.
1072            io.write_all(&res).await?;
1073        }
1074
1075        io.close().await?;
1076        Ok(())
1077    }
1078}