Skip to main content

ab_networking/protocols/request_response/
request_response_factory.rs

1// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
2// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
3
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17//! Collection of request-response protocols.
18//!
19//! The [`RequestResponse`] struct defined in this module provides support for zero or more
20//! so-called "request-response" protocols.
21//!
22//! A request-response protocol works in the following way:
23//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//!   remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//!   by the request itself. The remote then sends the size of the response as a LEB128 number,
//!   followed by the response.
28//!
29//! - Requests have a certain time limit before they time out. This time includes the time it takes
30//!   to send/receive the request and response.
31//!
32//! - If provided, a [requests processing](ProtocolConfig::inbound_queue) channel is used to handle
33//!   incoming requests.
34//!
35//! Original file commit: <https://github.com/paritytech/substrate/commit/c2fc4b3ca0d7a15cc3f9cb1e5f441d99ec8d6e0b>
36
37#[cfg(all(test, not(miri)))]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use futures::stream::FuturesUnordered;
44use libp2p::StreamProtocol;
45use libp2p::core::transport::PortUse;
46use libp2p::core::{Endpoint, Multiaddr};
47use libp2p::identity::PeerId;
48use libp2p::request_response::{
49    Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
50    Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
51    OutboundRequestId, ProtocolSupport, ResponseChannel,
52};
53pub use libp2p::request_response::{InboundFailure, OutboundFailure};
54use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
55use libp2p::swarm::handler::multi::MultiHandler;
56use libp2p::swarm::{
57    ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
58};
59use std::borrow::Cow;
60use std::collections::HashMap;
61use std::collections::hash_map::Entry;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{fmt, io, iter};
66use tracing::{debug, error, warn};
67
68const LOG_TARGET: &str = "request-response-protocols";
69
70/// Defines a handler for the request-response protocol factory.
71#[async_trait]
72pub trait RequestHandler: Send {
73    /// Runs the underlying protocol handler.
74    async fn run(&mut self);
75
76    /// Returns a config for the request-response protocol factory.
77    fn protocol_config(&self) -> ProtocolConfig;
78
79    /// Returns a protocol name.
80    fn protocol_name(&self) -> &'static str;
81
82    /// Clone boxed value.
83    fn clone_box(&self) -> Box<dyn RequestHandler>;
84}
85
86impl Clone for Box<dyn RequestHandler> {
87    fn clone(&self) -> Self {
88        self.clone_box()
89    }
90}
91
92/// Configuration for a single request-response protocol.
93#[derive(Debug, Clone)]
94pub struct ProtocolConfig {
95    /// Name of the protocol on the wire. Should be something like `/foo/bar`.
96    pub name: &'static str,
97
98    /// Maximum allowed size, in bytes, of a request.
99    ///
100    /// Any request larger than this value will be declined as a way to avoid allocating too
101    /// much memory for it.
102    pub max_request_size: u64,
103
104    /// Maximum allowed size, in bytes, of a response.
105    ///
106    /// Any response larger than this value will be declined as a way to avoid allocating too
107    /// much memory for it.
108    pub max_response_size: u64,
109
110    /// Duration after which emitted requests are considered timed out.
111    ///
112    /// If you expect the response to come back quickly, you should set this to a smaller duration.
113    pub request_timeout: Duration,
114
115    /// Channel on which the networking service will send incoming requests.
116    ///
117    /// Every time a peer sends a request to the local node using this protocol, the networking
118    /// service will push an element on this channel. The receiving side of this channel then has
119    /// to pull this element, process the request, and send back the response to send back to the
120    /// peer.
121    ///
122    /// The size of the channel has to be carefully chosen. If the channel is full, the networking
123    /// service will discard the incoming request send back an error to the peer. Consequently,
124    /// the channel being full is an indicator that the node is overloaded.
125    ///
126    /// You can typically set the size of the channel to `T / d`, where `T` is the
127    /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
128    /// build a response.
129    ///
130    /// Can be `None` if the local node does not support answering incoming requests.
131    /// If this is `None`, then the local node will not advertise support for this protocol towards
132    /// other peers. If this is `Some` but the channel is closed, then the local node will
133    /// advertise support for this protocol, but any incoming request will lead to an error being
134    /// sent back.
135    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
136}
137
138impl ProtocolConfig {
139    /// Creates request-response protocol config.
140    pub fn new(protocol_name: &'static str) -> ProtocolConfig {
141        ProtocolConfig {
142            name: protocol_name,
143            max_request_size: 1024 * 1024,
144            max_response_size: 16 * 1024 * 1024,
145            request_timeout: Duration::from_secs(20),
146            inbound_queue: None,
147        }
148    }
149}
150
151/// A single request received by a peer on a request-response protocol.
152#[derive(Debug)]
153pub struct IncomingRequest {
154    /// Who sent the request.
155    pub peer: PeerId,
156
157    /// Request sent by the remote. Will always be smaller than
158    /// [`ProtocolConfig::max_request_size`].
159    pub payload: Vec<u8>,
160
161    /// Channel to send back the response.
162    ///
163    /// There are two ways to indicate that handling the request failed:
164    ///
165    /// 1. Drop `pending_response` and thus not changing the reputation of the peer.
166    ///
167    /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
168    ///    the given peer.
169    pub pending_response: oneshot::Sender<OutgoingResponse>,
170}
171
172/// Response for an incoming request to be send by a request protocol handler.
173#[derive(Debug)]
174pub struct OutgoingResponse {
175    /// The payload of the response.
176    ///
177    /// `Err(())` if none is available e.g. due an error while handling the request.
178    pub result: Result<Vec<u8>, ()>,
179
180    /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
181    /// peer.
182    ///
183    /// Note: Operating systems typically maintain a buffer of a few dozen kilobytes of
184    /// outgoing data for each TCP socket, and it is not possible for a user
185    /// application to inspect this buffer. This channel here is not actually notified
186    /// when the response has been fully sent out, but rather when it has fully been
187    /// written to the buffer managed by the operating system.
188    pub sent_feedback: Option<oneshot::Sender<()>>,
189}
190
191/// Event generated by the [`RequestResponseFactoryBehaviour`].
192#[derive(Debug)]
193// We are not reading these events in a meaningful way right now, but the fields in there are still
194// potentially useful
195pub enum Event {
196    /// A remote sent a request and either we have successfully answered it or an error happened.
197    ///
198    /// This event is generated for statistics purposes.
199    InboundRequest {
200        /// Peer which has emitted the request.
201        peer: PeerId,
202        /// Name of the protocol in question.
203        protocol: Cow<'static, str>,
204        /// Whether handling the request was successful or unsuccessful.
205        ///
206        /// When successful contains the time elapsed between when we received the request and when
207        /// we sent back the response. When unsuccessful contains the failure reason.
208        result: Result<(), ResponseFailure>,
209    },
210
211    /// A request initiated using [`RequestResponseFactoryBehaviour::send_request`] has succeeded or
212    /// failed.
213    ///
214    /// This event is generated for statistics purposes.
215    RequestFinished {
216        /// Peer that we sent the request to, if one was chosen.
217        peer: Option<PeerId>,
218        /// Name of the protocol in question.
219        protocol: Cow<'static, str>,
220        /// Duration the request took.
221        duration: Duration,
222        /// Result of the request.
223        result: Result<(), String>,
224    },
225}
226
227/// Combination of a protocol name and a request id.
228///
229/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
230/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
231/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
232/// [`ProtocolRequestId`]s.
233#[derive(Debug, Clone, PartialEq, Eq, Hash)]
234struct ProtocolRequestId {
235    protocol: Cow<'static, str>,
236    request_id: OutboundRequestId,
237}
238
239impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
240    #[inline]
241    fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
242        Self {
243            protocol,
244            request_id,
245        }
246    }
247}
248
/// Policy applied when a request is sent to a recipient that is currently disconnected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Attempt to connect to the peer.
    TryConnect,
    /// Fail right away if the destination is not yet connected.
    ImmediateError,
}

/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
    /// Tells whether a connection attempt should be made towards a disconnected peer.
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
268
269/// Implementation of `NetworkBehaviour` that provides support for multiple request-response
270/// protocols.
271#[expect(
272    clippy::type_complexity,
273    reason = "To preserve compatibility with copied implementation"
274)]
275pub struct RequestResponseFactoryBehaviour {
276    /// The multiple sub-protocols, by name.
277    /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional
278    /// "response builder" used to build responses for incoming requests.
279    protocols: HashMap<
280        Cow<'static, str>,
281        (
282            RequestResponse<GenericCodec>,
283            Option<mpsc::Sender<IncomingRequest>>,
284        ),
285    >,
286
287    /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
288    pending_requests:
289        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
290
291    /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
292    /// start time and the response to send back to the remote.
293    pending_responses: stream::FuturesUnordered<
294        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
295    >,
296
297    /// Pending message request, holds `MessageRequest` as a Future state to poll it
298    /// until we get a response from `Peerset`
299    message_request: Option<MessageRequest>,
300
301    /// Request handlers future collection.
302    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
303}
304
305impl fmt::Debug for RequestResponseFactoryBehaviour {
306    #[inline]
307    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308        f.debug_struct("RequestResponseFactoryBehaviour")
309            .finish_non_exhaustive()
310    }
311}
312
313// This is a state of processing incoming request Message.
314struct MessageRequest {
315    peer: PeerId,
316    request_id: InboundRequestId,
317    request: Vec<u8>,
318    channel: ResponseChannel<Result<Vec<u8>, ()>>,
319    protocol: String,
320    response_builder: Option<mpsc::Sender<IncomingRequest>>,
321}
322
323/// Generated by the response builder and waiting to be processed.
324struct RequestProcessingOutcome {
325    request_id: InboundRequestId,
326    protocol: Cow<'static, str>,
327    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
328    response: OutgoingResponse,
329}
330
331impl RequestResponseFactoryBehaviour {
332    /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
333    /// the same protocol is passed twice.
334    pub fn new<List>(list: List, max_concurrent_streams: usize) -> Result<Self, RegisterError>
335    where
336        List: IntoIterator<Item = Box<dyn RequestHandler>>,
337    {
338        let mut protocols = HashMap::new();
339        let mut request_handlers = Vec::new();
340        for mut handler in list {
341            let config = handler.protocol_config();
342
343            let protocol_support = if config.inbound_queue.is_some() {
344                ProtocolSupport::Full
345            } else {
346                ProtocolSupport::Outbound
347            };
348
349            let rq_rp = RequestResponse::with_codec(
350                GenericCodec {
351                    max_request_size: config.max_request_size,
352                    max_response_size: config.max_response_size,
353                },
354                iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
355                RequestResponseConfig::default()
356                    .with_request_timeout(config.request_timeout)
357                    .with_max_concurrent_streams(max_concurrent_streams),
358            );
359
360            match protocols.entry(Cow::Borrowed(config.name)) {
361                Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
362                Entry::Occupied(e) => {
363                    return Err(RegisterError::DuplicateProtocol(e.key().clone()));
364                }
365            };
366
367            let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
368                Box::pin(async move { handler.run().await }.fuse());
369
370            request_handlers.push(request_handler_run);
371        }
372
373        Ok(Self {
374            protocols,
375            pending_requests: HashMap::new(),
376            pending_responses: FuturesUnordered::new(),
377            message_request: None,
378            request_handlers,
379        })
380    }
381
382    /// Initiates sending a request.
383    ///
384    /// If there is no established connection to the target peer, the behavior is determined by the
385    /// choice of `connect`.
386    ///
387    /// An error is returned if the protocol doesn't match one that has been registered.
388    pub fn send_request(
389        &mut self,
390        target: &PeerId,
391        protocol_name: &str,
392        request: Vec<u8>,
393        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
394        connect: IfDisconnected,
395        addresses: Vec<Multiaddr>,
396    ) {
397        if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
398            if protocol.is_connected(target) || connect.should_connect() {
399                let request_id = protocol.send_request_with_addresses(target, request, addresses);
400                let prev_req_id = self.pending_requests.insert(
401                    (protocol_name.to_string().into(), request_id).into(),
402                    (Instant::now(), pending_response),
403                );
404                debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
405            } else if pending_response
406                .send(Err(RequestFailure::NotConnected))
407                .is_err()
408            {
409                debug!(
410                    target: LOG_TARGET,
411                    "Not connected to peer {:?}. At the same time local \
412                     node is no longer interested in the result.",
413                    target,
414                );
415            }
416        } else if pending_response
417            .send(Err(RequestFailure::UnknownProtocol))
418            .is_err()
419        {
420            debug!(
421                target: LOG_TARGET,
422                "Unknown protocol {:?}. At the same time local \
423                 node is no longer interested in the result.",
424                protocol_name,
425            );
426        }
427    }
428}
429
430impl NetworkBehaviour for RequestResponseFactoryBehaviour {
431    type ConnectionHandler = MultiHandler<
432        String,
433        <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
434    >;
435    type ToSwarm = Event;
436
437    fn handle_established_inbound_connection(
438        &mut self,
439        connection_id: ConnectionId,
440        peer: PeerId,
441        local_addr: &Multiaddr,
442        remote_addr: &Multiaddr,
443    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
444        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
445            (
446                p.to_string(),
447                r.handle_established_inbound_connection(
448                    connection_id,
449                    peer,
450                    local_addr,
451                    remote_addr,
452                )
453                .expect(
454                    "Behaviours return handlers in these methods with the exception of \
455                    'connection management' behaviours like connection-limits or allow-black list. \
456                    So, inner request-response behaviour always returns Ok(handler).",
457                ),
458            )
459        });
460
461        let handler = MultiHandler::try_from_iter(iter).expect(
462            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
463			 which is the only possible error; qed",
464        );
465
466        Ok(handler)
467    }
468
469    fn handle_established_outbound_connection(
470        &mut self,
471        connection_id: ConnectionId,
472        peer: PeerId,
473        addr: &Multiaddr,
474        role_override: Endpoint,
475        port_use: PortUse,
476    ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
477        let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
478            (
479                p.to_string(),
480                r.handle_established_outbound_connection(
481                    connection_id,
482                    peer,
483                    addr,
484                    role_override,
485                    port_use,
486                )
487                .expect(
488                    "Behaviours return handlers in these methods with the exception of \
489                        'connection management' behaviours like connection-limits or allow-black \
490                        list. So, inner request-response behaviour always returns Ok(handler).",
491                ),
492            )
493        });
494
495        let handler = MultiHandler::try_from_iter(iter).expect(
496            "Protocols are in a HashMap and there can be at most one handler per protocol name, \
497            which is the only possible error; qed",
498        );
499
500        Ok(handler)
501    }
502
503    /// Informs the behaviour about an event from the [`Swarm`](libp2p::Swarm).
504    fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
505        match event {
506            FromSwarm::ConnectionEstablished(inner) => {
507                for (protocol, _) in self.protocols.values_mut() {
508                    protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
509                }
510            }
511            FromSwarm::ConnectionClosed(inner) => {
512                for (protocol, _) in self.protocols.values_mut() {
513                    protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
514                        peer_id: inner.peer_id,
515                        connection_id: inner.connection_id,
516                        endpoint: inner.endpoint,
517                        cause: inner.cause,
518                        remaining_established: inner.remaining_established,
519                    }));
520                }
521            }
522            FromSwarm::AddressChange(inner) => {
523                for (protocol, _) in self.protocols.values_mut() {
524                    protocol.on_swarm_event(FromSwarm::AddressChange(inner));
525                }
526            }
527            FromSwarm::DialFailure(inner) => {
528                for (protocol, _) in self.protocols.values_mut() {
529                    protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
530                        peer_id: inner.peer_id,
531                        error: inner.error,
532                        connection_id: inner.connection_id,
533                    }));
534                }
535            }
536            FromSwarm::ListenFailure(inner) => {
537                for (protocol, _) in self.protocols.values_mut() {
538                    protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
539                        local_addr: inner.local_addr,
540                        send_back_addr: inner.send_back_addr,
541                        error: inner.error,
542                        connection_id: inner.connection_id,
543                        peer_id: inner.peer_id,
544                    }));
545                }
546            }
547            FromSwarm::NewListener(inner) => {
548                for (protocol, _) in self.protocols.values_mut() {
549                    protocol.on_swarm_event(FromSwarm::NewListener(inner));
550                }
551            }
552            FromSwarm::NewListenAddr(inner) => {
553                for (protocol, _) in self.protocols.values_mut() {
554                    protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
555                }
556            }
557            FromSwarm::ExpiredListenAddr(inner) => {
558                for (protocol, _) in self.protocols.values_mut() {
559                    protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
560                }
561            }
562            FromSwarm::ListenerError(inner) => {
563                for (protocol, _) in self.protocols.values_mut() {
564                    protocol.on_swarm_event(FromSwarm::ListenerError(inner));
565                }
566            }
567            FromSwarm::ListenerClosed(inner) => {
568                for (protocol, _) in self.protocols.values_mut() {
569                    protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
570                }
571            }
572            FromSwarm::NewExternalAddrCandidate(inner) => {
573                for (protocol, _) in self.protocols.values_mut() {
574                    protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
575                }
576            }
577            FromSwarm::ExternalAddrConfirmed(inner) => {
578                for (protocol, _) in self.protocols.values_mut() {
579                    protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
580                }
581            }
582            FromSwarm::ExternalAddrExpired(inner) => {
583                for (protocol, _) in self.protocols.values_mut() {
584                    protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
585                }
586            }
587            FromSwarm::NewExternalAddrOfPeer(inner) => {
588                for (protocol, _) in self.protocols.values_mut() {
589                    protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
590                }
591            }
592            event => {
593                warn!(
594                    ?event,
595                    "New event must be forwarded to request response protocols"
596                );
597            }
598        }
599    }
600
601    fn on_connection_handler_event(
602        &mut self,
603        peer_id: PeerId,
604        connection: ConnectionId,
605        event: THandlerOutEvent<Self>,
606    ) {
607        let p_name = event.0;
608        if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
609            return proto.on_connection_handler_event(peer_id, connection, event.1);
610        }
611
612        warn!(
613            target: LOG_TARGET,
614            "inject_node_event: no request-response instance registered for protocol {:?}", p_name
615        );
616    }
617
618    fn poll(
619        &mut self,
620        cx: &mut Context<'_>,
621    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
622        'poll_all: loop {
623            if let Some(message_request) = self.message_request.take() {
624                let MessageRequest {
625                    peer,
626                    request_id,
627                    request,
628                    channel,
629                    protocol,
630                    response_builder,
631                } = message_request;
632
633                let (tx, rx) = oneshot::channel();
634
635                // Submit the request to the "response builder" passed by the user at
636                // initialization.
637                if let Some(mut response_builder) = response_builder {
638                    // If the response builder is too busy, silently drop `tx`. This
639                    // will be reported by the corresponding `RequestResponse` through
640                    // an `InboundFailure::Omission` event.
641                    let _: Result<(), _> = response_builder.try_send(IncomingRequest {
642                        peer,
643                        payload: request,
644                        pending_response: tx,
645                    });
646                } else {
647                    debug_assert!(false, "Received message on outbound-only protocol.");
648                }
649
650                self.pending_responses.push(Box::pin(async move {
651                    // The `tx` created above can be dropped if we are not capable of
652                    // processing this request, which is reflected as a
653                    // `InboundFailure::Omission` event.
654                    if let Ok(response) = rx.await {
655                        Some(RequestProcessingOutcome {
656                            request_id,
657                            protocol: Cow::from(protocol),
658                            inner_channel: channel,
659                            response,
660                        })
661                    } else {
662                        None
663                    }
664                }));
665
666                // This `continue` makes sure that `pending_responses` gets polled
667                // after we have added the new element.
668                continue 'poll_all;
669            }
670            // Poll to see if any response is ready to be sent back.
671            while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
672                let Some(RequestProcessingOutcome {
673                    request_id,
674                    protocol: protocol_name,
675                    inner_channel,
676                    response: OutgoingResponse { result, .. },
677                }) = outcome
678                else {
679                    // The response builder was too busy, or handling the request failed. This is
680                    // later on reported as `InboundFailure::Omission`.
681                    continue;
682                };
683
684                if let Ok(payload) = result
685                    && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
686                    && protocol.send_response(inner_channel, Ok(payload)).is_err()
687                {
688                    // Note: Failure is handled further below when receiving
689                    // `InboundFailure` event from `RequestResponse` behaviour.
690                    debug!(
691                        target: LOG_TARGET,
692                        %request_id,
693                        "Failed to send response for request on protocol {} due to a \
694                        timeout or due to the connection to the peer being closed. \
695                        Dropping response",
696                        protocol_name,
697                    );
698                }
699            }
700
701            for rq_rs_runner in &mut self.request_handlers {
702                // Future.Output == (), so we don't need a result here
703                let _: Poll<()> = rq_rs_runner.poll_unpin(cx);
704            }
705
706            // Poll request-responses protocols.
707            for (protocol, (behaviour, response_builder)) in &mut self.protocols {
708                while let Poll::Ready(event) = behaviour.poll(cx) {
709                    let event = match event {
710                        // Main events we are interested in.
711                        ToSwarm::GenerateEvent(event) => event,
712
713                        // Other events generated by the underlying behaviour are transparently
714                        // passed through.
715                        ToSwarm::Dial { opts } => {
716                            if opts.get_peer_id().is_none() {
717                                error!(
718                                    "The request-response isn't supposed to start dialing \
719                                    addresses"
720                                );
721                            }
722                            return Poll::Ready(ToSwarm::Dial { opts });
723                        }
724                        ToSwarm::NotifyHandler {
725                            peer_id,
726                            handler,
727                            event,
728                        } => {
729                            return Poll::Ready(ToSwarm::NotifyHandler {
730                                peer_id,
731                                handler,
732                                event: ((*protocol).to_string(), event),
733                            });
734                        }
735                        ToSwarm::CloseConnection {
736                            peer_id,
737                            connection,
738                        } => {
739                            return Poll::Ready(ToSwarm::CloseConnection {
740                                peer_id,
741                                connection,
742                            });
743                        }
744                        ToSwarm::NewExternalAddrCandidate(observed) => {
745                            return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
746                        }
747                        ToSwarm::ExternalAddrConfirmed(addr) => {
748                            return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
749                        }
750                        ToSwarm::ExternalAddrExpired(addr) => {
751                            return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
752                        }
753                        ToSwarm::ListenOn { opts } => {
754                            return Poll::Ready(ToSwarm::ListenOn { opts });
755                        }
756                        ToSwarm::RemoveListener { id } => {
757                            return Poll::Ready(ToSwarm::RemoveListener { id });
758                        }
759                        event => {
760                            warn!(
761                                ?event,
762                                "New event from request response protocol must be send up"
763                            );
764
765                            continue;
766                        }
767                    };
768
769                    match event {
770                        // Received a request from a remote.
771                        RequestResponseEvent::Message {
772                            peer,
773                            message:
774                                RequestResponseMessage::Request {
775                                    request_id,
776                                    request,
777                                    channel,
778                                },
779                            ..
780                        } => {
781                            self.message_request = Some(MessageRequest {
782                                peer,
783                                request_id,
784                                request,
785                                channel,
786                                protocol: protocol.to_string(),
787                                response_builder: response_builder.clone(),
788                            });
789
790                            // This `continue` makes sure that `message_request` gets polled
791                            // after we have added the new element.
792                            continue 'poll_all;
793                        }
794
795                        // Received a response from a remote to one of our requests.
796                        RequestResponseEvent::Message {
797                            peer,
798                            message:
799                                RequestResponseMessage::Response {
800                                    request_id,
801                                    response,
802                                },
803                            ..
804                        } => {
805                            let (started, delivered) = if let Some((started, pending_response)) =
806                                self.pending_requests
807                                    .remove(&(protocol.clone(), request_id).into())
808                            {
809                                let delivered = pending_response
810                                    .send(response.map_err(|()| RequestFailure::Refused))
811                                    .map_err(|_error_result| RequestFailure::Obsolete.to_string());
812                                (started, delivered)
813                            } else {
814                                warn!(
815                                    target: LOG_TARGET,
816                                    "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
817                                    request_id,
818                                );
819                                debug_assert!(false);
820                                continue;
821                            };
822
823                            let out = Event::RequestFinished {
824                                peer: Some(peer),
825                                protocol: protocol.clone(),
826                                duration: started.elapsed(),
827                                result: delivered,
828                            };
829
830                            return Poll::Ready(ToSwarm::GenerateEvent(out));
831                        }
832
833                        // One of our requests has failed.
834                        RequestResponseEvent::OutboundFailure {
835                            peer,
836                            request_id,
837                            error,
838                            ..
839                        } => {
840                            let error_string = error.to_string();
841                            let started = if let Some((started, pending_response)) = self
842                                .pending_requests
843                                .remove(&(protocol.clone(), request_id).into())
844                            {
845                                if pending_response
846                                    .send(Err(RequestFailure::Network(error)))
847                                    .is_err()
848                                {
849                                    debug!(
850                                        target: LOG_TARGET,
851                                        %request_id,
852                                        "Request failed. At the same time local node is no longer interested in \
853                                        the result",
854                                    );
855                                }
856                                started
857                            } else {
858                                warn!(
859                                    target: LOG_TARGET,
860                                    %request_id,
861                                    "Received `RequestResponseEvent::Message` with unexpected request",
862                                );
863                                debug_assert!(false);
864                                continue;
865                            };
866
867                            let out = Event::RequestFinished {
868                                peer: Some(peer),
869                                protocol: protocol.clone(),
870                                duration: started.elapsed(),
871                                result: Err(error_string),
872                            };
873
874                            return Poll::Ready(ToSwarm::GenerateEvent(out));
875                        }
876
877                        // An inbound request failed, either while reading the request or due to
878                        // failing to send a response.
879                        RequestResponseEvent::InboundFailure { peer, error, .. } => {
880                            debug!(?error, %peer, "Inbound request failed.");
881
882                            let out = Event::InboundRequest {
883                                peer,
884                                protocol: protocol.clone(),
885                                result: Err(ResponseFailure::Network(error)),
886                            };
887                            return Poll::Ready(ToSwarm::GenerateEvent(out));
888                        }
889
890                        // A response to an inbound request has been sent.
891                        RequestResponseEvent::ResponseSent { peer, .. } => {
892                            let out = Event::InboundRequest {
893                                peer,
894                                protocol: protocol.clone(),
895                                result: Ok(()),
896                            };
897
898                            return Poll::Ready(ToSwarm::GenerateEvent(out));
899                        }
900                    };
901                }
902            }
903
904            break Poll::Pending;
905        }
906    }
907}
908
909/// Error when registering a protocol.
910#[derive(Debug, thiserror::Error)]
911pub enum RegisterError {
912    /// A protocol has been specified multiple times.
913    #[error("{0}")]
914    DuplicateProtocol(Cow<'static, str>),
915}
916
917/// Error in a request
918#[derive(Debug, thiserror::Error)]
919pub enum RequestFailure {
920    /// We are not currently connected to the requested peer
921    #[error("We are not currently connected to the requested peer")]
922    NotConnected,
923    /// Given protocol hasn't been registered
924    #[error("Given protocol hasn't been registered")]
925    UnknownProtocol,
926    /// Remote has closed the substream before answering, thereby signaling that it considers the
927    /// request as valid, but refused to answer it
928    #[error(
929        "Remote has closed the substream before answering, thereby signaling that it considers the \
930        request as valid, but refused to answer it"
931    )]
932    Refused,
933    /// The remote replied, but the local node is no longer interested in the response
934    #[error("The remote replied, but the local node is no longer interested in the response")]
935    Obsolete,
936    /// Problem on the network
937    #[error("Problem on the network: {0}")]
938    Network(OutboundFailure),
939}
940
941/// Error when processing a request sent by a remote.
942#[derive(Debug, thiserror::Error)]
943pub enum ResponseFailure {
944    /// Problem on the network.
945    #[error("Problem on the network: {0}")]
946    Network(InboundFailure),
947}
948
/// Codec implementing the libp2p [`RequestResponseCodec`] trait.
///
/// It describes how raw byte streams are converted into requests and responses, and how
/// requests and responses are converted back into bytes.
#[derive(Debug, Clone)]
#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
pub struct GenericCodec {
    /// Largest request payload, in bytes, that is accepted when reading a request.
    max_request_size: u64,
    /// Largest response payload, in bytes, that is accepted when reading a response.
    max_response_size: u64,
}
957
958impl RequestResponseCodec for GenericCodec {
959    type Protocol = StreamProtocol;
960    type Request = Vec<u8>;
961    type Response = Result<Vec<u8>, ()>;
962
963    async fn read_request<T>(
964        &mut self,
965        _: &Self::Protocol,
966        mut io: &mut T,
967    ) -> io::Result<Self::Request>
968    where
969        T: AsyncRead + Unpin + Send,
970    {
971        // Read the length.
972        let length = unsigned_varint::aio::read_usize(&mut io)
973            .await
974            .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
975        if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
976            return Err(io::Error::new(
977                io::ErrorKind::InvalidInput,
978                format!(
979                    "Request size exceeds limit: {} > {}",
980                    length, self.max_request_size
981                ),
982            ));
983        }
984
985        // Read the payload.
986        let mut buffer = vec![0; length];
987        io.read_exact(&mut buffer).await?;
988        Ok(buffer)
989    }
990
991    async fn read_response<T>(
992        &mut self,
993        _: &Self::Protocol,
994        mut io: &mut T,
995    ) -> io::Result<Self::Response>
996    where
997        T: AsyncRead + Unpin + Send,
998    {
999        // Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1000        // considered as a protocol error and will result in the entire connection being closed.
1001        // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1002        // that this response is an error.
1003
1004        // Read the length.
1005        let length = match unsigned_varint::aio::read_usize(&mut io).await {
1006            Ok(l) => l,
1007            Err(unsigned_varint::io::ReadError::Io(err))
1008                if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1009            {
1010                return Ok(Err(()));
1011            }
1012            Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1013        };
1014
1015        if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1016            return Err(io::Error::new(
1017                io::ErrorKind::InvalidInput,
1018                format!(
1019                    "Response size exceeds limit: {} > {}",
1020                    length, self.max_response_size
1021                ),
1022            ));
1023        }
1024
1025        // Read the payload.
1026        let mut buffer = vec![0; length];
1027        io.read_exact(&mut buffer).await?;
1028        Ok(Ok(buffer))
1029    }
1030
1031    async fn write_request<T>(
1032        &mut self,
1033        _: &Self::Protocol,
1034        io: &mut T,
1035        req: Self::Request,
1036    ) -> io::Result<()>
1037    where
1038        T: AsyncWrite + Unpin + Send,
1039    {
1040        // Write the length.
1041        {
1042            let mut buffer = unsigned_varint::encode::usize_buffer();
1043            io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1044                .await?;
1045        }
1046
1047        // Write the payload.
1048        io.write_all(&req).await?;
1049
1050        io.close().await?;
1051        Ok(())
1052    }
1053
1054    async fn write_response<T>(
1055        &mut self,
1056        _: &Self::Protocol,
1057        io: &mut T,
1058        res: Self::Response,
1059    ) -> io::Result<()>
1060    where
1061        T: AsyncWrite + Unpin + Send,
1062    {
1063        // If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1064        if let Ok(res) = res {
1065            // Write the length.
1066            {
1067                let mut buffer = unsigned_varint::encode::usize_buffer();
1068                io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1069                    .await?;
1070            }
1071
1072            // Write the payload.
1073            io.write_all(&res).await?;
1074        }
1075
1076        io.close().await?;
1077        Ok(())
1078    }
1079}