1use crate::utils::AsyncJoinOnDrop;
18use anyhow::anyhow;
19use async_nats::{
20 Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
21 RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
22};
23use backoff::ExponentialBackoff;
24use backoff::backoff::Backoff;
25use futures::channel::mpsc;
26use futures::stream::FuturesUnordered;
27use futures::{FutureExt, Stream, StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use std::any::type_name;
30use std::collections::VecDeque;
31use std::fmt;
32use std::future::Future;
33use std::marker::PhantomData;
34use std::ops::Deref;
35use std::pin::Pin;
36use std::sync::Arc;
37use std::task::{Context, Poll};
38use std::time::Duration;
39use thiserror::Error;
40use tracing::{Instrument, debug, error, trace, warn};
41use ulid::Ulid;
42
/// Minimum `max_payload` (2 MiB) the NATS server must advertise; `NatsClient::from_client` rejects
/// servers with a smaller limit
const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
/// How long the stream responder waits for an acknowledgement of a previously sent stream response
/// message before abandoning the stream
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(1);
/// Request timeout configured on connections created by `NatsClient::new`
const REQUEST_TIMEOUT: Duration = Duration::from_mins(5);
48
/// Request that is answered with exactly one response message.
///
/// Used with `NatsClient::request` on the requester side and `NatsClient::request_responder` on the
/// responder side.
pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
    /// NATS subject the request is published to.
    ///
    /// Any `*` in the subject is replaced with the instance name when an instance is provided.
    const SUBJECT: &'static str;
    /// Type of the response that is sent back for this request
    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
}
60
/// Request that is answered with a stream of responses rather than a single message.
///
/// Used with `NatsClient::stream_request` on the requester side and
/// `NatsClient::stream_request_responder` on the responder side.
pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
    /// NATS subject the request is published to.
    ///
    /// Any `*` in the subject is replaced with the instance name when an instance is provided.
    const SUBJECT: &'static str;
    /// Type of a single element of the response stream; multiple elements are batched into one
    /// NATS message
    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
}
77
/// Wire format of one message of a response stream: a batch of responses tagged with the
/// sequential index of the message within the stream.
#[derive(Debug, Encode, Decode)]
enum GenericStreamResponses<Response> {
    /// Batch that will be followed by more messages
    Continue {
        /// Index of this message in the stream
        index: u32,
        /// Responses carried by this message
        responses: VecDeque<Response>,
        /// Subject to which the receiver publishes an acknowledgement (the little-endian `index`)
        /// for this message
        ack_subject: String,
    },
    /// Final batch of the stream, no acknowledgement is requested for it
    Last {
        /// Index of this message in the stream
        index: u32,
        /// Responses carried by this message
        responses: VecDeque<Response>,
    },
}
101
102impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
103 #[inline]
104 fn from(value: GenericStreamResponses<Response>) -> Self {
105 match value {
106 GenericStreamResponses::Continue { responses, .. } => responses,
107 GenericStreamResponses::Last { responses, .. } => responses,
108 }
109 }
110}
111
112impl<Response> GenericStreamResponses<Response> {
113 fn next(&mut self) -> Option<Response> {
114 match self {
115 GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
116 GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
117 }
118 }
119
120 fn index(&self) -> u32 {
121 match self {
122 GenericStreamResponses::Continue { index, .. } => *index,
123 GenericStreamResponses::Last { index, .. } => *index,
124 }
125 }
126
127 fn ack_subject(&self) -> Option<&str> {
128 if let GenericStreamResponses::Continue { ack_subject, .. } = self {
129 Some(ack_subject)
130 } else {
131 None
132 }
133 }
134
135 fn is_last(&self) -> bool {
136 matches!(self, Self::Last { .. })
137 }
138}
139
/// Errors that can occur while initiating a stream request
#[derive(Debug, Error)]
pub enum StreamRequestError {
    /// Failed to subscribe to the subject on which stream responses are delivered
    #[error("Subscribe error: {0}")]
    Subscribe(#[from] SubscribeError),
    /// Failed to publish the stream request itself
    #[error("Publish error: {0}")]
    Publish(#[from] PublishError),
}
150
/// Receiving side of a stream request: a [`Stream`] of decoded responses that also sends
/// acknowledgements back to the responder as messages arrive.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct StreamResponseSubscriber<Response> {
    /// Underlying NATS subscription on the response subject
    #[pin]
    subscriber: Subscriber,
    /// Subject the responses arrive on, only used for logging here
    response_subject: String,
    /// Message that is currently being drained, `None` before the first message and after a
    /// non-final message was fully consumed
    buffered_responses: Option<GenericStreamResponses<Response>>,
    /// Index the next received message is expected to have
    next_index: u32,
    /// Queue of `(ack_subject, index)` pairs consumed by the background task that publishes
    /// acknowledgements
    acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
    /// Handle of the acknowledgement publishing task, kept so the task is tied to the lifetime of
    /// this subscriber
    _background_task: AsyncJoinOnDrop<()>,
    _phantom: PhantomData<Response>,
}
165
166impl<Response> Stream for StreamResponseSubscriber<Response>
167where
168 Response: Decode,
169{
170 type Item = Response;
171
172 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173 if let Some(buffered_responses) = self.buffered_responses.as_mut() {
174 if let Some(response) = buffered_responses.next() {
175 return Poll::Ready(Some(response));
176 } else if buffered_responses.is_last() {
177 return Poll::Ready(None);
178 }
179
180 self.buffered_responses.take();
181 self.next_index += 1;
182 }
183
184 let mut projected = self.project();
185 match projected.subscriber.poll_next_unpin(cx) {
186 Poll::Ready(Some(message)) => {
187 match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
188 Ok(mut responses) => {
189 if responses.index() != *projected.next_index {
190 warn!(
191 actual_index = %responses.index(),
192 expected_index = %*projected.next_index,
193 message_type = %type_name::<Response>(),
194 response_subject = %projected.response_subject,
195 "Received unexpected response stream index, aborting stream"
196 );
197
198 return Poll::Ready(None);
199 }
200
201 if let Some(ack_subject) = responses.ack_subject() {
202 let index = responses.index();
203 let ack_subject = ack_subject.to_string();
204
205 if let Err(error) = projected
206 .acknowledgement_sender
207 .unbounded_send((ack_subject.clone(), index))
208 {
209 warn!(
210 %error,
211 %index,
212 message_type = %type_name::<Response>(),
213 response_subject = %projected.response_subject,
214 %ack_subject,
215 "Failed to send acknowledgement for stream response"
216 );
217 }
218 }
219
220 if let Some(response) = responses.next() {
221 *projected.buffered_responses = Some(responses);
222 Poll::Ready(Some(response))
223 } else {
224 Poll::Ready(None)
225 }
226 }
227 Err(error) => {
228 warn!(
229 %error,
230 response_type = %type_name::<Response>(),
231 response_subject = %projected.response_subject,
232 message = %hex::encode(message.payload),
233 "Failed to decode stream response"
234 );
235
236 Poll::Ready(None)
237 }
238 }
239 }
240 Poll::Ready(None) => Poll::Ready(None),
241 Poll::Pending => Poll::Pending,
242 }
243 }
244}
245
246impl<Response> StreamResponseSubscriber<Response> {
247 fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
248 let (acknowledgement_sender, mut acknowledgement_receiver) =
249 mpsc::unbounded::<(String, u32)>();
250
251 let ack_publisher_fut = {
252 let response_subject = response_subject.clone();
253
254 async move {
255 while let Some((subject, index)) = acknowledgement_receiver.next().await {
256 trace!(
257 %subject,
258 %index,
259 %response_subject,
260 %index,
261 "Sending stream response acknowledgement"
262 );
263 if let Err(error) = nats_client
264 .publish(subject.clone(), index.to_le_bytes().to_vec().into())
265 .await
266 {
267 warn!(
268 %error,
269 %subject,
270 %index,
271 %response_subject,
272 %index,
273 "Failed to send stream response acknowledgement"
274 );
275 return;
276 }
277 }
278 }
279 };
280 let background_task =
281 AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
282
283 Self {
284 response_subject,
285 subscriber,
286 buffered_responses: None,
287 next_index: 0,
288 acknowledgement_sender,
289 _background_task: background_task,
290 _phantom: PhantomData,
291 }
292 }
293}
294
/// One-way message published with `NatsClient::notification`, no response is expected
pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
    /// NATS subject the notification is published to.
    ///
    /// Any `*` in the subject is replaced with the instance name when an instance is provided.
    const SUBJECT: &'static str;
}
301
/// One-way message published with `NatsClient::broadcast` under a specific instance name
pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
    /// NATS subject the broadcast is published to, `*` is replaced with the instance name
    const SUBJECT: &'static str;

    /// Optional message ID that, when present, is sent in the `Nats-Msg-Id` header of the
    /// broadcast.
    ///
    /// The default implementation returns `None`, in which case no such header is set.
    fn deterministic_message_id(&self) -> Option<HeaderValue> {
        None
    }
}
317
/// Typed wrapper around a NATS [`Subscriber`] that decodes each message payload into `Message`
#[derive(Debug)]
#[pin_project::pin_project]
pub struct SubscriberWrapper<Message> {
    /// Underlying raw NATS subscription
    #[pin]
    subscriber: Subscriber,
    _phantom: PhantomData<Message>,
}
326
327impl<Message> Stream for SubscriberWrapper<Message>
328where
329 Message: Decode,
330{
331 type Item = Message;
332
333 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
334 match self.project().subscriber.poll_next_unpin(cx) {
335 Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
336 Ok(message) => Poll::Ready(Some(message)),
337 Err(error) => {
338 warn!(
339 %error,
340 message_type = %type_name::<Message>(),
341 message = %hex::encode(message.payload),
342 "Failed to decode stream message"
343 );
344
345 Poll::Pending
346 }
347 },
348 Poll::Ready(None) => Poll::Ready(None),
349 Poll::Pending => Poll::Pending,
350 }
351 }
352}
353
/// Shared state behind [`NatsClient`]
#[derive(Debug)]
struct Inner {
    /// Underlying NATS client
    client: Client,
    /// Template of the backoff policy, cloned and reset for every request that needs retries
    request_retry_backoff_policy: ExponentialBackoff,
    /// 90% of the server's maximum payload, used as the target size when batching stream responses
    approximate_max_message_size: usize,
    /// Server's maximum payload, the hard limit for a single published message
    max_message_size: usize,
}
361
/// Cheaply cloneable wrapper around a NATS [`Client`] that adds typed requests, stream requests,
/// notifications and broadcasts.
///
/// Dereferences to the underlying [`Client`].
#[derive(Debug, Clone)]
pub struct NatsClient {
    /// Shared state, clones of the client point to the same instance
    inner: Arc<Inner>,
}
367
368impl Deref for NatsClient {
369 type Target = Client;
370
371 #[inline]
372 fn deref(&self) -> &Self::Target {
373 &self.inner.client
374 }
375}
376
377impl NatsClient {
378 pub async fn new<A: ToServerAddrs>(
380 addrs: A,
381 request_retry_backoff_policy: ExponentialBackoff,
382 ) -> Result<Self, async_nats::Error> {
383 let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
384 Self::from_client(
385 async_nats::connect_with_options(
386 &servers,
387 ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
388 )
389 .await?,
390 request_retry_backoff_policy,
391 )
392 }
393
394 pub fn from_client(
396 client: Client,
397 request_retry_backoff_policy: ExponentialBackoff,
398 ) -> Result<Self, async_nats::Error> {
399 let max_payload = client.server_info().max_payload;
400 if max_payload < EXPECTED_MESSAGE_SIZE {
401 return Err(format!(
402 "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
403 increase it by specifying max_payload = 2MB or higher number in NATS configuration"
404 )
405 .into());
406 }
407
408 let inner = Inner {
409 client,
410 request_retry_backoff_policy,
411 approximate_max_message_size: max_payload * 9 / 10,
413 max_message_size: max_payload,
415 };
416
417 Ok(Self {
418 inner: Arc::new(inner),
419 })
420 }
421
    /// Approximate maximum message size in bytes: 90% of the server's maximum payload as computed
    /// when the client was created
    pub fn approximate_max_message_size(&self) -> usize {
        self.inner.approximate_max_message_size
    }
427
428 pub async fn request<Request>(
430 &self,
431 request: &Request,
432 instance: Option<&str>,
433 ) -> Result<Request::Response, RequestError>
434 where
435 Request: GenericRequest,
436 {
437 let subject = subject_with_instance(Request::SUBJECT, instance);
438 let mut maybe_retry_backoff = None;
439 let message = loop {
440 match self
441 .inner
442 .client
443 .request(subject.clone(), request.encode().into())
444 .await
445 {
446 Ok(message) => {
447 break message;
448 }
449 Err(error) => {
450 match error.kind() {
451 RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
452 }
454 RequestErrorKind::Other => {
455 return Err(error);
456 }
457 }
458
459 let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
460 let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
461 retry_backoff.reset();
462 retry_backoff
463 });
464
465 if let Some(delay) = retry_backoff.next_backoff() {
466 debug!(
467 %subject,
468 %error,
469 request_type = %type_name::<Request>(),
470 ?delay,
471 "Failed to make request, retrying after some delay"
472 );
473
474 tokio::time::sleep(delay).await;
475 continue;
476 } else {
477 return Err(error);
478 }
479 }
480 }
481 };
482
483 let response =
484 Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
485 warn!(
486 %subject,
487 %error,
488 response_type = %type_name::<Request::Response>(),
489 response = %hex::encode(message.payload),
490 "Response decoding failed"
491 );
492
493 RequestErrorKind::Other
494 })?;
495
496 Ok(response)
497 }
498
    /// Subscribes to requests of type `Request` and answers them using `process`.
    ///
    /// When `instance` is provided, `*` in the request subject is replaced with it; when
    /// `queue_group` is provided, a queue subscription is used. Incoming requests are processed
    /// concurrently within this future. If `process` returns `None`, no response is sent for that
    /// request.
    ///
    /// # Errors
    /// Fails if the subscription can't be created. Otherwise returns `Ok(())` once the
    /// subscription has ended and all in-flight requests are processed.
    pub async fn request_responder<Request, F, OP>(
        &self,
        instance: Option<&str>,
        queue_group: Option<String>,
        process: OP,
    ) -> anyhow::Result<()>
    where
        Request: GenericRequest,
        F: Future<Output = Option<Request::Response>> + Send,
        OP: Fn(Request) -> F + Send + Sync,
    {
        // Requests that are currently being processed
        let mut processing = FuturesUnordered::new();

        let subscription = self
            .common_subscribe(Request::SUBJECT, instance, queue_group)
            .await
            .map_err(|error| {
                anyhow!(
                    "Failed to subscribe to {} requests for {instance:?}: {error}",
                    type_name::<Request>(),
                )
            })?;

        debug!(
            request_type = %type_name::<Request>(),
            ?subscription,
            "Requests subscription"
        );
        let mut subscription = subscription.fuse();

        loop {
            select! {
                message = subscription.select_next_some() => {
                    // Start processing the request without waiting for it to complete
                    processing.push(
                        self
                            .process_request(
                                message,
                                &process,
                            )
                            .in_current_span(),
                    );
                },
                _ = processing.next() => {
                    // Nothing to do here, this arm only drives in-flight requests forward
                },
                complete => {
                    // Both the subscription and in-flight requests are finished
                    break;
                }
            }
        }

        Ok(())
    }
568
569 async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
570 where
571 Request: GenericRequest,
572 F: Future<Output = Option<Request::Response>> + Send,
573 OP: Fn(Request) -> F + Send + Sync,
574 {
575 let Some(reply_subject) = message.reply else {
576 return;
577 };
578
579 let message_payload_size = message.payload.len();
580 let request = match Request::decode(&mut message.payload.as_ref()) {
581 Ok(request) => {
582 drop(message.payload);
584 request
585 }
586 Err(error) => {
587 warn!(
588 request_type = %type_name::<Request>(),
589 %error,
590 message = %hex::encode(message.payload),
591 "Failed to decode request"
592 );
593 return;
594 }
595 };
596
597 if message_payload_size > 1024 {
599 trace!(
600 request_type = %type_name::<Request>(),
601 %reply_subject,
602 "Processing request"
603 );
604 } else {
605 trace!(
606 request_type = %type_name::<Request>(),
607 ?request,
608 %reply_subject,
609 "Processing request"
610 );
611 }
612
613 if let Some(response) = process(request).await
614 && let Err(error) = self.publish(reply_subject, response.encode().into()).await
615 {
616 warn!(
617 request_type = %type_name::<Request>(),
618 %error,
619 "Failed to send response"
620 );
621 }
622 }
623
624 pub async fn stream_request<Request>(
626 &self,
627 request: &Request,
628 instance: Option<&str>,
629 ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
630 where
631 Request: GenericStreamRequest,
632 {
633 let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
634 let stream_response_subject = format!("stream-response.{}", Ulid::new());
635
636 let subscriber = self
637 .inner
638 .client
639 .subscribe(stream_response_subject.clone())
640 .await?;
641
642 debug!(
643 request_type = %type_name::<Request>(),
644 %stream_request_subject,
645 %stream_response_subject,
646 ?subscriber,
647 "Stream request subscription"
648 );
649
650 self.inner
651 .client
652 .publish_with_reply(
653 stream_request_subject,
654 stream_response_subject.clone(),
655 request.encode().into(),
656 )
657 .await?;
658
659 Ok(StreamResponseSubscriber::new(
660 subscriber,
661 stream_response_subject,
662 self.clone(),
663 ))
664 }
665
    /// Subscribes to stream requests of type `Request` and answers them using `process`.
    ///
    /// When `instance` is provided, `*` in the request subject is replaced with it; when
    /// `queue_group` is provided, a queue subscription is used. Incoming requests are processed
    /// concurrently within this future. If `process` returns `None`, nothing is sent back for
    /// that request; otherwise the returned stream is sent to the requester.
    ///
    /// # Errors
    /// Fails if the subscription can't be created. Otherwise returns `Ok(())` once the
    /// subscription has ended and all in-flight requests are processed.
    pub async fn stream_request_responder<Request, F, S, OP>(
        &self,
        instance: Option<&str>,
        queue_group: Option<String>,
        process: OP,
    ) -> anyhow::Result<()>
    where
        Request: GenericStreamRequest,
        F: Future<Output = Option<S>> + Send,
        S: Stream<Item = Request::Response> + Unpin,
        OP: Fn(Request) -> F + Send + Sync,
    {
        // Stream requests that are currently being processed
        let mut processing = FuturesUnordered::new();

        let subscription = self
            .common_subscribe(Request::SUBJECT, instance, queue_group)
            .await
            .map_err(|error| {
                anyhow!(
                    "Failed to subscribe to {} stream requests for {instance:?}: {error}",
                    type_name::<Request>(),
                )
            })?;

        debug!(
            request_type = %type_name::<Request>(),
            ?subscription,
            "Stream requests subscription"
        );
        let mut subscription = subscription.fuse();

        loop {
            select! {
                message = subscription.select_next_some() => {
                    // Start processing the request without waiting for it to complete
                    processing.push(
                        self
                            .process_stream_request(
                                message,
                                &process,
                            )
                            .in_current_span(),
                    );
                },
                _ = processing.next() => {
                    // Nothing to do here, this arm only drives in-flight requests forward
                },
                complete => {
                    // Both the subscription and in-flight requests are finished
                    break;
                }
            }
        }

        Ok(())
    }
736
737 async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
738 where
739 Request: GenericStreamRequest,
740 F: Future<Output = Option<S>> + Send,
741 S: Stream<Item = Request::Response> + Unpin,
742 OP: Fn(Request) -> F + Send + Sync,
743 {
744 let Some(reply_subject) = message.reply else {
745 return;
746 };
747
748 let message_payload_size = message.payload.len();
749 let request = match Request::decode(&mut message.payload.as_ref()) {
750 Ok(request) => {
751 drop(message.payload);
753 request
754 }
755 Err(error) => {
756 warn!(
757 request_type = %type_name::<Request>(),
758 %error,
759 message = %hex::encode(message.payload),
760 "Failed to decode request"
761 );
762 return;
763 }
764 };
765
766 if message_payload_size > 1024 {
768 trace!(
769 request_type = %type_name::<Request>(),
770 %reply_subject,
771 "Processing request"
772 );
773 } else {
774 trace!(
775 request_type = %type_name::<Request>(),
776 ?request,
777 %reply_subject,
778 "Processing request"
779 );
780 }
781
782 if let Some(stream) = process(request).await {
783 self.stream_response::<Request, _>(reply_subject, stream)
784 .await;
785 }
786 }
787
788 async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
790 where
791 Request: GenericStreamRequest,
792 S: Stream<Item = Request::Response> + Unpin,
793 {
794 type Response<Request> =
795 GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
796
797 let mut response_stream = response_stream.fuse();
798
799 let first_element = match response_stream.next().await {
801 Some(first_element) => first_element,
802 None => {
803 if let Err(error) = self
804 .publish(
805 response_subject.clone(),
806 Response::<Request>::Last {
807 index: 0,
808 responses: VecDeque::new(),
809 }
810 .encode()
811 .into(),
812 )
813 .await
814 {
815 warn!(
816 %response_subject,
817 %error,
818 request_type = %type_name::<Request>(),
819 response_type = %type_name::<Request::Response>(),
820 "Failed to send stream response"
821 );
822 }
823
824 return;
825 }
826 };
827 let max_message_size = self.inner.max_message_size;
828 let approximate_max_message_size = self.approximate_max_message_size();
829 let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
830
831 let ack_subject = format!("stream-response-ack.{}", Ulid::new());
832 let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
833 Ok(ack_subscription) => ack_subscription,
834 Err(error) => {
835 warn!(
836 %response_subject,
837 %error,
838 request_type = %type_name::<Request>(),
839 response_type = %type_name::<Request::Response>(),
840 "Failed to subscribe to ack subject"
841 );
842 return;
843 }
844 };
845 debug!(
846 %response_subject,
847 request_type = %type_name::<Request>(),
848 response_type = %type_name::<Request::Response>(),
849 ?ack_subscription,
850 "Ack subscription subscription"
851 );
852 let mut index = 0;
853 let mut buffer = VecDeque::with_capacity(max_responses_per_message);
855 buffer.push_back(first_element);
856 let mut overflow_buffer = VecDeque::new();
857
858 loop {
859 if buffer.is_empty()
861 && let Some(element) = response_stream.next().await
862 {
863 buffer.push_back(element);
864 }
865 while buffer.encoded_size() < approximate_max_message_size
866 && let Some(element) = response_stream.next().now_or_never().flatten()
867 {
868 buffer.push_back(element);
869 }
870
871 loop {
872 let is_done = response_stream.is_done() && overflow_buffer.is_empty();
873 let num_messages = buffer.len();
874 let response = if is_done {
875 Response::<Request>::Last {
876 index,
877 responses: buffer,
878 }
879 } else {
880 Response::<Request>::Continue {
881 index,
882 responses: buffer,
883 ack_subject: ack_subject.clone(),
884 }
885 };
886 let encoded_response = response.encode();
887 let encoded_response_len = encoded_response.len();
888 if encoded_response_len > max_message_size {
891 buffer = response.into();
892 if let Some(element) = buffer.pop_back() {
893 if buffer.is_empty() {
894 error!(
895 ?element,
896 encoded_response_len,
897 max_message_size,
898 "Element was too large to fit into NATS message, this is an \
899 implementation bug"
900 );
901 }
902 overflow_buffer.push_front(element);
903 continue;
904 } else {
905 error!(
906 %response_subject,
907 request_type = %type_name::<Request>(),
908 response_type = %type_name::<Request::Response>(),
909 "Empty response overflown message size, this should never happen"
910 );
911 return;
912 }
913 }
914
915 debug!(
916 %response_subject,
917 num_messages,
918 %index,
919 %is_done,
920 "Publishing stream response messages",
921 );
922
923 if let Err(error) = self
924 .publish(response_subject.clone(), encoded_response.into())
925 .await
926 {
927 warn!(
928 %response_subject,
929 %error,
930 request_type = %type_name::<Request>(),
931 response_type = %type_name::<Request::Response>(),
932 "Failed to send stream response"
933 );
934 return;
935 }
936
937 if is_done {
938 return;
939 } else {
940 buffer = response.into();
941 buffer.clear();
942 buffer.extend(overflow_buffer.drain(..));
944 }
945
946 if index >= 1 {
947 let expected_index = index - 1;
949
950 trace!(
951 %response_subject,
952 %expected_index,
953 "Waiting for acknowledgement"
954 );
955 match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
956 .await
957 {
958 Ok(Some(message)) => {
959 if let Some(received_index) =
960 message.payload.get(..size_of::<u32>()).map(|bytes| {
961 u32::from_le_bytes(
962 bytes.try_into().expect("Correctly chunked slice; qed"),
963 )
964 })
965 {
966 debug!(
967 %response_subject,
968 %received_index,
969 "Received acknowledgement"
970 );
971 if received_index != expected_index {
972 warn!(
973 %response_subject,
974 %received_index,
975 %expected_index,
976 request_type = %type_name::<Request>(),
977 response_type = %type_name::<Request::Response>(),
978 message = %hex::encode(message.payload),
979 "Unexpected acknowledgement index"
980 );
981 return;
982 }
983 } else {
984 warn!(
985 %response_subject,
986 request_type = %type_name::<Request>(),
987 response_type = %type_name::<Request::Response>(),
988 message = %hex::encode(message.payload),
989 "Unexpected acknowledgement message"
990 );
991 return;
992 }
993 }
994 Ok(None) => {
995 warn!(
996 %response_subject,
997 request_type = %type_name::<Request>(),
998 response_type = %type_name::<Request::Response>(),
999 "Acknowledgement stream ended unexpectedly"
1000 );
1001 return;
1002 }
1003 Err(_error) => {
1004 warn!(
1005 %response_subject,
1006 %expected_index,
1007 request_type = %type_name::<Request>(),
1008 response_type = %type_name::<Request::Response>(),
1009 "Acknowledgement wait timed out"
1010 );
1011 return;
1012 }
1013 }
1014 }
1015
1016 index += 1;
1017
1018 if buffer.is_empty() {
1020 break;
1021 }
1022 }
1023 }
1024 }
1025
1026 pub async fn notification<Notification>(
1028 &self,
1029 notification: &Notification,
1030 instance: Option<&str>,
1031 ) -> Result<(), PublishError>
1032 where
1033 Notification: GenericNotification,
1034 {
1035 self.inner
1036 .client
1037 .publish(
1038 subject_with_instance(Notification::SUBJECT, instance),
1039 notification.encode().into(),
1040 )
1041 .await
1042 }
1043
1044 pub async fn broadcast<Broadcast>(
1046 &self,
1047 message: &Broadcast,
1048 instance: &str,
1049 ) -> Result<(), PublishError>
1050 where
1051 Broadcast: GenericBroadcast,
1052 {
1053 self.inner
1054 .client
1055 .publish_with_headers(
1056 Broadcast::SUBJECT.replace('*', instance),
1057 {
1058 let mut headers = HeaderMap::new();
1059 if let Some(message_id) = message.deterministic_message_id() {
1060 headers.insert("Nats-Msg-Id", message_id);
1061 }
1062 headers
1063 },
1064 message.encode().into(),
1065 )
1066 .await
1067 }
1068
    /// Subscribes to notifications of type `Notification` and returns a stream of decoded
    /// notifications.
    ///
    /// When `instance` is provided, `*` in the notification subject is replaced with it; when
    /// `queue_group` is provided, a queue subscription is used.
    ///
    /// # Errors
    /// Fails if the subscription can't be created.
    pub async fn subscribe_to_notifications<Notification>(
        &self,
        instance: Option<&str>,
        queue_group: Option<String>,
    ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
    where
        Notification: GenericNotification,
    {
        self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
            .await
    }
1082
    /// Subscribes to broadcasts of type `Broadcast` and returns a stream of decoded broadcasts.
    ///
    /// When `instance` is provided, `*` in the broadcast subject is replaced with it, otherwise
    /// the subject is used as is; when `queue_group` is provided, a queue subscription is used.
    ///
    /// # Errors
    /// Fails if the subscription can't be created.
    pub async fn subscribe_to_broadcasts<Broadcast>(
        &self,
        instance: Option<&str>,
        queue_group: Option<String>,
    ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
    where
        Broadcast: GenericBroadcast,
    {
        self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
            .await
    }
1096
1097 async fn simple_subscribe<Message>(
1100 &self,
1101 subject: &'static str,
1102 instance: Option<&str>,
1103 queue_group: Option<String>,
1104 ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1105 where
1106 Message: Decode,
1107 {
1108 let subscriber = self
1109 .common_subscribe(subject, instance, queue_group)
1110 .await?;
1111 debug!(
1112 %subject,
1113 message_type = %type_name::<Message>(),
1114 ?subscriber,
1115 "Simple subscription"
1116 );
1117
1118 Ok(SubscriberWrapper {
1119 subscriber,
1120 _phantom: PhantomData,
1121 })
1122 }
1123
1124 async fn common_subscribe(
1127 &self,
1128 subject: &'static str,
1129 instance: Option<&str>,
1130 queue_group: Option<String>,
1131 ) -> Result<Subscriber, SubscribeError> {
1132 let subscriber = if let Some(queue_group) = queue_group {
1133 self.inner
1134 .client
1135 .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1136 .await?
1137 } else {
1138 self.inner
1139 .client
1140 .subscribe(subject_with_instance(subject, instance))
1141 .await?
1142 };
1143
1144 Ok(subscriber)
1145 }
1146}
1147
1148fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1149 if let Some(instance) = instance {
1150 Subject::from(subject.replace('*', instance))
1151 } else {
1152 Subject::from_static(subject)
1153 }
1154}