1#[cfg(all(test, not(miri)))]
38mod tests;
39
40use async_trait::async_trait;
41use futures::channel::{mpsc, oneshot};
42use futures::prelude::*;
43use libp2p::StreamProtocol;
44use libp2p::core::transport::PortUse;
45use libp2p::core::{Endpoint, Multiaddr};
46use libp2p::identity::PeerId;
47use libp2p::request_response::{
48 Behaviour as RequestResponse, Codec as RequestResponseCodec, Config as RequestResponseConfig,
49 Event as RequestResponseEvent, InboundRequestId, Message as RequestResponseMessage,
50 OutboundRequestId, ProtocolSupport, ResponseChannel,
51};
52pub use libp2p::request_response::{InboundFailure, OutboundFailure};
53use libp2p::swarm::behaviour::{ConnectionClosed, DialFailure, FromSwarm, ListenFailure};
54use libp2p::swarm::handler::multi::MultiHandler;
55use libp2p::swarm::{
56 ConnectionDenied, ConnectionId, NetworkBehaviour, THandlerInEvent, THandlerOutEvent, ToSwarm,
57};
58use std::borrow::Cow;
59use std::collections::HashMap;
60use std::collections::hash_map::Entry;
61use std::pin::Pin;
62use std::task::{Context, Poll};
63use std::time::{Duration, Instant};
64use std::{fmt, io, iter};
65use tracing::{debug, error, warn};
66
67const LOG_TARGET: &str = "request-response-protocols";
68
69#[async_trait]
71pub trait RequestHandler: Send {
72 async fn run(&mut self);
74
75 fn protocol_config(&self) -> ProtocolConfig;
77
78 fn protocol_name(&self) -> &'static str;
80
81 fn clone_box(&self) -> Box<dyn RequestHandler>;
83}
84
85impl Clone for Box<dyn RequestHandler> {
86 fn clone(&self) -> Self {
87 self.clone_box()
88 }
89}
90
91#[derive(Debug, Clone)]
93pub struct ProtocolConfig {
94 pub name: &'static str,
96
97 pub max_request_size: u64,
102
103 pub max_response_size: u64,
108
109 pub request_timeout: Duration,
113
114 pub inbound_queue: Option<mpsc::Sender<IncomingRequest>>,
135}
136
137impl ProtocolConfig {
138 pub fn new(protocol_name: &'static str) -> ProtocolConfig {
140 ProtocolConfig {
141 name: protocol_name,
142 max_request_size: 1024 * 1024,
143 max_response_size: 16 * 1024 * 1024,
144 request_timeout: Duration::from_secs(20),
145 inbound_queue: None,
146 }
147 }
148}
149
150#[derive(Debug)]
152pub struct IncomingRequest {
153 pub peer: PeerId,
155
156 pub payload: Vec<u8>,
159
160 pub pending_response: oneshot::Sender<OutgoingResponse>,
169}
170
171#[derive(Debug)]
173pub struct OutgoingResponse {
174 pub result: Result<Vec<u8>, ()>,
178
179 pub sent_feedback: Option<oneshot::Sender<()>>,
188}
189
190#[derive(Debug)]
192pub enum Event {
195 InboundRequest {
199 peer: PeerId,
201 protocol: Cow<'static, str>,
203 result: Result<(), ResponseFailure>,
208 },
209
210 RequestFinished {
215 peer: Option<PeerId>,
217 protocol: Cow<'static, str>,
219 duration: Duration,
221 result: Result<(), String>,
223 },
224}
225
226#[derive(Debug, Clone, PartialEq, Eq, Hash)]
233struct ProtocolRequestId {
234 protocol: Cow<'static, str>,
235 request_id: OutboundRequestId,
236}
237
238impl From<(Cow<'static, str>, OutboundRequestId)> for ProtocolRequestId {
239 #[inline]
240 fn from((protocol, request_id): (Cow<'static, str>, OutboundRequestId)) -> Self {
241 Self {
242 protocol,
243 request_id,
244 }
245 }
246}
247
/// What to do when a request targets a peer the protocol is not currently connected to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum IfDisconnected {
    /// Send the request anyway, which lets the underlying behaviour attempt a connection.
    TryConnect,
    /// Fail the request right away.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` for [`IfDisconnected::TryConnect`] and `false` for
    /// [`IfDisconnected::ImmediateError`].
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
267
268#[expect(
271 clippy::type_complexity,
272 reason = "To preserve compatibility with copied implementation"
273)]
274pub struct RequestResponseFactoryBehaviour {
275 protocols: HashMap<
279 Cow<'static, str>,
280 (
281 RequestResponse<GenericCodec>,
282 Option<mpsc::Sender<IncomingRequest>>,
283 ),
284 >,
285
286 pending_requests:
288 HashMap<ProtocolRequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
289
290 pending_responses: stream::FuturesUnordered<
293 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
294 >,
295
296 message_request: Option<MessageRequest>,
299
300 request_handlers: Vec<Pin<Box<dyn Future<Output = ()> + Send>>>,
302}
303
304impl fmt::Debug for RequestResponseFactoryBehaviour {
305 #[inline]
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 f.debug_struct("RequestResponseFactoryBehaviour")
308 .finish_non_exhaustive()
309 }
310}
311
312struct MessageRequest {
314 peer: PeerId,
315 request_id: InboundRequestId,
316 request: Vec<u8>,
317 channel: ResponseChannel<Result<Vec<u8>, ()>>,
318 protocol: String,
319 response_builder: Option<mpsc::Sender<IncomingRequest>>,
320}
321
322struct RequestProcessingOutcome {
324 request_id: InboundRequestId,
325 protocol: Cow<'static, str>,
326 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
327 response: OutgoingResponse,
328}
329
330impl RequestResponseFactoryBehaviour {
331 pub fn new(
334 list: impl IntoIterator<Item = Box<dyn RequestHandler>>,
335 max_concurrent_streams: usize,
336 ) -> Result<Self, RegisterError> {
337 let mut protocols = HashMap::new();
338 let mut request_handlers = Vec::new();
339 for mut handler in list {
340 let config = handler.protocol_config();
341
342 let protocol_support = if config.inbound_queue.is_some() {
343 ProtocolSupport::Full
344 } else {
345 ProtocolSupport::Outbound
346 };
347
348 let rq_rp = RequestResponse::with_codec(
349 GenericCodec {
350 max_request_size: config.max_request_size,
351 max_response_size: config.max_response_size,
352 },
353 iter::once(StreamProtocol::new(config.name)).zip(iter::repeat(protocol_support)),
354 RequestResponseConfig::default()
355 .with_request_timeout(config.request_timeout)
356 .with_max_concurrent_streams(max_concurrent_streams),
357 );
358
359 match protocols.entry(Cow::Borrowed(config.name)) {
360 Entry::Vacant(e) => e.insert((rq_rp, config.inbound_queue)),
361 Entry::Occupied(e) => {
362 return Err(RegisterError::DuplicateProtocol(e.key().clone()));
363 }
364 };
365
366 let request_handler_run: Pin<Box<dyn Future<Output = ()> + Send>> =
367 Box::pin(async move { handler.run().await }.fuse());
368
369 request_handlers.push(request_handler_run);
370 }
371
372 Ok(Self {
373 protocols,
374 pending_requests: Default::default(),
375 pending_responses: Default::default(),
376 message_request: None,
377 request_handlers,
378 })
379 }
380
381 pub fn send_request(
388 &mut self,
389 target: &PeerId,
390 protocol_name: &str,
391 request: Vec<u8>,
392 pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
393 connect: IfDisconnected,
394 addresses: Vec<Multiaddr>,
395 ) {
396 if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
397 if protocol.is_connected(target) || connect.should_connect() {
398 let request_id = protocol.send_request_with_addresses(target, request, addresses);
399 let prev_req_id = self.pending_requests.insert(
400 (protocol_name.to_string().into(), request_id).into(),
401 (Instant::now(), pending_response),
402 );
403 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
404 } else if pending_response
405 .send(Err(RequestFailure::NotConnected))
406 .is_err()
407 {
408 debug!(
409 target: LOG_TARGET,
410 "Not connected to peer {:?}. At the same time local \
411 node is no longer interested in the result.",
412 target,
413 );
414 }
415 } else if pending_response
416 .send(Err(RequestFailure::UnknownProtocol))
417 .is_err()
418 {
419 debug!(
420 target: LOG_TARGET,
421 "Unknown protocol {:?}. At the same time local \
422 node is no longer interested in the result.",
423 protocol_name,
424 );
425 }
426 }
427}
428
429impl NetworkBehaviour for RequestResponseFactoryBehaviour {
430 type ConnectionHandler = MultiHandler<
431 String,
432 <RequestResponse<GenericCodec> as NetworkBehaviour>::ConnectionHandler,
433 >;
434 type ToSwarm = Event;
435
436 fn handle_established_inbound_connection(
437 &mut self,
438 connection_id: ConnectionId,
439 peer: PeerId,
440 local_addr: &Multiaddr,
441 remote_addr: &Multiaddr,
442 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
443 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
444 (
445 p.to_string(),
446 r.handle_established_inbound_connection(
447 connection_id,
448 peer,
449 local_addr,
450 remote_addr,
451 )
452 .expect(
453 "Behaviours return handlers in these methods with the exception of \
454 'connection management' behaviours like connection-limits or allow-black list. \
455 So, inner request-response behaviour always returns Ok(handler).",
456 ),
457 )
458 });
459
460 let handler = MultiHandler::try_from_iter(iter).expect(
461 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
462 which is the only possible error; qed",
463 );
464
465 Ok(handler)
466 }
467
468 fn handle_established_outbound_connection(
469 &mut self,
470 connection_id: ConnectionId,
471 peer: PeerId,
472 addr: &Multiaddr,
473 role_override: Endpoint,
474 port_use: PortUse,
475 ) -> Result<Self::ConnectionHandler, ConnectionDenied> {
476 let iter = self.protocols.iter_mut().map(|(p, (r, _))| {
477 (
478 p.to_string(),
479 r.handle_established_outbound_connection(
480 connection_id,
481 peer,
482 addr,
483 role_override,
484 port_use,
485 )
486 .expect(
487 "Behaviours return handlers in these methods with the exception of \
488 'connection management' behaviours like connection-limits or allow-black \
489 list. So, inner request-response behaviour always returns Ok(handler).",
490 ),
491 )
492 });
493
494 let handler = MultiHandler::try_from_iter(iter).expect(
495 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
496 which is the only possible error; qed",
497 );
498
499 Ok(handler)
500 }
501
502 fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
504 match event {
505 FromSwarm::ConnectionEstablished(inner) => {
506 for (protocol, _) in self.protocols.values_mut() {
507 protocol.on_swarm_event(FromSwarm::ConnectionEstablished(inner));
508 }
509 }
510 FromSwarm::ConnectionClosed(inner) => {
511 for (protocol, _) in self.protocols.values_mut() {
512 protocol.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
513 peer_id: inner.peer_id,
514 connection_id: inner.connection_id,
515 endpoint: inner.endpoint,
516 cause: inner.cause,
517 remaining_established: inner.remaining_established,
518 }));
519 }
520 }
521 FromSwarm::AddressChange(inner) => {
522 for (protocol, _) in self.protocols.values_mut() {
523 protocol.on_swarm_event(FromSwarm::AddressChange(inner));
524 }
525 }
526 FromSwarm::DialFailure(inner) => {
527 for (protocol, _) in self.protocols.values_mut() {
528 protocol.on_swarm_event(FromSwarm::DialFailure(DialFailure {
529 peer_id: inner.peer_id,
530 error: inner.error,
531 connection_id: inner.connection_id,
532 }));
533 }
534 }
535 FromSwarm::ListenFailure(inner) => {
536 for (protocol, _) in self.protocols.values_mut() {
537 protocol.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
538 local_addr: inner.local_addr,
539 send_back_addr: inner.send_back_addr,
540 error: inner.error,
541 connection_id: inner.connection_id,
542 peer_id: inner.peer_id,
543 }));
544 }
545 }
546 FromSwarm::NewListener(inner) => {
547 for (protocol, _) in self.protocols.values_mut() {
548 protocol.on_swarm_event(FromSwarm::NewListener(inner));
549 }
550 }
551 FromSwarm::NewListenAddr(inner) => {
552 for (protocol, _) in self.protocols.values_mut() {
553 protocol.on_swarm_event(FromSwarm::NewListenAddr(inner));
554 }
555 }
556 FromSwarm::ExpiredListenAddr(inner) => {
557 for (protocol, _) in self.protocols.values_mut() {
558 protocol.on_swarm_event(FromSwarm::ExpiredListenAddr(inner));
559 }
560 }
561 FromSwarm::ListenerError(inner) => {
562 for (protocol, _) in self.protocols.values_mut() {
563 protocol.on_swarm_event(FromSwarm::ListenerError(inner));
564 }
565 }
566 FromSwarm::ListenerClosed(inner) => {
567 for (protocol, _) in self.protocols.values_mut() {
568 protocol.on_swarm_event(FromSwarm::ListenerClosed(inner));
569 }
570 }
571 FromSwarm::NewExternalAddrCandidate(inner) => {
572 for (protocol, _) in self.protocols.values_mut() {
573 protocol.on_swarm_event(FromSwarm::NewExternalAddrCandidate(inner));
574 }
575 }
576 FromSwarm::ExternalAddrConfirmed(inner) => {
577 for (protocol, _) in self.protocols.values_mut() {
578 protocol.on_swarm_event(FromSwarm::ExternalAddrConfirmed(inner));
579 }
580 }
581 FromSwarm::ExternalAddrExpired(inner) => {
582 for (protocol, _) in self.protocols.values_mut() {
583 protocol.on_swarm_event(FromSwarm::ExternalAddrExpired(inner));
584 }
585 }
586 FromSwarm::NewExternalAddrOfPeer(inner) => {
587 for (protocol, _) in self.protocols.values_mut() {
588 protocol.on_swarm_event(FromSwarm::NewExternalAddrOfPeer(inner));
589 }
590 }
591 event => {
592 warn!(
593 ?event,
594 "New event must be forwarded to request response protocols"
595 );
596 }
597 };
598 }
599
600 fn on_connection_handler_event(
601 &mut self,
602 peer_id: PeerId,
603 connection: ConnectionId,
604 event: THandlerOutEvent<Self>,
605 ) {
606 let p_name = event.0;
607 if let Some((proto, _)) = self.protocols.get_mut(&*p_name) {
608 return proto.on_connection_handler_event(peer_id, connection, event.1);
609 }
610
611 warn!(
612 target: LOG_TARGET,
613 "inject_node_event: no request-response instance registered for protocol {:?}", p_name
614 )
615 }
616
617 fn poll(
618 &mut self,
619 cx: &mut Context<'_>,
620 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
621 'poll_all: loop {
622 if let Some(message_request) = self.message_request.take() {
623 let MessageRequest {
624 peer,
625 request_id,
626 request,
627 channel,
628 protocol,
629 response_builder,
630 } = message_request;
631
632 let (tx, rx) = oneshot::channel();
633
634 if let Some(mut response_builder) = response_builder {
637 let _ = response_builder.try_send(IncomingRequest {
641 peer,
642 payload: request,
643 pending_response: tx,
644 });
645 } else {
646 debug_assert!(false, "Received message on outbound-only protocol.");
647 }
648
649 self.pending_responses.push(Box::pin(async move {
650 if let Ok(response) = rx.await {
654 Some(RequestProcessingOutcome {
655 request_id,
656 protocol: Cow::from(protocol),
657 inner_channel: channel,
658 response,
659 })
660 } else {
661 None
662 }
663 }));
664
665 continue 'poll_all;
668 }
669 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
671 let RequestProcessingOutcome {
672 request_id,
673 protocol: protocol_name,
674 inner_channel,
675 response: OutgoingResponse { result, .. },
676 } = match outcome {
677 Some(outcome) => outcome,
678 None => continue,
681 };
682
683 if let Ok(payload) = result
684 && let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name)
685 && protocol.send_response(inner_channel, Ok(payload)).is_err()
686 {
687 debug!(
690 target: LOG_TARGET,
691 %request_id,
692 "Failed to send response for request on protocol {} due to a \
693 timeout or due to the connection to the peer being closed. \
694 Dropping response",
695 protocol_name,
696 );
697 }
698 }
699
700 for rq_rs_runner in &mut self.request_handlers {
701 let _ = rq_rs_runner.poll_unpin(cx);
703 }
704
705 for (protocol, (behaviour, response_builder)) in &mut self.protocols {
707 while let Poll::Ready(event) = behaviour.poll(cx) {
708 let event = match event {
709 ToSwarm::GenerateEvent(event) => event,
711
712 ToSwarm::Dial { opts } => {
715 if opts.get_peer_id().is_none() {
716 error!(
717 "The request-response isn't supposed to start dialing \
718 addresses"
719 );
720 }
721 return Poll::Ready(ToSwarm::Dial { opts });
722 }
723 ToSwarm::NotifyHandler {
724 peer_id,
725 handler,
726 event,
727 } => {
728 return Poll::Ready(ToSwarm::NotifyHandler {
729 peer_id,
730 handler,
731 event: ((*protocol).to_string(), event),
732 });
733 }
734 ToSwarm::CloseConnection {
735 peer_id,
736 connection,
737 } => {
738 return Poll::Ready(ToSwarm::CloseConnection {
739 peer_id,
740 connection,
741 });
742 }
743 ToSwarm::NewExternalAddrCandidate(observed) => {
744 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed));
745 }
746 ToSwarm::ExternalAddrConfirmed(addr) => {
747 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
748 }
749 ToSwarm::ExternalAddrExpired(addr) => {
750 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
751 }
752 ToSwarm::ListenOn { opts } => {
753 return Poll::Ready(ToSwarm::ListenOn { opts });
754 }
755 ToSwarm::RemoveListener { id } => {
756 return Poll::Ready(ToSwarm::RemoveListener { id });
757 }
758 event => {
759 warn!(
760 ?event,
761 "New event from request response protocol must be send up"
762 );
763
764 continue;
765 }
766 };
767
768 match event {
769 RequestResponseEvent::Message {
771 peer,
772 message:
773 RequestResponseMessage::Request {
774 request_id,
775 request,
776 channel,
777 },
778 ..
779 } => {
780 self.message_request = Some(MessageRequest {
781 peer,
782 request_id,
783 request,
784 channel,
785 protocol: protocol.to_string(),
786 response_builder: response_builder.clone(),
787 });
788
789 continue 'poll_all;
792 }
793
794 RequestResponseEvent::Message {
796 peer,
797 message:
798 RequestResponseMessage::Response {
799 request_id,
800 response,
801 },
802 ..
803 } => {
804 let (started, delivered) = match self
805 .pending_requests
806 .remove(&(protocol.clone(), request_id).into())
807 {
808 Some((started, pending_response)) => {
809 let delivered = pending_response
810 .send(response.map_err(|()| RequestFailure::Refused))
811 .map_err(|_| RequestFailure::Obsolete.to_string());
812 (started, delivered)
813 }
814 None => {
815 warn!(
816 target: LOG_TARGET,
817 "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
818 request_id,
819 );
820 debug_assert!(false);
821 continue;
822 }
823 };
824
825 let out = Event::RequestFinished {
826 peer: Some(peer),
827 protocol: protocol.clone(),
828 duration: started.elapsed(),
829 result: delivered,
830 };
831
832 return Poll::Ready(ToSwarm::GenerateEvent(out));
833 }
834
835 RequestResponseEvent::OutboundFailure {
837 peer,
838 request_id,
839 error,
840 ..
841 } => {
842 let error_string = error.to_string();
843 let started = match self
844 .pending_requests
845 .remove(&(protocol.clone(), request_id).into())
846 {
847 Some((started, pending_response)) => {
848 if pending_response
849 .send(Err(RequestFailure::Network(error)))
850 .is_err()
851 {
852 debug!(
853 target: LOG_TARGET,
854 %request_id,
855 "Request failed. At the same time local node is no longer interested in \
856 the result",
857 );
858 }
859 started
860 }
861 None => {
862 warn!(
863 target: LOG_TARGET,
864 %request_id,
865 "Received `RequestResponseEvent::Message` with unexpected request",
866 );
867 debug_assert!(false);
868 continue;
869 }
870 };
871
872 let out = Event::RequestFinished {
873 peer: Some(peer),
874 protocol: protocol.clone(),
875 duration: started.elapsed(),
876 result: Err(error_string),
877 };
878
879 return Poll::Ready(ToSwarm::GenerateEvent(out));
880 }
881
882 RequestResponseEvent::InboundFailure { peer, error, .. } => {
885 debug!(?error, %peer, "Inbound request failed.");
886
887 let out = Event::InboundRequest {
888 peer,
889 protocol: protocol.clone(),
890 result: Err(ResponseFailure::Network(error)),
891 };
892 return Poll::Ready(ToSwarm::GenerateEvent(out));
893 }
894
895 RequestResponseEvent::ResponseSent { peer, .. } => {
897 let out = Event::InboundRequest {
898 peer,
899 protocol: protocol.clone(),
900 result: Ok(()),
901 };
902
903 return Poll::Ready(ToSwarm::GenerateEvent(out));
904 }
905 };
906 }
907 }
908
909 break Poll::Pending;
910 }
911 }
912}
913
914#[derive(Debug, thiserror::Error)]
916pub enum RegisterError {
917 #[error("{0}")]
919 DuplicateProtocol(Cow<'static, str>),
920}
921
922#[derive(Debug, thiserror::Error)]
924pub enum RequestFailure {
925 #[error("We are not currently connected to the requested peer")]
927 NotConnected,
928 #[error("Given protocol hasn't been registered")]
930 UnknownProtocol,
931 #[error(
934 "Remote has closed the substream before answering, thereby signaling that it considers the \
935 request as valid, but refused to answer it"
936 )]
937 Refused,
938 #[error("The remote replied, but the local node is no longer interested in the response")]
940 Obsolete,
941 #[error("Problem on the network: {0}")]
943 Network(OutboundFailure),
944}
945
946#[derive(Debug, thiserror::Error)]
948pub enum ResponseFailure {
949 #[error("Problem on the network: {0}")]
951 Network(InboundFailure),
952}
953
954#[derive(Debug, Clone)]
957#[doc(hidden)] pub struct GenericCodec {
959 max_request_size: u64,
960 max_response_size: u64,
961}
962
963#[async_trait::async_trait]
964impl RequestResponseCodec for GenericCodec {
965 type Protocol = StreamProtocol;
966 type Request = Vec<u8>;
967 type Response = Result<Vec<u8>, ()>;
968
969 async fn read_request<T>(
970 &mut self,
971 _: &Self::Protocol,
972 mut io: &mut T,
973 ) -> io::Result<Self::Request>
974 where
975 T: AsyncRead + Unpin + Send,
976 {
977 let length = unsigned_varint::aio::read_usize(&mut io)
979 .await
980 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
981 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
982 return Err(io::Error::new(
983 io::ErrorKind::InvalidInput,
984 format!(
985 "Request size exceeds limit: {} > {}",
986 length, self.max_request_size
987 ),
988 ));
989 }
990
991 let mut buffer = vec![0; length];
993 io.read_exact(&mut buffer).await?;
994 Ok(buffer)
995 }
996
997 async fn read_response<T>(
998 &mut self,
999 _: &Self::Protocol,
1000 mut io: &mut T,
1001 ) -> io::Result<Self::Response>
1002 where
1003 T: AsyncRead + Unpin + Send,
1004 {
1005 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1012 Ok(l) => l,
1013 Err(unsigned_varint::io::ReadError::Io(err))
1014 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1015 {
1016 return Ok(Err(()));
1017 }
1018 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1019 };
1020
1021 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1022 return Err(io::Error::new(
1023 io::ErrorKind::InvalidInput,
1024 format!(
1025 "Response size exceeds limit: {} > {}",
1026 length, self.max_response_size
1027 ),
1028 ));
1029 }
1030
1031 let mut buffer = vec![0; length];
1033 io.read_exact(&mut buffer).await?;
1034 Ok(Ok(buffer))
1035 }
1036
1037 async fn write_request<T>(
1038 &mut self,
1039 _: &Self::Protocol,
1040 io: &mut T,
1041 req: Self::Request,
1042 ) -> io::Result<()>
1043 where
1044 T: AsyncWrite + Unpin + Send,
1045 {
1046 {
1048 let mut buffer = unsigned_varint::encode::usize_buffer();
1049 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer))
1050 .await?;
1051 }
1052
1053 io.write_all(&req).await?;
1055
1056 io.close().await?;
1057 Ok(())
1058 }
1059
1060 async fn write_response<T>(
1061 &mut self,
1062 _: &Self::Protocol,
1063 io: &mut T,
1064 res: Self::Response,
1065 ) -> io::Result<()>
1066 where
1067 T: AsyncWrite + Unpin + Send,
1068 {
1069 if let Ok(res) = res {
1071 {
1073 let mut buffer = unsigned_varint::encode::usize_buffer();
1074 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer))
1075 .await?;
1076 }
1077
1078 io.write_all(&res).await?;
1080 }
1081
1082 io.close().await?;
1083 Ok(())
1084 }
1085}