1use crate::utils::AsyncJoinOnDrop;
18use anyhow::anyhow;
19use async_nats::{
20 Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
21 RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
22};
23use backon::{BackoffBuilder, ExponentialBuilder};
24use futures::channel::mpsc;
25use futures::stream::FuturesUnordered;
26use futures::{FutureExt, Stream, StreamExt, select};
27use parity_scale_codec::{Decode, Encode};
28use std::any::type_name;
29use std::collections::VecDeque;
30use std::fmt;
31use std::future::Future;
32use std::marker::PhantomData;
33use std::ops::Deref;
34use std::pin::Pin;
35use std::sync::Arc;
36use std::task::{Context, Poll};
37use std::time::Duration;
38use thiserror::Error;
39use tracing::{Instrument, debug, error, trace, warn};
40use ulid::Ulid;
41
42const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
43const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(1);
44const REQUEST_TIMEOUT: Duration = Duration::from_mins(5);
47
48pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
54 const SUBJECT: &'static str;
56 type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
58}
59
60pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
67 const SUBJECT: &'static str;
69 type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
75}
76
77#[derive(Debug, Encode, Decode)]
81enum GenericStreamResponses<Response> {
82 Continue {
84 index: u32,
86 responses: VecDeque<Response>,
88 ack_subject: String,
91 },
92 Last {
94 index: u32,
96 responses: VecDeque<Response>,
98 },
99}
100
101impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
102 #[inline]
103 fn from(value: GenericStreamResponses<Response>) -> Self {
104 match value {
105 GenericStreamResponses::Continue { responses, .. } => responses,
106 GenericStreamResponses::Last { responses, .. } => responses,
107 }
108 }
109}
110
111impl<Response> GenericStreamResponses<Response> {
112 fn next(&mut self) -> Option<Response> {
113 match self {
114 GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
115 GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
116 }
117 }
118
119 fn index(&self) -> u32 {
120 match self {
121 GenericStreamResponses::Continue { index, .. } => *index,
122 GenericStreamResponses::Last { index, .. } => *index,
123 }
124 }
125
126 fn ack_subject(&self) -> Option<&str> {
127 if let GenericStreamResponses::Continue { ack_subject, .. } = self {
128 Some(ack_subject)
129 } else {
130 None
131 }
132 }
133
134 fn is_last(&self) -> bool {
135 matches!(self, Self::Last { .. })
136 }
137}
138
139#[derive(Debug, Error)]
141pub enum StreamRequestError {
142 #[error("Subscribe error: {0}")]
144 Subscribe(#[from] SubscribeError),
145 #[error("Publish error: {0}")]
147 Publish(#[from] PublishError),
148}
149
150#[derive(Debug)]
153#[pin_project::pin_project]
154pub struct StreamResponseSubscriber<Response> {
155 #[pin]
156 subscriber: Subscriber,
157 response_subject: String,
158 buffered_responses: Option<GenericStreamResponses<Response>>,
159 next_index: u32,
160 acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
161 _background_task: AsyncJoinOnDrop<()>,
162 _phantom: PhantomData<Response>,
163}
164
165impl<Response> Stream for StreamResponseSubscriber<Response>
166where
167 Response: Decode,
168{
169 type Item = Response;
170
171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 if let Some(buffered_responses) = self.buffered_responses.as_mut() {
173 if let Some(response) = buffered_responses.next() {
174 return Poll::Ready(Some(response));
175 } else if buffered_responses.is_last() {
176 return Poll::Ready(None);
177 }
178
179 self.buffered_responses.take();
180 self.next_index += 1;
181 }
182
183 let mut projected = self.project();
184 match projected.subscriber.poll_next_unpin(cx) {
185 Poll::Ready(Some(message)) => {
186 match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
187 Ok(mut responses) => {
188 if responses.index() != *projected.next_index {
189 warn!(
190 actual_index = %responses.index(),
191 expected_index = %*projected.next_index,
192 message_type = %type_name::<Response>(),
193 response_subject = %projected.response_subject,
194 "Received unexpected response stream index, aborting stream"
195 );
196
197 return Poll::Ready(None);
198 }
199
200 if let Some(ack_subject) = responses.ack_subject() {
201 let index = responses.index();
202 let ack_subject = ack_subject.to_string();
203
204 if let Err(error) = projected
205 .acknowledgement_sender
206 .unbounded_send((ack_subject.clone(), index))
207 {
208 warn!(
209 %error,
210 %index,
211 message_type = %type_name::<Response>(),
212 response_subject = %projected.response_subject,
213 %ack_subject,
214 "Failed to send acknowledgement for stream response"
215 );
216 }
217 }
218
219 if let Some(response) = responses.next() {
220 *projected.buffered_responses = Some(responses);
221 Poll::Ready(Some(response))
222 } else {
223 Poll::Ready(None)
224 }
225 }
226 Err(error) => {
227 warn!(
228 %error,
229 response_type = %type_name::<Response>(),
230 response_subject = %projected.response_subject,
231 message = %hex::encode(message.payload),
232 "Failed to decode stream response"
233 );
234
235 Poll::Ready(None)
236 }
237 }
238 }
239 Poll::Ready(None) => Poll::Ready(None),
240 Poll::Pending => Poll::Pending,
241 }
242 }
243}
244
245impl<Response> StreamResponseSubscriber<Response> {
246 fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
247 let (acknowledgement_sender, mut acknowledgement_receiver) =
248 mpsc::unbounded::<(String, u32)>();
249
250 let ack_publisher_fut = {
251 let response_subject = response_subject.clone();
252
253 async move {
254 while let Some((subject, index)) = acknowledgement_receiver.next().await {
255 trace!(
256 %subject,
257 %index,
258 %response_subject,
259 %index,
260 "Sending stream response acknowledgement"
261 );
262 if let Err(error) = nats_client
263 .publish(subject.clone(), index.to_le_bytes().to_vec().into())
264 .await
265 {
266 warn!(
267 %error,
268 %subject,
269 %index,
270 %response_subject,
271 %index,
272 "Failed to send stream response acknowledgement"
273 );
274 return;
275 }
276 }
277 }
278 };
279 let background_task =
280 AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
281
282 Self {
283 response_subject,
284 subscriber,
285 buffered_responses: None,
286 next_index: 0,
287 acknowledgement_sender,
288 _background_task: background_task,
289 _phantom: PhantomData,
290 }
291 }
292}
293
294pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
296 const SUBJECT: &'static str;
299}
300
301pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
307 const SUBJECT: &'static str;
309
310 fn deterministic_message_id(&self) -> Option<HeaderValue> {
313 None
314 }
315}
316
317#[derive(Debug)]
319#[pin_project::pin_project]
320pub struct SubscriberWrapper<Message> {
321 #[pin]
322 subscriber: Subscriber,
323 _phantom: PhantomData<Message>,
324}
325
326impl<Message> Stream for SubscriberWrapper<Message>
327where
328 Message: Decode,
329{
330 type Item = Message;
331
332 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333 match self.project().subscriber.poll_next_unpin(cx) {
334 Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
335 Ok(message) => Poll::Ready(Some(message)),
336 Err(error) => {
337 warn!(
338 %error,
339 message_type = %type_name::<Message>(),
340 message = %hex::encode(message.payload),
341 "Failed to decode stream message"
342 );
343
344 Poll::Pending
345 }
346 },
347 Poll::Ready(None) => Poll::Ready(None),
348 Poll::Pending => Poll::Pending,
349 }
350 }
351}
352
353#[derive(Debug)]
354struct Inner {
355 client: Client,
356 request_retry_backoff_policy: ExponentialBuilder,
357 approximate_max_message_size: usize,
358 max_message_size: usize,
359}
360
361#[derive(Debug, Clone)]
363pub struct NatsClient {
364 inner: Arc<Inner>,
365}
366
367impl Deref for NatsClient {
368 type Target = Client;
369
370 #[inline]
371 fn deref(&self) -> &Self::Target {
372 &self.inner.client
373 }
374}
375
376impl NatsClient {
377 pub async fn new<A>(
379 addrs: A,
380 request_retry_backoff_policy: ExponentialBuilder,
381 ) -> Result<Self, async_nats::Error>
382 where
383 A: ToServerAddrs,
384 {
385 let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
386 Self::from_client(
387 async_nats::connect_with_options(
388 &servers,
389 ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
390 )
391 .await?,
392 request_retry_backoff_policy,
393 )
394 }
395
396 pub fn from_client(
398 client: Client,
399 request_retry_backoff_policy: ExponentialBuilder,
400 ) -> Result<Self, async_nats::Error> {
401 let max_payload = client.server_info().max_payload;
402 if max_payload < EXPECTED_MESSAGE_SIZE {
403 return Err(format!(
404 "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
405 increase it by specifying max_payload = 2MB or higher number in NATS configuration"
406 )
407 .into());
408 }
409
410 let inner = Inner {
411 client,
412 request_retry_backoff_policy,
413 approximate_max_message_size: max_payload * 9 / 10,
415 max_message_size: max_payload,
417 };
418
419 Ok(Self {
420 inner: Arc::new(inner),
421 })
422 }
423
424 pub fn approximate_max_message_size(&self) -> usize {
427 self.inner.approximate_max_message_size
428 }
429
430 pub async fn request<Request>(
432 &self,
433 request: &Request,
434 instance: Option<&str>,
435 ) -> Result<Request::Response, RequestError>
436 where
437 Request: GenericRequest,
438 {
439 let subject = subject_with_instance(Request::SUBJECT, instance);
440 let mut maybe_retry_backoff = None;
441 let message = loop {
442 match self
443 .inner
444 .client
445 .request(subject.clone(), request.encode().into())
446 .await
447 {
448 Ok(message) => {
449 break message;
450 }
451 Err(error) => {
452 match error.kind() {
453 RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
454 }
456 RequestErrorKind::InvalidSubject
457 | RequestErrorKind::MaxPayloadExceeded
458 | RequestErrorKind::Other => {
459 return Err(error);
460 }
461 }
462
463 let retry_backoff = maybe_retry_backoff
464 .get_or_insert_with(|| self.inner.request_retry_backoff_policy.build());
465
466 if let Some(delay) = retry_backoff.next() {
467 debug!(
468 %subject,
469 %error,
470 request_type = %type_name::<Request>(),
471 ?delay,
472 "Failed to make request, retrying after some delay"
473 );
474
475 tokio::time::sleep(delay).await;
476 } else {
477 return Err(error);
478 }
479 }
480 }
481 };
482
483 let response =
484 Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
485 warn!(
486 %subject,
487 %error,
488 response_type = %type_name::<Request::Response>(),
489 response = %hex::encode(message.payload),
490 "Response decoding failed"
491 );
492
493 RequestErrorKind::Other
494 })?;
495
496 Ok(response)
497 }
498
499 pub async fn request_responder<Request, F, OP>(
514 &self,
515 instance: Option<&str>,
516 queue_group: Option<String>,
517 process: OP,
518 ) -> anyhow::Result<()>
519 where
520 Request: GenericRequest,
521 F: Future<Output = Option<Request::Response>> + Send,
522 OP: Fn(Request) -> F + Send + Sync,
523 {
524 let mut processing = FuturesUnordered::new();
526
527 let subscription = self
528 .common_subscribe(Request::SUBJECT, instance, queue_group)
529 .await
530 .map_err(|error| {
531 anyhow!(
532 "Failed to subscribe to {} requests for {instance:?}: {error}",
533 type_name::<Request>(),
534 )
535 })?;
536
537 debug!(
538 request_type = %type_name::<Request>(),
539 ?subscription,
540 "Requests subscription"
541 );
542 let mut subscription = subscription.fuse();
543
544 loop {
545 select! {
546 message = subscription.select_next_some() => {
547 processing.push(
549 self
550 .process_request(
551 message,
552 &process,
553 )
554 .in_current_span(),
555 );
556 },
557 _ = processing.next() => {
558 },
560 complete => {
561 break;
562 }
563 }
564 }
565
566 Ok(())
567 }
568
569 async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
570 where
571 Request: GenericRequest,
572 F: Future<Output = Option<Request::Response>> + Send,
573 OP: Fn(Request) -> F + Send + Sync,
574 {
575 let Some(reply_subject) = message.reply else {
576 return;
577 };
578
579 let message_payload_size = message.payload.len();
580 let request = match Request::decode(&mut message.payload.as_ref()) {
581 Ok(request) => {
582 drop(message.payload);
584 request
585 }
586 Err(error) => {
587 warn!(
588 request_type = %type_name::<Request>(),
589 %error,
590 message = %hex::encode(message.payload),
591 "Failed to decode request"
592 );
593 return;
594 }
595 };
596
597 if message_payload_size > 1024 {
599 trace!(
600 request_type = %type_name::<Request>(),
601 %reply_subject,
602 "Processing request"
603 );
604 } else {
605 trace!(
606 request_type = %type_name::<Request>(),
607 ?request,
608 %reply_subject,
609 "Processing request"
610 );
611 }
612
613 if let Some(response) = process(request).await
614 && let Err(error) = self.publish(reply_subject, response.encode().into()).await
615 {
616 warn!(
617 request_type = %type_name::<Request>(),
618 %error,
619 "Failed to send response"
620 );
621 }
622 }
623
624 pub async fn stream_request<Request>(
626 &self,
627 request: &Request,
628 instance: Option<&str>,
629 ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
630 where
631 Request: GenericStreamRequest,
632 {
633 let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
634 let stream_response_subject = format!("stream-response.{}", Ulid::new());
635
636 let subscriber = self
637 .inner
638 .client
639 .subscribe(stream_response_subject.clone())
640 .await?;
641
642 debug!(
643 request_type = %type_name::<Request>(),
644 %stream_request_subject,
645 %stream_response_subject,
646 ?subscriber,
647 "Stream request subscription"
648 );
649
650 self.inner
651 .client
652 .publish_with_reply(
653 stream_request_subject,
654 stream_response_subject.clone(),
655 request.encode().into(),
656 )
657 .await?;
658
659 Ok(StreamResponseSubscriber::new(
660 subscriber,
661 stream_response_subject,
662 self.clone(),
663 ))
664 }
665
666 pub async fn stream_request_responder<Request, F, S, OP>(
681 &self,
682 instance: Option<&str>,
683 queue_group: Option<String>,
684 process: OP,
685 ) -> anyhow::Result<()>
686 where
687 Request: GenericStreamRequest,
688 F: Future<Output = Option<S>> + Send,
689 S: Stream<Item = Request::Response> + Unpin,
690 OP: Fn(Request) -> F + Send + Sync,
691 {
692 let mut processing = FuturesUnordered::new();
694
695 let subscription = self
696 .common_subscribe(Request::SUBJECT, instance, queue_group)
697 .await
698 .map_err(|error| {
699 anyhow!(
700 "Failed to subscribe to {} stream requests for {instance:?}: {error}",
701 type_name::<Request>(),
702 )
703 })?;
704
705 debug!(
706 request_type = %type_name::<Request>(),
707 ?subscription,
708 "Stream requests subscription"
709 );
710 let mut subscription = subscription.fuse();
711
712 loop {
713 select! {
714 message = subscription.select_next_some() => {
715 processing.push(
717 self
718 .process_stream_request(
719 message,
720 &process,
721 )
722 .in_current_span(),
723 );
724 },
725 _ = processing.next() => {
726 },
728 complete => {
729 break;
730 }
731 }
732 }
733
734 Ok(())
735 }
736
737 async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
738 where
739 Request: GenericStreamRequest,
740 F: Future<Output = Option<S>> + Send,
741 S: Stream<Item = Request::Response> + Unpin,
742 OP: Fn(Request) -> F + Send + Sync,
743 {
744 let Some(reply_subject) = message.reply else {
745 return;
746 };
747
748 let message_payload_size = message.payload.len();
749 let request = match Request::decode(&mut message.payload.as_ref()) {
750 Ok(request) => {
751 drop(message.payload);
753 request
754 }
755 Err(error) => {
756 warn!(
757 request_type = %type_name::<Request>(),
758 %error,
759 message = %hex::encode(message.payload),
760 "Failed to decode request"
761 );
762 return;
763 }
764 };
765
766 if message_payload_size > 1024 {
768 trace!(
769 request_type = %type_name::<Request>(),
770 %reply_subject,
771 "Processing request"
772 );
773 } else {
774 trace!(
775 request_type = %type_name::<Request>(),
776 ?request,
777 %reply_subject,
778 "Processing request"
779 );
780 }
781
782 if let Some(stream) = process(request).await {
783 self.stream_response::<Request, _>(reply_subject, stream)
784 .await;
785 }
786 }
787
788 async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
790 where
791 Request: GenericStreamRequest,
792 S: Stream<Item = Request::Response> + Unpin,
793 {
794 type Response<Request> =
795 GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
796
797 let mut response_stream = response_stream.fuse();
798
799 let Some(first_element) = response_stream.next().await else {
801 if let Err(error) = self
802 .publish(
803 response_subject.clone(),
804 Response::<Request>::Last {
805 index: 0,
806 responses: VecDeque::new(),
807 }
808 .encode()
809 .into(),
810 )
811 .await
812 {
813 warn!(
814 %response_subject,
815 %error,
816 request_type = %type_name::<Request>(),
817 response_type = %type_name::<Request::Response>(),
818 "Failed to send stream response"
819 );
820 }
821
822 return;
823 };
824 let max_message_size = self.inner.max_message_size;
825 let approximate_max_message_size = self.approximate_max_message_size();
826 let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
827
828 let ack_subject = format!("stream-response-ack.{}", Ulid::new());
829 let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
830 Ok(ack_subscription) => ack_subscription,
831 Err(error) => {
832 warn!(
833 %response_subject,
834 %error,
835 request_type = %type_name::<Request>(),
836 response_type = %type_name::<Request::Response>(),
837 "Failed to subscribe to ack subject"
838 );
839 return;
840 }
841 };
842 debug!(
843 %response_subject,
844 request_type = %type_name::<Request>(),
845 response_type = %type_name::<Request::Response>(),
846 ?ack_subscription,
847 "Ack subscription subscription"
848 );
849 let mut index = 0;
850 let mut buffer = VecDeque::with_capacity(max_responses_per_message);
852 buffer.push_back(first_element);
853 let mut overflow_buffer = VecDeque::new();
854
855 loop {
856 if buffer.is_empty()
858 && let Some(element) = response_stream.next().await
859 {
860 buffer.push_back(element);
861 }
862 while buffer.encoded_size() < approximate_max_message_size
863 && let Some(element) = response_stream.next().now_or_never().flatten()
864 {
865 buffer.push_back(element);
866 }
867
868 loop {
869 let is_done = response_stream.is_done() && overflow_buffer.is_empty();
870 let num_messages = buffer.len();
871 let response = if is_done {
872 Response::<Request>::Last {
873 index,
874 responses: buffer,
875 }
876 } else {
877 Response::<Request>::Continue {
878 index,
879 responses: buffer,
880 ack_subject: ack_subject.clone(),
881 }
882 };
883 let encoded_response = response.encode();
884 let encoded_response_len = encoded_response.len();
885 if encoded_response_len > max_message_size {
888 buffer = response.into();
889 if let Some(element) = buffer.pop_back() {
890 if buffer.is_empty() {
891 error!(
892 ?element,
893 encoded_response_len,
894 max_message_size,
895 "Element was too large to fit into NATS message, this is an \
896 implementation bug"
897 );
898 }
899 overflow_buffer.push_front(element);
900 continue;
901 }
902
903 error!(
904 %response_subject,
905 request_type = %type_name::<Request>(),
906 response_type = %type_name::<Request::Response>(),
907 "Empty response overflown message size, this should never happen"
908 );
909 return;
910 }
911
912 debug!(
913 %response_subject,
914 num_messages,
915 %index,
916 %is_done,
917 "Publishing stream response messages",
918 );
919
920 if let Err(error) = self
921 .publish(response_subject.clone(), encoded_response.into())
922 .await
923 {
924 warn!(
925 %response_subject,
926 %error,
927 request_type = %type_name::<Request>(),
928 response_type = %type_name::<Request::Response>(),
929 "Failed to send stream response"
930 );
931 return;
932 }
933
934 if is_done {
935 return;
936 }
937
938 buffer = response.into();
939 buffer.clear();
940 buffer.extend(overflow_buffer.drain(..));
942
943 if index >= 1 {
944 let expected_index = index - 1;
946
947 trace!(
948 %response_subject,
949 %expected_index,
950 "Waiting for acknowledgement"
951 );
952 match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
953 .await
954 {
955 Ok(Some(message)) => {
956 if let Some(received_index) =
957 message.payload.get(..size_of::<u32>()).map(|bytes| {
958 u32::from_le_bytes(
959 bytes.try_into().expect("Correctly chunked slice; qed"),
960 )
961 })
962 {
963 debug!(
964 %response_subject,
965 %received_index,
966 "Received acknowledgement"
967 );
968 if received_index != expected_index {
969 warn!(
970 %response_subject,
971 %received_index,
972 %expected_index,
973 request_type = %type_name::<Request>(),
974 response_type = %type_name::<Request::Response>(),
975 message = %hex::encode(message.payload),
976 "Unexpected acknowledgement index"
977 );
978 return;
979 }
980 } else {
981 warn!(
982 %response_subject,
983 request_type = %type_name::<Request>(),
984 response_type = %type_name::<Request::Response>(),
985 message = %hex::encode(message.payload),
986 "Unexpected acknowledgement message"
987 );
988 return;
989 }
990 }
991 Ok(None) => {
992 warn!(
993 %response_subject,
994 request_type = %type_name::<Request>(),
995 response_type = %type_name::<Request::Response>(),
996 "Acknowledgement stream ended unexpectedly"
997 );
998 return;
999 }
1000 Err(_error) => {
1001 warn!(
1002 %response_subject,
1003 %expected_index,
1004 request_type = %type_name::<Request>(),
1005 response_type = %type_name::<Request::Response>(),
1006 "Acknowledgement wait timed out"
1007 );
1008 return;
1009 }
1010 }
1011 }
1012
1013 index += 1;
1014
1015 if buffer.is_empty() {
1017 break;
1018 }
1019 }
1020 }
1021 }
1022
1023 pub async fn notification<Notification>(
1025 &self,
1026 notification: &Notification,
1027 instance: Option<&str>,
1028 ) -> Result<(), PublishError>
1029 where
1030 Notification: GenericNotification,
1031 {
1032 self.inner
1033 .client
1034 .publish(
1035 subject_with_instance(Notification::SUBJECT, instance),
1036 notification.encode().into(),
1037 )
1038 .await
1039 }
1040
1041 pub async fn broadcast<Broadcast>(
1043 &self,
1044 message: &Broadcast,
1045 instance: &str,
1046 ) -> Result<(), PublishError>
1047 where
1048 Broadcast: GenericBroadcast,
1049 {
1050 self.inner
1051 .client
1052 .publish_with_headers(
1053 Broadcast::SUBJECT.replace('*', instance),
1054 {
1055 let mut headers = HeaderMap::new();
1056 if let Some(message_id) = message.deterministic_message_id() {
1057 headers.insert("Nats-Msg-Id", message_id);
1058 }
1059 headers
1060 },
1061 message.encode().into(),
1062 )
1063 .await
1064 }
1065
1066 pub async fn subscribe_to_notifications<Notification>(
1069 &self,
1070 instance: Option<&str>,
1071 queue_group: Option<String>,
1072 ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
1073 where
1074 Notification: GenericNotification,
1075 {
1076 self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
1077 .await
1078 }
1079
1080 pub async fn subscribe_to_broadcasts<Broadcast>(
1083 &self,
1084 instance: Option<&str>,
1085 queue_group: Option<String>,
1086 ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
1087 where
1088 Broadcast: GenericBroadcast,
1089 {
1090 self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
1091 .await
1092 }
1093
1094 async fn simple_subscribe<Message>(
1097 &self,
1098 subject: &'static str,
1099 instance: Option<&str>,
1100 queue_group: Option<String>,
1101 ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1102 where
1103 Message: Decode,
1104 {
1105 let subscriber = self
1106 .common_subscribe(subject, instance, queue_group)
1107 .await?;
1108 debug!(
1109 %subject,
1110 message_type = %type_name::<Message>(),
1111 ?subscriber,
1112 "Simple subscription"
1113 );
1114
1115 Ok(SubscriberWrapper {
1116 subscriber,
1117 _phantom: PhantomData,
1118 })
1119 }
1120
1121 async fn common_subscribe(
1124 &self,
1125 subject: &'static str,
1126 instance: Option<&str>,
1127 queue_group: Option<String>,
1128 ) -> Result<Subscriber, SubscribeError> {
1129 let subscriber = if let Some(queue_group) = queue_group {
1130 self.inner
1131 .client
1132 .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1133 .await?
1134 } else {
1135 self.inner
1136 .client
1137 .subscribe(subject_with_instance(subject, instance))
1138 .await?
1139 };
1140
1141 Ok(subscriber)
1142 }
1143}
1144
1145fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1146 if let Some(instance) = instance {
1147 Subject::from(subject.replace('*', instance))
1148 } else {
1149 Subject::from_static(subject)
1150 }
1151}