1#[cfg(all(test, not(miri)))]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use futures::stream::FuturesUnordered;
44use libp2p::StreamProtocol;
45use libp2p::core::transport::PortUse;
46use libp2p::core::{Endpoint, Multiaddr};
47use libp2p::identity::PeerId;
48use libp2p::request_response::{
49 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
50 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
51 OutboundRequestId, ProtocolSupport, ResponseChannel,
52};
53pub use libp2p::request_response::{InboundFailure, OutboundFailure};
54use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
55use libp2p::swarm::handler::multi::MultiHandler;
56use libp2p::swarm::{
57 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
58};
59use std::borrow::Cow;
60use std::collections::HashMap;
61use std::collections::hash_map::Entry;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{fmt, io, iter};
66use tracing::{debug, error, warn};
67
68const LOG_TARGET: &str = "request-response-protocols";
69
70#[async_trait]
72pub trait RequestHandler: Send {
73 async fn run(&mut self);
75
76 fn protocol_config(&self) -> ProtocolConfig;
78
79 fn protocol_name(&self) -> &'static str;
81
82 fn clone_box(&self) -> Box<dyn RequestHandler>;
84}
85
86impl Clone for Box<dyn RequestHandler> {
87 fn clone(&self) -> Self {
88 self.clone_box()
89 }
90}
91
92#[derive(Debug, Clone)]
94pub struct ProtocolConfig {
95 pub name: &'static str,
97
98 pub max_request_size: u64,
103
104 pub max_response_size: u64,
109
110 pub request_timeout: Duration,
114
115 pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
136}
137
138impl ProtocolConfig {
139 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
141 ProtocolConfig {
142 name: protocol_name,
143 max_request_size: 1024 * 1024,
144 max_response_size: 16 * 1024 * 1024,
145 request_timeout: Duration::from_secs(20),
146 inbound_queue: None,
147 }
148 }
149}
150
151#[derive(Debug)]
153pub struct IncomingRequest {
154 pub peer: PeerId,
156
157 pub payload: Vec<u8>,
160
161 pub pending_response: oneshot::Sender<OutgoingResponse>,
170}
171
172#[derive(Debug)]
174pub struct OutgoingResponse {
175 pub result: Result<Vec<u8>, ()>,
179
180 pub sent_feedback: Option<oneshot::Sender<()>>,
189}
190
191#[derive(Debug)]
193pub enum Event {
196 InboundRequest {
200 peer: PeerId,
202 protocol: Cow<'static, str>,
204 result: Result<(), ResponseFailure>,
209 },
210
211 RequestFinished {
216 peer: Option<PeerId>,
218 protocol: Cow<'static, str>,
220 duration: Duration,
222 result: Result<(), String>,
224 },
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Hash)]
234struct ProtocolRequestId {
235 protocol: Cow<'static, str>,
236 request_id: OutboundRequestId,
237}
238
239impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
240 #[inline]
241 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
242 Self {
243 protocol,
244 request_id,
245 }
246 }
247}
248
249#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
251pub enum IfDisconnected {
252 TryConnect,
254 ImmediateError,
256}
257
258impl IfDisconnected {
260 pub fn should_connect(self) -> bool {
262 match self {
263 Self::TryConnect => true,
264 Self::ImmediateError => false,
265 }
266 }
267}
268
269#[expect(
272 clippy::type_complexity,
273 reason = "To preserve compatibility with copied implementation"
274)]
275pub struct RequestResponseFactoryBehaviour {
276 protocols: HashMap<
280 Cow<'static, str>,
281 (
282 RequestResponse<GenericCodec>,
283 Option<mpsc::Sender<IncomingRequest>>,
284 ),
285 >,
286
287 pending_requests:
289 HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
290
291 pending_responses: stream::FuturesUnordered<
294 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
295 >,
296
297 message_request: Option<MessageRequest>,
300
301 request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
303}
304
305impl fmt::Debug for RequestResponseFactoryBehaviour {
306 #[inline]
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308 f.debug_struct("RequestResponseFactoryBehaviour")
309 .finish_non_exhaustive()
310 }
311}
312
313struct MessageRequest {
315 peer: PeerId,
316 request_id: InboundRequestId,
317 request: Vec<u8>,
318 channel: ResponseChannel<Result<Vec<u8>, ()>>,
319 protocol: String,
320 response_builder: Option<mpsc::Sender<IncomingRequest>>,
321}
322
323struct RequestProcessingOutcome {
325 request_id: InboundRequestId,
326 protocol: Cow<'static, str>,
327 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
328 response: OutgoingResponse,
329}
330
331impl RequestResponseFactoryBehaviour {
332 pub fn new<List>(list: List, max_concurrent_streams: usize) -> Result<Self, RegisterError>
335 where
336 List: IntoIterator<Item = Box<dyn RequestHandler>>,
337 {
338 let mut protocols = HashMap::new();
339 let mut request_handlers = Vec::new();
340 for mut handler in list {
341 let config = handler.protocol_config();
342
343 let protocol_support = if config.inbound_queue.is_some() {
344 ProtocolSupport::Full
345 } else {
346 ProtocolSupport::Outbound
347 };
348
349 let rq_rp = RequestResponse::with_codec(
350 GenericCodec {
351 max_request_size: config.max_request_size,
352 max_response_size: config.max_response_size,
353 },
354 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
355 RequestResponseConfig::default()
356 .with_request_timeout(config.request_timeout)
357 .with_max_concurrent_streams(max_concurrent_streams),
358 );
359
360 match protocols.entry(Cow::Borrowed(config.name)) {
361 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
362 Entry::Occupied(e) => {
363 return Err(RegisterError::DuplicateProtocol(e.key().clone()));
364 }
365 };
366
367 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
368 Box::pin(async move { handler.run().await }.fuse());
369
370 request_handlers.push(request_handler_run);
371 }
372
373 Ok(Self {
374 protocols,
375 pending_requests: HashMap::new(),
376 pending_responses: FuturesUnordered::new(),
377 message_request: None,
378 request_handlers,
379 })
380 }
381
382 pub fn send_request(
389 &mut self,
390 target: &PeerId,
391 protocol_name: &str,
392 request: Vec<u8>,
393 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
394 connect: IfDisconnected,
395 addresses: Vec<Multiaddr>,
396 ) {
397 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
398 if protocol.is_connected(target) || connect.should_connect() {
399 let request_id = protocol.send_request_with_addresses(target, request, addresses);
400 let prev_req_id = self.pending_requests.insert(
401 (protocol_name.to_string().into(), request_id).into(),
402 (Instant::now(), pending_response),
403 );
404 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
405 } else if pending_response
406 .send(Err(RequestFailure::NotConnected))
407 .is_err()
408 {
409 debug!(
410 target: LOG_TARGET,
411 "Not connected to peer {:?}. At the same time local \
412 node is no longer interested in the result.",
413 target,
414 );
415 }
416 } else if pending_response
417 .send(Err(RequestFailure::UnknownProtocol))
418 .is_err()
419 {
420 debug!(
421 target: LOG_TARGET,
422 "Unknown protocol {:?}. At the same time local \
423 node is no longer interested in the result.",
424 protocol_name,
425 );
426 }
427 }
428}
429
430impl NetworkBehaviour for RequestResponseFactoryBehaviour {
431 type ConnectionHandler = MultiHandler<
432 String,
433 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
434 >;
435 type ToSwarm = Event;
436
437 fn handle_established_inbound_connection(
438 &mut self,
439 connection_id: ConnectionId,
440 peer: PeerId,
441 local_addr: &Multiaddr,
442 remote_addr: &Multiaddr,
443 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
444 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
445 (
446 p.to_string(),
447 r.handle_established_inbound_connection(
448 connection_id,
449 peer,
450 local_addr,
451 remote_addr,
452 )
453 .expect(
454 "Behaviours return handlers in these methods with the exception of \
455 'connection management' behaviours like connection-limits or allow-black list. \
456 So, inner request-response behaviour always returns Ok(handler).",
457 ),
458 )
459 });
460
461 let handler = MultiHandler::try_from_iter(iter).expect(
462 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
463 which is the only possible error; qed",
464 );
465
466 Ok(handler)
467 }
468
469 fn handle_established_outbound_connection(
470 &mut self,
471 connection_id: ConnectionId,
472 peer: PeerId,
473 addr: &Multiaddr,
474 role_override: Endpoint,
475 port_use: PortUse,
476 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
477 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
478 (
479 p.to_string(),
480 r.handle_established_outbound_connection(
481 connection_id,
482 peer,
483 addr,
484 role_override,
485 port_use,
486 )
487 .expect(
488 "Behaviours return handlers in these methods with the exception of \
489 'connection management' behaviours like connection-limits or allow-black \
490 list. So, inner request-response behaviour always returns Ok(handler).",
491 ),
492 )
493 });
494
495 let handler = MultiHandler::try_from_iter(iter).expect(
496 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
497 which is the only possible error; qed",
498 );
499
500 Ok(handler)
501 }
502
503 fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
505 match event {
506 FromSwarm::ConnectionEstablished(inner) => {
507 for (protocol, _) in self.protocols.values_mut() {
508 protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
509 }
510 }
511 FromSwarm::ConnectionClosed(inner) => {
512 for (protocol, _) in self.protocols.values_mut() {
513 protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
514 peer_id: inner.peer_id,
515 connection_id: inner.connection_id,
516 endpoint: inner.endpoint,
517 cause: inner.cause,
518 remaining_established: inner.remaining_established,
519 }));
520 }
521 }
522 FromSwarm::AddressChange(inner) => {
523 for (protocol, _) in self.protocols.values_mut() {
524 protocol.on_swarm_event(FromSwarm::AddressChange(inner));
525 }
526 }
527 FromSwarm::DialFailure(inner) => {
528 for (protocol, _) in self.protocols.values_mut() {
529 protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
530 peer_id: inner.peer_id,
531 error: inner.error,
532 connection_id: inner.connection_id,
533 }));
534 }
535 }
536 FromSwarm::ListenFailure(inner) => {
537 for (protocol, _) in self.protocols.values_mut() {
538 protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
539 local_addr: inner.local_addr,
540 send_back_addr: inner.send_back_addr,
541 error: inner.error,
542 connection_id: inner.connection_id,
543 peer_id: inner.peer_id,
544 }));
545 }
546 }
547 FromSwarm::NewListener(inner) => {
548 for (protocol, _) in self.protocols.values_mut() {
549 protocol.on_swarm_event(FromSwarm::NewListener(inner));
550 }
551 }
552 FromSwarm::NewListenAddr(inner) => {
553 for (protocol, _) in self.protocols.values_mut() {
554 protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
555 }
556 }
557 FromSwarm::ExpiredListenAddr(inner) => {
558 for (protocol, _) in self.protocols.values_mut() {
559 protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
560 }
561 }
562 FromSwarm::ListenerError(inner) => {
563 for (protocol, _) in self.protocols.values_mut() {
564 protocol.on_swarm_event(FromSwarm::ListenerError(inner));
565 }
566 }
567 FromSwarm::ListenerClosed(inner) => {
568 for (protocol, _) in self.protocols.values_mut() {
569 protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
570 }
571 }
572 FromSwarm::NewExternalAddrCandidate(inner) => {
573 for (protocol, _) in self.protocols.values_mut() {
574 protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
575 }
576 }
577 FromSwarm::ExternalAddrConfirmed(inner) => {
578 for (protocol, _) in self.protocols.values_mut() {
579 protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
580 }
581 }
582 FromSwarm::ExternalAddrExpired(inner) => {
583 for (protocol, _) in self.protocols.values_mut() {
584 protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
585 }
586 }
587 FromSwarm::NewExternalAddrOfPeer(inner) => {
588 for (protocol, _) in self.protocols.values_mut() {
589 protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
590 }
591 }
592 event => {
593 warn!(
594 ?event,
595 "New event must be forwarded to request response protocols"
596 );
597 }
598 }
599 }
600
601 fn on_connection_handler_event(
602 &mut self,
603 peer_id: PeerId,
604 connection: ConnectionId,
605 event: THandlerOutEvent<Self>,
606 ) {
607 let p_name = event.0;
608 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
609 return proto.on_connection_handler_event(peer_id, connection, event.1);
610 }
611
612 warn!(
613 target: LOG_TARGET,
614 "inject_node_event: no request-response instance registered for protocol {:?}", p_name
615 );
616 }
617
618 fn poll(
619 &mut self,
620 cx: &mut Context<'_>,
621 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
622 'poll_all: loop {
623 if let Some(message_request) = self.message_request.take() {
624 let MessageRequest {
625 peer,
626 request_id,
627 request,
628 channel,
629 protocol,
630 response_builder,
631 } = message_request;
632
633 let (tx, rx) = oneshot::channel();
634
635 if let Some(mut response_builder) = response_builder {
638 let _: Result<(), _> = response_builder.try_send(IncomingRequest {
642 peer,
643 payload: request,
644 pending_response: tx,
645 });
646 } else {
647 debug_assert!(false, "Received message on outbound-only protocol.");
648 }
649
650 self.pending_responses.push(Box::pin(async move {
651 if let Ok(response) = rx.await {
655 Some(RequestProcessingOutcome {
656 request_id,
657 protocol: Cow::from(protocol),
658 inner_channel: channel,
659 response,
660 })
661 } else {
662 None
663 }
664 }));
665
666 continue 'poll_all;
669 }
670 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
672 let Some(RequestProcessingOutcome {
673 request_id,
674 protocol: protocol_name,
675 inner_channel,
676 response: OutgoingResponse { result, .. },
677 }) = outcome
678 else {
679 continue;
682 };
683
684 if let Ok(payload) = result
685 && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
686 && protocol.send_response(inner_channel, Ok(payload)).is_err()
687 {
688 debug!(
691 target: LOG_TARGET,
692 %request_id,
693 "Failed to send response for request on protocol {} due to a \
694 timeout or due to the connection to the peer being closed. \
695 Dropping response",
696 protocol_name,
697 );
698 }
699 }
700
701 for rq_rs_runner in &mut self.request_handlers {
702 let _: Poll<()> = rq_rs_runner.poll_unpin(cx);
704 }
705
706 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
708 while let Poll::Ready(event) = behaviour.poll(cx) {
709 let event = match event {
710 ToSwarm::GenerateEvent(event) => event,
712
713 ToSwarm::Dial { opts } => {
716 if opts.get_peer_id().is_none() {
717 error!(
718 "The request-response isn't supposed to start dialing \
719 addresses"
720 );
721 }
722 return Poll::Ready(ToSwarm::Dial { opts });
723 }
724 ToSwarm::NotifyHandler {
725 peer_id,
726 handler,
727 event,
728 } => {
729 return Poll::Ready(ToSwarm::NotifyHandler {
730 peer_id,
731 handler,
732 event: ((*protocol).to_string(), event),
733 });
734 }
735 ToSwarm::CloseConnection {
736 peer_id,
737 connection,
738 } => {
739 return Poll::Ready(ToSwarm::CloseConnection {
740 peer_id,
741 connection,
742 });
743 }
744 ToSwarm::NewExternalAddrCandidate(observed) => {
745 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
746 }
747 ToSwarm::ExternalAddrConfirmed(addr) => {
748 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
749 }
750 ToSwarm::ExternalAddrExpired(addr) => {
751 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
752 }
753 ToSwarm::ListenOn { opts } => {
754 return Poll::Ready(ToSwarm::ListenOn { opts });
755 }
756 ToSwarm::RemoveListener { id } => {
757 return Poll::Ready(ToSwarm::RemoveListener { id });
758 }
759 event => {
760 warn!(
761 ?event,
762 "New event from request response protocol must be send up"
763 );
764
765 continue;
766 }
767 };
768
769 match event {
770 RequestResponseEvent::Message {
772 peer,
773 message:
774 RequestResponseMessage::Request {
775 request_id,
776 request,
777 channel,
778 },
779 ..
780 } => {
781 self.message_request = Some(MessageRequest {
782 peer,
783 request_id,
784 request,
785 channel,
786 protocol: protocol.to_string(),
787 response_builder: response_builder.clone(),
788 });
789
790 continue 'poll_all;
793 }
794
795 RequestResponseEvent::Message {
797 peer,
798 message:
799 RequestResponseMessage::Response {
800 request_id,
801 response,
802 },
803 ..
804 } => {
805 let (started, delivered) = if let Some((started, pending_response)) =
806 self.pending_requests
807 .remove(&(protocol.clone(), request_id).into())
808 {
809 let delivered = pending_response
810 .send(response.map_err(|()| RequestFailure::Refused))
811 .map_err(|_error_result| RequestFailure::Obsolete.to_string());
812 (started, delivered)
813 } else {
814 warn!(
815 target: LOG_TARGET,
816 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
817 request_id,
818 );
819 debug_assert!(false);
820 continue;
821 };
822
823 let out = Event::RequestFinished {
824 peer: Some(peer),
825 protocol: protocol.clone(),
826 duration: started.elapsed(),
827 result: delivered,
828 };
829
830 return Poll::Ready(ToSwarm::GenerateEvent(out));
831 }
832
833 RequestResponseEvent::OutboundFailure {
835 peer,
836 request_id,
837 error,
838 ..
839 } => {
840 let error_string = error.to_string();
841 let started = if let Some((started, pending_response)) = self
842 .pending_requests
843 .remove(&(protocol.clone(), request_id).into())
844 {
845 if pending_response
846 .send(Err(RequestFailure::Network(error)))
847 .is_err()
848 {
849 debug!(
850 target: LOG_TARGET,
851 %request_id,
852 "Request failed. At the same time local node is no longer interested in \
853 the result",
854 );
855 }
856 started
857 } else {
858 warn!(
859 target: LOG_TARGET,
860 %request_id,
861 "Received `RequestResponseEvent::Message` with unexpected request",
862 );
863 debug_assert!(false);
864 continue;
865 };
866
867 let out = Event::RequestFinished {
868 peer: Some(peer),
869 protocol: protocol.clone(),
870 duration: started.elapsed(),
871 result: Err(error_string),
872 };
873
874 return Poll::Ready(ToSwarm::GenerateEvent(out));
875 }
876
877 RequestResponseEvent::InboundFailure { peer, error, .. } => {
880 debug!(?error, %peer, "Inbound request failed.");
881
882 let out = Event::InboundRequest {
883 peer,
884 protocol: protocol.clone(),
885 result: Err(ResponseFailure::Network(error)),
886 };
887 return Poll::Ready(ToSwarm::GenerateEvent(out));
888 }
889
890 RequestResponseEvent::ResponseSent { peer, .. } => {
892 let out = Event::InboundRequest {
893 peer,
894 protocol: protocol.clone(),
895 result: Ok(()),
896 };
897
898 return Poll::Ready(ToSwarm::GenerateEvent(out));
899 }
900 };
901 }
902 }
903
904 break Poll::Pending;
905 }
906 }
907}
908
909#[derive(Debug, thiserror::Error)]
911pub enum RegisterError {
912 #[error("{0}")]
914 DuplicateProtocol(Cow<'static, str>),
915}
916
917#[derive(Debug, thiserror::Error)]
919pub enum RequestFailure {
920 #[error("We are not currently connected to the requested peer")]
922 NotConnected,
923 #[error("Given protocol hasn't been registered")]
925 UnknownProtocol,
926 #[error(
929 "Remote has closed the substream before answering, thereby signaling that it considers the \
930 request as valid, but refused to answer it"
931 )]
932 Refused,
933 #[error("The remote replied, but the local node is no longer interested in the response")]
935 Obsolete,
936 #[error("Problem on the network: {0}")]
938 Network(OutboundFailure),
939}
940
941#[derive(Debug, thiserror::Error)]
943pub enum ResponseFailure {
944 #[error("Problem on the network: {0}")]
946 Network(InboundFailure),
947}
948
949#[derive(Debug, Clone)]
952#[doc(hidden)] pub struct GenericCodec {
954 max_request_size: u64,
955 max_response_size: u64,
956}
957
958impl RequestResponseCodec for GenericCodec {
959 type Protocol = StreamProtocol;
960 type Request = Vec<u8>;
961 type Response = Result<Vec<u8>, ()>;
962
963 async fn read_request<T>(
964 &mut self,
965 _: &Self::Protocol,
966 mut io: &mut T,
967 ) -> io::Result<Self::Request>
968 where
969 T: AsyncRead + Unpin + Send,
970 {
971 let length = unsigned_varint::aio::read_usize(&mut io)
973 .await
974 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
975 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
976 return Err(io::Error::new(
977 io::ErrorKind::InvalidInput,
978 format!(
979 "Request size exceeds limit: {} > {}",
980 length, self.max_request_size
981 ),
982 ));
983 }
984
985 let mut buffer = vec![0; length];
987 io.read_exact(&mut buffer).await?;
988 Ok(buffer)
989 }
990
991 async fn read_response<T>(
992 &mut self,
993 _: &Self::Protocol,
994 mut io: &mut T,
995 ) -> io::Result<Self::Response>
996 where
997 T: AsyncRead + Unpin + Send,
998 {
999 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1006 Ok(l) => l,
1007 Err(unsigned_varint::io::ReadError::Io(err))
1008 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1009 {
1010 return Ok(Err(()));
1011 }
1012 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1013 };
1014
1015 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1016 return Err(io::Error::new(
1017 io::ErrorKind::InvalidInput,
1018 format!(
1019 "Response size exceeds limit: {} > {}",
1020 length, self.max_response_size
1021 ),
1022 ));
1023 }
1024
1025 let mut buffer = vec![0; length];
1027 io.read_exact(&mut buffer).await?;
1028 Ok(Ok(buffer))
1029 }
1030
1031 async fn write_request<T>(
1032 &mut self,
1033 _: &Self::Protocol,
1034 io: &mut T,
1035 req: Self::Request,
1036 ) -> io::Result<()>
1037 where
1038 T: AsyncWrite + Unpin + Send,
1039 {
1040 {
1042 let mut buffer = unsigned_varint::encode::usize_buffer();
1043 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1044 .await?;
1045 }
1046
1047 io.write_all(&req).await?;
1049
1050 io.close().await?;
1051 Ok(())
1052 }
1053
1054 async fn write_response<T>(
1055 &mut self,
1056 _: &Self::Protocol,
1057 io: &mut T,
1058 res: Self::Response,
1059 ) -> io::Result<()>
1060 where
1061 T: AsyncWrite + Unpin + Send,
1062 {
1063 if let Ok(res) = res {
1065 {
1067 let mut buffer = unsigned_varint::encode::usize_buffer();
1068 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1069 .await?;
1070 }
1071
1072 io.write_all(&res).await?;
1074 }
1075
1076 io.close().await?;
1077 Ok(())
1078 }
1079}