1#[cfg(all(test, not(miri)))]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50 OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::dial_opts::DialOpts;
55use libp2p::swarm::handler::multi::MultiHandler;
56use libp2p::swarm::{
57 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
58};
59use std::borrow::Cow;
60use std::collections::HashMap;
61use std::collections::hash_map::Entry;
62use std::pin::Pin;
63use std::task::{Context, Poll};
64use std::time::{Duration, Instant};
65use std::{fmt, io, iter};
66use tracing::{debug, error, warn};
67
/// `tracing` target attached to log records emitted by this module.
const LOG_TARGET: &str = "request-response-protocols";
69
/// A pluggable request-response protocol that can be registered with
/// [`RequestResponseFactoryBehaviour`].
///
/// Each handler supplies the configuration of its protocol and a long-running
/// `run` future; the behaviour wraps that future and polls it from its own
/// `poll` implementation.
#[async_trait]
pub trait RequestHandler: Send {
    /// Drives the handler. The behaviour boxes and fuses the returned future
    /// and polls it every time the behaviour itself is polled.
    async fn run(&mut self);

    /// Returns the configuration (name, size limits, timeout, inbound queue)
    /// used to register this protocol with the behaviour.
    fn protocol_config(&self) -> ProtocolConfig;

    /// Returns the static name of the protocol served by this handler.
    fn protocol_name(&self) -> &'static str;

    /// Clones the handler into a new boxed trait object; this is what the
    /// `Clone` implementation for `Box<dyn RequestHandler>` delegates to.
    fn clone_box(&self) -> Box<dyn RequestHandler>;
}
85
86impl Clone for Box<dyn RequestHandler> {
87 fn clone(&self) -> Self {
88 self.clone_box()
89 }
90}
91
/// Configuration of a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol; used both as the libp2p stream protocol name and
    /// as the key under which the protocol is registered in the behaviour.
    pub name: &'static str,

    /// Maximum accepted size of an incoming request, in bytes. The codec
    /// rejects a request whose declared length exceeds this value.
    pub max_request_size: u64,

    /// Maximum accepted size of an incoming response, in bytes. The codec
    /// rejects a response whose declared length exceeds this value.
    pub max_response_size: u64,

    /// Request timeout handed to the underlying libp2p request-response
    /// behaviour.
    pub request_timeout: Duration,

    /// Channel on which incoming requests are delivered for processing.
    ///
    /// When `Some`, the protocol is registered with full (inbound and
    /// outbound) support; when `None`, it is registered as outbound-only.
    pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
}
137
138impl ProtocolConfig {
139 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
141 ProtocolConfig {
142 name: protocol_name,
143 max_request_size: 1024 * 1024,
144 max_response_size: 16 * 1024 * 1024,
145 request_timeout: Duration::from_secs(20),
146 inbound_queue: None,
147 }
148 }
149}
150
/// A request received from a remote peer, handed to the protocol's inbound
/// queue for processing.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Peer that sent the request.
    pub peer: PeerId,

    /// Raw request bytes as decoded by the codec.
    pub payload: Vec<u8>,

    /// Channel on which the processor must deliver its [`OutgoingResponse`].
    /// If the sender is dropped without sending, the behaviour sends no
    /// response for this request.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
171
/// Response produced for an [`IncomingRequest`].
#[derive(Debug)]
pub struct OutgoingResponse {
    /// Payload to send back. With `Ok(bytes)` the behaviour sends the bytes to
    /// the peer; with `Err(())` it sends nothing and drops the response channel.
    pub result: Result<Vec<u8>, ()>,

    /// Optional channel intended for feedback about the response having been
    /// sent.
    // NOTE(review): the behaviour code visible in this file never reads this
    // field (it is discarded when the response is destructured) — confirm
    // whether feedback is expected to be delivered.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
190
/// Events emitted by [`RequestResponseFactoryBehaviour`] towards the swarm.
#[derive(Debug)]
pub enum Event {
    /// Handling of an inbound request has concluded.
    InboundRequest {
        /// Peer that sent the request.
        peer: PeerId,
        /// Name of the protocol the request arrived on.
        protocol: Cow<'static, str>,
        /// `Ok(())` once the response was sent, or the network failure
        /// reported for the inbound request.
        result: Result<(), ResponseFailure>,
    },

    /// An outbound request started with `send_request` has concluded.
    RequestFinished {
        /// Peer the request was addressed to, when known.
        peer: Option<PeerId>,
        /// Name of the protocol the request was sent on.
        protocol: Cow<'static, str>,
        /// Time elapsed between sending the request and its conclusion.
        duration: Duration,
        /// `Ok(())` if the response was handed to the requester; otherwise a
        /// textual description of the failure.
        result: Result<(), String>,
    },
}
226
/// Key identifying a pending outbound request: the protocol name together with
/// the request id assigned by that protocol's request-response behaviour.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId {
    /// Name of the protocol the request was sent on.
    protocol: Cow<'static, str>,
    /// Id returned by the inner behaviour's `send_request`.
    request_id: OutboundRequestId,
}
238
239impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
240 #[inline]
241 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
242 Self {
243 protocol,
244 request_id,
245 }
246 }
247}
248
/// What `send_request` should do when there is no connection to the target peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Attempt to connect to the peer and send the request.
    TryConnect,
    /// Fail the request immediately.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` only for [`IfDisconnected::TryConnect`].
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
268
/// Network behaviour that multiplexes several request-response protocols,
/// each backed by its own libp2p request-response behaviour.
#[expect(
    clippy::type_complexity,
    reason = "To preserve compatibility with copied implementation"
)]
pub struct RequestResponseFactoryBehaviour {
    /// Registered protocols keyed by name. Each entry holds the inner
    /// behaviour and, for protocols accepting inbound requests, the queue on
    /// which incoming requests are delivered.
    protocols: HashMap<
        Cow<'static, str>,
        (
            RequestResponse<GenericCodec>,
            Option<mpsc::Sender<IncomingRequest>>,
        ),
    >,

    /// Outbound requests awaiting a response or failure: the time the request
    /// was sent and the channel used to report the outcome to the requester.
    pending_requests:
        HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,

    /// Futures resolving to the responses for inbound requests currently being
    /// processed. A future yields `None` when the processor dropped its
    /// response sender without answering.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Inbound request received during the current `poll` that still has to be
    /// forwarded to its inbound queue.
    message_request: Option<MessageRequest>,

    /// `run` futures of the registered request handlers, polled on every
    /// `poll` of the behaviour.
    request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
304
305impl fmt::Debug for RequestResponseFactoryBehaviour {
306 #[inline]
307 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
308 f.debug_struct("RequestResponseFactoryBehaviour")
309 .finish_non_exhaustive()
310 }
311}
312
/// An inbound request captured from an inner behaviour, stored until `poll`
/// forwards it to the protocol's inbound queue.
struct MessageRequest {
    /// Peer that sent the request.
    peer: PeerId,
    /// Id assigned to the inbound request by the inner behaviour.
    request_id: InboundRequestId,
    /// Raw request bytes.
    request: Vec<u8>,
    /// Channel used to send the response back through the inner behaviour.
    channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// Name of the protocol the request arrived on.
    protocol: String,
    /// Inbound queue of the protocol; `None` for outbound-only protocols.
    response_builder: Option<mpsc::Sender<IncomingRequest>>,
}
322
/// Result of processing an inbound request, ready to be sent to the peer.
struct RequestProcessingOutcome {
    /// Id of the inbound request this outcome answers.
    request_id: InboundRequestId,
    /// Name of the protocol the request arrived on.
    protocol: Cow<'static, str>,
    /// Channel used to send the response through the inner behaviour.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// Response produced by the request processor.
    response: OutgoingResponse,
}
330
331impl RequestResponseFactoryBehaviour {
332 pub fn new(
335 list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
336 max_concurrent_streams: usize,
337 ) -> Result<Self, RegisterError> {
338 let mut protocols = HashMap::new();
339 let mut request_handlers = Vec::new();
340 for mut handler in list {
341 let config = handler.protocol_config();
342
343 let protocol_support = if config.inbound_queue.is_some() {
344 ProtocolSupport::Full
345 } else {
346 ProtocolSupport::Outbound
347 };
348
349 let rq_rp = RequestResponse::with_codec(
350 GenericCodec {
351 max_request_size: config.max_request_size,
352 max_response_size: config.max_response_size,
353 },
354 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
355 RequestResponseConfig::default()
356 .with_request_timeout(config.request_timeout)
357 .with_max_concurrent_streams(max_concurrent_streams),
358 );
359
360 match protocols.entry(Cow::Borrowed(config.name)) {
361 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
362 Entry::Occupied(e) => {
363 return Err(RegisterError::DuplicateProtocol(e.key().clone()));
364 }
365 };
366
367 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
368 Box::pin(async move { handler.run().await }.fuse());
369
370 request_handlers.push(request_handler_run);
371 }
372
373 Ok(Self {
374 protocols,
375 pending_requests: Default::default(),
376 pending_responses: Default::default(),
377 message_request: None,
378 request_handlers,
379 })
380 }
381
382 pub fn send_request(
389 &mut self,
390 target: &PeerId,
391 protocol_name: &str,
392 request: Vec<u8>,
393 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
394 connect: IfDisconnected,
395 addresses: Vec<Multiaddr>,
396 ) {
397 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
398 if protocol.is_connected(target) || connect.should_connect() {
399 let opts = DialOpts::peer_id(*target).addresses(addresses).build();
400 let request_id = protocol.send_request(opts, request);
401 let prev_req_id = self.pending_requests.insert(
402 (protocol_name.to_string().into(), request_id).into(),
403 (Instant::now(), pending_response),
404 );
405 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
406 } else if pending_response
407 .send(Err(RequestFailure::NotConnected))
408 .is_err()
409 {
410 debug!(
411 target: LOG_TARGET,
412 "Not connected to peer {:?}. At the same time local \
413 node is no longer interested in the result.",
414 target,
415 );
416 }
417 } else if pending_response
418 .send(Err(RequestFailure::UnknownProtocol))
419 .is_err()
420 {
421 debug!(
422 target: LOG_TARGET,
423 "Unknown protocol {:?}. At the same time local \
424 node is no longer interested in the result.",
425 protocol_name,
426 );
427 }
428 }
429}
430
431impl NetworkBehaviour for RequestResponseFactoryBehaviour {
432 type ConnectionHandler = MultiHandler<
433 String,
434 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
435 >;
436 type ToSwarm = Event;
437
438 fn handle_established_inbound_connection(
439 &mut self,
440 connection_id: ConnectionId,
441 peer: PeerId,
442 local_addr: &Multiaddr,
443 remote_addr: &Multiaddr,
444 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
445 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
446 (
447 p.to_string(),
448 r.handle_established_inbound_connection(
449 connection_id,
450 peer,
451 local_addr,
452 remote_addr,
453 )
454 .expect(
455 "Behaviours return handlers in these methods with the exception of \
456 'connection management' behaviours like connection-limits or allow-black list. \
457 So, inner request-response behaviour always returns Ok(handler).",
458 ),
459 )
460 });
461
462 let handler = MultiHandler::try_from_iter(iter).expect(
463 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
464 which is the only possible error; qed",
465 );
466
467 Ok(handler)
468 }
469
470 fn handle_established_outbound_connection(
471 &mut self,
472 connection_id: ConnectionId,
473 peer: PeerId,
474 addr: &Multiaddr,
475 role_override: Endpoint,
476 port_use: PortUse,
477 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
478 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
479 (
480 p.to_string(),
481 r.handle_established_outbound_connection(
482 connection_id,
483 peer,
484 addr,
485 role_override,
486 port_use,
487 )
488 .expect(
489 "Behaviours return handlers in these methods with the exception of \
490 'connection management' behaviours like connection-limits or allow-black \
491 list. So, inner request-response behaviour always returns Ok(handler).",
492 ),
493 )
494 });
495
496 let handler = MultiHandler::try_from_iter(iter).expect(
497 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
498 which is the only possible error; qed",
499 );
500
501 Ok(handler)
502 }
503
504 fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
506 match event {
507 FromSwarm::ConnectionEstablished(inner) => {
508 for (protocol, _) in self.protocols.values_mut() {
509 protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
510 }
511 }
512 FromSwarm::ConnectionClosed(inner) => {
513 for (protocol, _) in self.protocols.values_mut() {
514 protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
515 peer_id: inner.peer_id,
516 connection_id: inner.connection_id,
517 endpoint: inner.endpoint,
518 cause: inner.cause,
519 remaining_established: inner.remaining_established,
520 }));
521 }
522 }
523 FromSwarm::AddressChange(inner) => {
524 for (protocol, _) in self.protocols.values_mut() {
525 protocol.on_swarm_event(FromSwarm::AddressChange(inner));
526 }
527 }
528 FromSwarm::DialFailure(inner) => {
529 for (protocol, _) in self.protocols.values_mut() {
530 protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
531 peer_id: inner.peer_id,
532 error: inner.error,
533 connection_id: inner.connection_id,
534 }));
535 }
536 }
537 FromSwarm::ListenFailure(inner) => {
538 for (protocol, _) in self.protocols.values_mut() {
539 protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
540 local_addr: inner.local_addr,
541 send_back_addr: inner.send_back_addr,
542 error: inner.error,
543 connection_id: inner.connection_id,
544 peer_id: inner.peer_id,
545 }));
546 }
547 }
548 FromSwarm::NewListener(inner) => {
549 for (protocol, _) in self.protocols.values_mut() {
550 protocol.on_swarm_event(FromSwarm::NewListener(inner));
551 }
552 }
553 FromSwarm::NewListenAddr(inner) => {
554 for (protocol, _) in self.protocols.values_mut() {
555 protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
556 }
557 }
558 FromSwarm::ExpiredListenAddr(inner) => {
559 for (protocol, _) in self.protocols.values_mut() {
560 protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
561 }
562 }
563 FromSwarm::ListenerError(inner) => {
564 for (protocol, _) in self.protocols.values_mut() {
565 protocol.on_swarm_event(FromSwarm::ListenerError(inner));
566 }
567 }
568 FromSwarm::ListenerClosed(inner) => {
569 for (protocol, _) in self.protocols.values_mut() {
570 protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
571 }
572 }
573 FromSwarm::NewExternalAddrCandidate(inner) => {
574 for (protocol, _) in self.protocols.values_mut() {
575 protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
576 }
577 }
578 FromSwarm::ExternalAddrConfirmed(inner) => {
579 for (protocol, _) in self.protocols.values_mut() {
580 protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
581 }
582 }
583 FromSwarm::ExternalAddrExpired(inner) => {
584 for (protocol, _) in self.protocols.values_mut() {
585 protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
586 }
587 }
588 FromSwarm::NewExternalAddrOfPeer(inner) => {
589 for (protocol, _) in self.protocols.values_mut() {
590 protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
591 }
592 }
593 event => {
594 warn!(
595 ?event,
596 "New event must be forwarded to request response protocols"
597 );
598 }
599 };
600 }
601
602 fn on_connection_handler_event(
603 &mut self,
604 peer_id: PeerId,
605 connection: ConnectionId,
606 event: THandlerOutEvent<Self>,
607 ) {
608 let p_name = event.0;
609 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
610 return proto.on_connection_handler_event(peer_id, connection, event.1);
611 }
612
613 warn!(
614 target: LOG_TARGET,
615 "inject_node_event: no request-response instance registered for protocol {:?}", p_name
616 )
617 }
618
619 fn poll(
620 &mut self,
621 cx: &mut Context<'_>,
622 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
623 'poll_all: loop {
624 if let Some(message_request) = self.message_request.take() {
625 let MessageRequest {
626 peer,
627 request_id,
628 request,
629 channel,
630 protocol,
631 response_builder,
632 } = message_request;
633
634 let (tx, rx) = oneshot::channel();
635
636 if let Some(mut response_builder) = response_builder {
639 let _ = response_builder.try_send(IncomingRequest {
643 peer,
644 payload: request,
645 pending_response: tx,
646 });
647 } else {
648 debug_assert!(false, "Received message on outbound-only protocol.");
649 }
650
651 self.pending_responses.push(Box::pin(async move {
652 if let Ok(response) = rx.await {
656 Some(RequestProcessingOutcome {
657 request_id,
658 protocol: Cow::from(protocol),
659 inner_channel: channel,
660 response,
661 })
662 } else {
663 None
664 }
665 }));
666
667 continue 'poll_all;
670 }
671 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
673 let RequestProcessingOutcome {
674 request_id,
675 protocol: protocol_name,
676 inner_channel,
677 response: OutgoingResponse { result, .. },
678 } = match outcome {
679 Some(outcome) => outcome,
680 None => continue,
683 };
684
685 if let Ok(payload) = result
686 && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
687 && protocol.send_response(inner_channel, Ok(payload)).is_err()
688 {
689 debug!(
692 target: LOG_TARGET,
693 %request_id,
694 "Failed to send response for request on protocol {} due to a \
695 timeout or due to the connection to the peer being closed. \
696 Dropping response",
697 protocol_name,
698 );
699 }
700 }
701
702 for rq_rs_runner in &mut self.request_handlers {
703 let _ = rq_rs_runner.poll_unpin(cx);
705 }
706
707 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
709 while let Poll::Ready(event) = behaviour.poll(cx) {
710 let event = match event {
711 ToSwarm::GenerateEvent(event) => event,
713
714 ToSwarm::Dial { opts } => {
717 if opts.get_peer_id().is_none() {
718 error!(
719 "The request-response isn't supposed to start dialing \
720 addresses"
721 );
722 }
723 return Poll::Ready(ToSwarm::Dial { opts });
724 }
725 ToSwarm::NotifyHandler {
726 peer_id,
727 handler,
728 event,
729 } => {
730 return Poll::Ready(ToSwarm::NotifyHandler {
731 peer_id,
732 handler,
733 event: ((*protocol).to_string(), event),
734 });
735 }
736 ToSwarm::CloseConnection {
737 peer_id,
738 connection,
739 } => {
740 return Poll::Ready(ToSwarm::CloseConnection {
741 peer_id,
742 connection,
743 });
744 }
745 ToSwarm::NewExternalAddrCandidate(observed) => {
746 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
747 }
748 ToSwarm::ExternalAddrConfirmed(addr) => {
749 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
750 }
751 ToSwarm::ExternalAddrExpired(addr) => {
752 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
753 }
754 ToSwarm::ListenOn { opts } => {
755 return Poll::Ready(ToSwarm::ListenOn { opts });
756 }
757 ToSwarm::RemoveListener { id } => {
758 return Poll::Ready(ToSwarm::RemoveListener { id });
759 }
760 event => {
761 warn!(
762 ?event,
763 "New event from request response protocol must be send up"
764 );
765
766 continue;
767 }
768 };
769
770 match event {
771 RequestResponseEvent::Message {
773 peer,
774 message:
775 RequestResponseMessage::Request {
776 request_id,
777 request,
778 channel,
779 },
780 } => {
781 self.message_request = Some(MessageRequest {
782 peer,
783 request_id,
784 request,
785 channel,
786 protocol: protocol.to_string(),
787 response_builder: response_builder.clone(),
788 });
789
790 continue 'poll_all;
793 }
794
795 RequestResponseEvent::Message {
797 peer,
798 message:
799 RequestResponseMessage::Response {
800 request_id,
801 response,
802 },
803 } => {
804 let (started, delivered) = match self
805 .pending_requests
806 .remove(&(protocol.clone(), request_id).into())
807 {
808 Some((started, pending_response)) => {
809 let delivered = pending_response
810 .send(response.map_err(|()| RequestFailure::Refused))
811 .map_err(|_| RequestFailure::Obsolete.to_string());
812 (started, delivered)
813 }
814 None => {
815 warn!(
816 target: LOG_TARGET,
817 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
818 request_id,
819 );
820 debug_assert!(false);
821 continue;
822 }
823 };
824
825 let out = Event::RequestFinished {
826 peer: Some(peer),
827 protocol: protocol.clone(),
828 duration: started.elapsed(),
829 result: delivered,
830 };
831
832 return Poll::Ready(ToSwarm::GenerateEvent(out));
833 }
834
835 RequestResponseEvent::OutboundFailure {
837 peer,
838 request_id,
839 error,
840 ..
841 } => {
842 let error_string = error.to_string();
843 let started = match self
844 .pending_requests
845 .remove(&(protocol.clone(), request_id).into())
846 {
847 Some((started, pending_response)) => {
848 if pending_response
849 .send(Err(RequestFailure::Network(error)))
850 .is_err()
851 {
852 debug!(
853 target: LOG_TARGET,
854 %request_id,
855 "Request failed. At the same time local node is no longer interested in \
856 the result",
857 );
858 }
859 started
860 }
861 None => {
862 warn!(
863 target: LOG_TARGET,
864 %request_id,
865 "Received `RequestResponseEvent::Message` with unexpected request",
866 );
867 debug_assert!(false);
868 continue;
869 }
870 };
871
872 let out = Event::RequestFinished {
873 peer,
874 protocol: protocol.clone(),
875 duration: started.elapsed(),
876 result: Err(error_string),
877 };
878
879 return Poll::Ready(ToSwarm::GenerateEvent(out));
880 }
881
882 RequestResponseEvent::InboundFailure { peer, error, .. } => {
885 debug!(?error, %peer, "Inbound request failed.");
886
887 let out = Event::InboundRequest {
888 peer,
889 protocol: protocol.clone(),
890 result: Err(ResponseFailure::Network(error)),
891 };
892 return Poll::Ready(ToSwarm::GenerateEvent(out));
893 }
894
895 RequestResponseEvent::ResponseSent { peer, .. } => {
897 let out = Event::InboundRequest {
898 peer,
899 protocol: protocol.clone(),
900 result: Ok(()),
901 };
902
903 return Poll::Ready(ToSwarm::GenerateEvent(out));
904 }
905 };
906 }
907 }
908
909 break Poll::Pending;
910 }
911 }
912}
913
/// Error returned when the behaviour cannot be constructed from the supplied
/// request handlers.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// Two handlers were registered under the same protocol name; carries that
    /// name, which is also the whole error message.
    #[error("{0}")]
    DuplicateProtocol(Cow<'static, str>),
}
921
/// Reasons an outbound request can fail, as reported to the requester.
#[derive(Debug, thiserror::Error)]
pub enum RequestFailure {
    /// The peer is not connected and the caller asked not to dial it.
    #[error("We are not currently connected to the requested peer")]
    NotConnected,
    /// No protocol with the requested name is registered in the behaviour.
    #[error("Given protocol hasn't been registered")]
    UnknownProtocol,
    /// The remote closed the substream without sending a response.
    #[error(
        "Remote has closed the substream before answering, thereby signaling that it considers the \
        request as valid, but refused to answer it"
    )]
    Refused,
    /// A response arrived, but the requester had already dropped its receiver.
    #[error("The remote replied, but the local node is no longer interested in the response")]
    Obsolete,
    /// The underlying request-response behaviour reported an outbound failure.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
945
/// Reasons handling of an inbound request can fail.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// The underlying request-response behaviour reported an inbound failure.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
953
/// Codec shared by all protocols: a payload is an unsigned-varint length
/// prefix followed by that many raw bytes.
#[derive(Debug, Clone)]
#[doc(hidden)] pub struct GenericCodec {
    /// Maximum accepted length of an incoming request, in bytes.
    max_request_size: u64,
    /// Maximum accepted length of an incoming response, in bytes.
    max_response_size: u64,
}
962
963#[async_trait::async_trait]
964impl RequestResponseCodec for GenericCodec {
965 type Protocol = StreamProtocol;
966 type Request = Vec<u8>;
967 type Response = Result<Vec<u8>, ()>;
968
969 async fn read_request<T>(
970 &mut self,
971 _: &Self::Protocol,
972 mut io: &mut T,
973 ) -> io::Result<Self::Request>
974 where
975 T: AsyncRead + Unpin + Send,
976 {
977 let length = unsigned_varint::aio::read_usize(&mut io)
979 .await
980 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
981 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
982 return Err(io::Error::new(
983 io::ErrorKind::InvalidInput,
984 format!(
985 "Request size exceeds limit: {} > {}",
986 length, self.max_request_size
987 ),
988 ));
989 }
990
991 let mut buffer = vec![0; length];
993 io.read_exact(&mut buffer).await?;
994 Ok(buffer)
995 }
996
997 async fn read_response<T>(
998 &mut self,
999 _: &Self::Protocol,
1000 mut io: &mut T,
1001 ) -> io::Result<Self::Response>
1002 where
1003 T: AsyncRead + Unpin + Send,
1004 {
1005 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1012 Ok(l) => l,
1013 Err(unsigned_varint::io::ReadError::Io(err))
1014 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1015 {
1016 return Ok(Err(()));
1017 }
1018 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1019 };
1020
1021 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1022 return Err(io::Error::new(
1023 io::ErrorKind::InvalidInput,
1024 format!(
1025 "Response size exceeds limit: {} > {}",
1026 length, self.max_response_size
1027 ),
1028 ));
1029 }
1030
1031 let mut buffer = vec![0; length];
1033 io.read_exact(&mut buffer).await?;
1034 Ok(Ok(buffer))
1035 }
1036
1037 async fn write_request<T>(
1038 &mut self,
1039 _: &Self::Protocol,
1040 io: &mut T,
1041 req: Self::Request,
1042 ) -> io::Result<()>
1043 where
1044 T: AsyncWrite + Unpin + Send,
1045 {
1046 {
1048 let mut buffer = unsigned_varint::encode::usize_buffer();
1049 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1050 .await?;
1051 }
1052
1053 io.write_all(&req).await?;
1055
1056 io.close().await?;
1057 Ok(())
1058 }
1059
1060 async fn write_response<T>(
1061 &mut self,
1062 _: &Self::Protocol,
1063 io: &mut T,
1064 res: Self::Response,
1065 ) -> io::Result<()>
1066 where
1067 T: AsyncWrite + Unpin + Send,
1068 {
1069 if let Ok(res) = res {
1071 {
1073 let mut buffer = unsigned_varint::encode::usize_buffer();
1074 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1075 .await?;
1076 }
1077
1078 io.write_all(&res).await?;
1080 }
1081
1082 io.close().await?;
1083 Ok(())
1084 }
1085}