Skip to main content

ab_farmer/cluster/
nats_client.rs

1//! NATS client
2//!
3//! [`NatsClient`] provided here is a wrapper around [`Client`] that provides convenient methods
4//! using domain-specific traits.
5//!
6//! Before reading code, make sure to familiarize yourself with NATS documentation, especially with
7//! [subjects](https://docs.nats.io/nats-concepts/subjects) and
8//! [Core NATS](https://docs.nats.io/nats-concepts/core-nats) features.
9//!
10//! Abstractions provided here cover a few use cases:
11//! * request/response (for example piece request)
12//! * request/stream of responses (for example a stream of plotted sectors of the farmer)
13//! * notifications (typically targeting a particular instance of an app) and corresponding
14//!   subscriptions (for example solution notification)
15//! * broadcasts and corresponding subscriptions (for example slot info broadcast)
16
17use crate::utils::AsyncJoinOnDrop;
18use anyhow::anyhow;
19use async_nats::{
20    Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
21    RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
22};
23use backon::{BackoffBuilder, ExponentialBuilder};
24use futures::channel::mpsc;
25use futures::stream::FuturesUnordered;
26use futures::{FutureExt, Stream, StreamExt, select};
27use parity_scale_codec::{Decode, Encode};
28use std::any::type_name;
29use std::collections::VecDeque;
30use std::fmt;
31use std::future::Future;
32use std::marker::PhantomData;
33use std::ops::Deref;
34use std::pin::Pin;
35use std::sync::Arc;
36use std::task::{Context, Poll};
37use std::time::Duration;
38use thiserror::Error;
39use tracing::{Instrument, debug, error, trace, warn};
40use ulid::Ulid;
41
/// Smallest `max_payload` a NATS server may advertise for [`NatsClient::from_client`] to accept it
const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
/// Timeout related to stream response acknowledgements.
///
/// NOTE(review): the usage site is outside of the visible part of this file, presumably this is
/// how long a responder waits for an acknowledgement before giving up — confirm.
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Requests must not hang forever, but the timeout is deliberately generous so that temporary load
/// spikes are absorbed gracefully instead of failing requests
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5 * 60);
47
48/// Generic request with associated response.
49///
50/// Used for cases where request/response pattern is needed and response contains a single small
51/// message. For large messages or multiple messages chunking with [`GenericStreamRequest`] can be
52/// used instead.
53pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
54    /// Request subject with optional `*` in place of application instance to receive the request
55    const SUBJECT: &'static str;
56    /// Response type that corresponds to this request
57    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
58}
59
60/// Generic stream request where response is streamed using
61/// [`NatsClient::stream_request_responder`].
62///
63/// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or
64/// there is a very large number of messages to send. For simple request/response patten
65/// [`GenericRequest`] can be used instead.
66pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
67    /// Request subject with optional `*` in place of application instance to receive the request
68    const SUBJECT: &'static str;
69    /// Response type that corresponds to this stream request.
70    ///
71    /// These responses are send as a stream of messages, each message must fit into NATS message,
72    /// [`NatsClient::approximate_max_message_size()`] can be used to estimate appropriate message
73    /// size in case chunking is needed.
74    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
75}
76
77/// Messages sent in response to [`GenericStreamRequest`].
78///
79/// Empty list of responses means the end of the stream.
80#[derive(Debug, Encode, Decode)]
81enum GenericStreamResponses<Response> {
82    /// Some responses, but the stream didn't end yet
83    Continue {
84        /// Monotonically increasing index of responses in a stream
85        index: u32,
86        /// Individual responses
87        responses: VecDeque<Response>,
88        /// Subject where to send acknowledgement of received stream response indices, which acts
89        /// as a backpressure mechanism
90        ack_subject: String,
91    },
92    /// Remaining responses and this is the end of the stream.
93    Last {
94        /// Monotonically increasing index of responses in a stream
95        index: u32,
96        /// Individual responses
97        responses: VecDeque<Response>,
98    },
99}
100
101impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
102    #[inline]
103    fn from(value: GenericStreamResponses<Response>) -> Self {
104        match value {
105            GenericStreamResponses::Continue { responses, .. } => responses,
106            GenericStreamResponses::Last { responses, .. } => responses,
107        }
108    }
109}
110
111impl<Response> GenericStreamResponses<Response> {
112    fn next(&mut self) -> Option<Response> {
113        match self {
114            GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
115            GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
116        }
117    }
118
119    fn index(&self) -> u32 {
120        match self {
121            GenericStreamResponses::Continue { index, .. } => *index,
122            GenericStreamResponses::Last { index, .. } => *index,
123        }
124    }
125
126    fn ack_subject(&self) -> Option<&str> {
127        if let GenericStreamResponses::Continue { ack_subject, .. } = self {
128            Some(ack_subject)
129        } else {
130            None
131        }
132    }
133
134    fn is_last(&self) -> bool {
135        matches!(self, Self::Last { .. })
136    }
137}
138
139/// Stream request error
140#[derive(Debug, Error)]
141pub enum StreamRequestError {
142    /// Subscribe error
143    #[error("Subscribe error: {0}")]
144    Subscribe(#[from] SubscribeError),
145    /// Publish error
146    #[error("Publish error: {0}")]
147    Publish(#[from] PublishError),
148}
149
150/// Wrapper around subscription that transforms stream of wrapped response messages into a normal
151/// `Response` stream.
152#[derive(Debug)]
153#[pin_project::pin_project]
154pub struct StreamResponseSubscriber<Response> {
155    #[pin]
156    subscriber: Subscriber,
157    response_subject: String,
158    buffered_responses: Option<GenericStreamResponses<Response>>,
159    next_index: u32,
160    acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
161    _background_task: AsyncJoinOnDrop<()>,
162    _phantom: PhantomData<Response>,
163}
164
165impl<Response> Stream for StreamResponseSubscriber<Response>
166where
167    Response: Decode,
168{
169    type Item = Response;
170
171    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172        if let Some(buffered_responses) = self.buffered_responses.as_mut() {
173            if let Some(response) = buffered_responses.next() {
174                return Poll::Ready(Some(response));
175            } else if buffered_responses.is_last() {
176                return Poll::Ready(None);
177            }
178
179            self.buffered_responses.take();
180            self.next_index += 1;
181        }
182
183        let mut projected = self.project();
184        match projected.subscriber.poll_next_unpin(cx) {
185            Poll::Ready(Some(message)) => {
186                match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
187                    Ok(mut responses) => {
188                        if responses.index() != *projected.next_index {
189                            warn!(
190                                actual_index = %responses.index(),
191                                expected_index = %*projected.next_index,
192                                message_type = %type_name::<Response>(),
193                                response_subject = %projected.response_subject,
194                                "Received unexpected response stream index, aborting stream"
195                            );
196
197                            return Poll::Ready(None);
198                        }
199
200                        if let Some(ack_subject) = responses.ack_subject() {
201                            let index = responses.index();
202                            let ack_subject = ack_subject.to_string();
203
204                            if let Err(error) = projected
205                                .acknowledgement_sender
206                                .unbounded_send((ack_subject.clone(), index))
207                            {
208                                warn!(
209                                    %error,
210                                    %index,
211                                    message_type = %type_name::<Response>(),
212                                    response_subject = %projected.response_subject,
213                                    %ack_subject,
214                                    "Failed to send acknowledgement for stream response"
215                                );
216                            }
217                        }
218
219                        if let Some(response) = responses.next() {
220                            *projected.buffered_responses = Some(responses);
221                            Poll::Ready(Some(response))
222                        } else {
223                            Poll::Ready(None)
224                        }
225                    }
226                    Err(error) => {
227                        warn!(
228                            %error,
229                            response_type = %type_name::<Response>(),
230                            response_subject = %projected.response_subject,
231                            message = %hex::encode(message.payload),
232                            "Failed to decode stream response"
233                        );
234
235                        Poll::Ready(None)
236                    }
237                }
238            }
239            Poll::Ready(None) => Poll::Ready(None),
240            Poll::Pending => Poll::Pending,
241        }
242    }
243}
244
245impl<Response> StreamResponseSubscriber<Response> {
246    fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
247        let (acknowledgement_sender, mut acknowledgement_receiver) =
248            mpsc::unbounded::<(String, u32)>();
249
250        let ack_publisher_fut = {
251            let response_subject = response_subject.clone();
252
253            async move {
254                while let Some((subject, index)) = acknowledgement_receiver.next().await {
255                    trace!(
256                        %subject,
257                        %index,
258                        %response_subject,
259                        %index,
260                        "Sending stream response acknowledgement"
261                    );
262                    if let Err(error) = nats_client
263                        .publish(subject.clone(), index.to_le_bytes().to_vec().into())
264                        .await
265                    {
266                        warn!(
267                            %error,
268                            %subject,
269                            %index,
270                            %response_subject,
271                            %index,
272                            "Failed to send stream response acknowledgement"
273                        );
274                        return;
275                    }
276                }
277            }
278        };
279        let background_task =
280            AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
281
282        Self {
283            response_subject,
284            subscriber,
285            buffered_responses: None,
286            next_index: 0,
287            acknowledgement_sender,
288            _background_task: background_task,
289            _phantom: PhantomData,
290        }
291    }
292}
293
294/// Generic one-off notification
295pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
296    /// Notification subject with optional `*` in place of application instance receiving the
297    /// request
298    const SUBJECT: &'static str;
299}
300
301/// Generic broadcast message.
302///
303/// Broadcast messages are sent by an instance to (potentially) an instance-specific subject that
304/// any other app can subscribe to. The same broadcast message can also originate from multiple
305/// places and be de-duplicated using [`Self::deterministic_message_id`].
306pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
307    /// Broadcast subject with optional `*` in place of application instance sending broadcast
308    const SUBJECT: &'static str;
309
310    /// Deterministic message ID that is used for de-duplicating messages broadcast by different
311    /// instances
312    fn deterministic_message_id(&self) -> Option<HeaderValue> {
313        None
314    }
315}
316
317/// Subscriber wrapper that decodes messages automatically and skips messages that can't be decoded
318#[derive(Debug)]
319#[pin_project::pin_project]
320pub struct SubscriberWrapper<Message> {
321    #[pin]
322    subscriber: Subscriber,
323    _phantom: PhantomData<Message>,
324}
325
326impl<Message> Stream for SubscriberWrapper<Message>
327where
328    Message: Decode,
329{
330    type Item = Message;
331
332    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333        match self.project().subscriber.poll_next_unpin(cx) {
334            Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
335                Ok(message) => Poll::Ready(Some(message)),
336                Err(error) => {
337                    warn!(
338                        %error,
339                        message_type = %type_name::<Message>(),
340                        message = %hex::encode(message.payload),
341                        "Failed to decode stream message"
342                    );
343
344                    Poll::Pending
345                }
346            },
347            Poll::Ready(None) => Poll::Ready(None),
348            Poll::Pending => Poll::Pending,
349        }
350    }
351}
352
353#[derive(Debug)]
354struct Inner {
355    client: Client,
356    request_retry_backoff_policy: ExponentialBuilder,
357    approximate_max_message_size: usize,
358    max_message_size: usize,
359}
360
361/// NATS client wrapper that can be used to interact with other cluster-specific clients
362#[derive(Debug, Clone)]
363pub struct NatsClient {
364    inner: Arc<Inner>,
365}
366
367impl Deref for NatsClient {
368    type Target = Client;
369
370    #[inline]
371    fn deref(&self) -> &Self::Target {
372        &self.inner.client
373    }
374}
375
376impl NatsClient {
377    /// Create a new instance by connecting to specified addresses
378    pub async fn new<A>(
379        addrs: A,
380        request_retry_backoff_policy: ExponentialBuilder,
381    ) -> Result<Self, async_nats::Error>
382    where
383        A: ToServerAddrs,
384    {
385        let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
386        Self::from_client(
387            async_nats::connect_with_options(
388                &servers,
389                ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
390            )
391            .await?,
392            request_retry_backoff_policy,
393        )
394    }
395
396    /// Create new client from existing NATS instance
397    pub fn from_client(
398        client: Client,
399        request_retry_backoff_policy: ExponentialBuilder,
400    ) -> Result<Self, async_nats::Error> {
401        let max_payload = client.server_info().max_payload;
402        if max_payload < EXPECTED_MESSAGE_SIZE {
403            return Err(format!(
404                "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
405                increase it by specifying max_payload = 2MB or higher number in NATS configuration"
406            )
407            .into());
408        }
409
410        let inner = Inner {
411            client,
412            request_retry_backoff_policy,
413            // Allow up to 90%, the rest will be wrapper data structures, etc.
414            approximate_max_message_size: max_payload * 9 / 10,
415            // Allow up to 90%, the rest will be wrapper data structures, etc.
416            max_message_size: max_payload,
417        };
418
419        Ok(Self {
420            inner: Arc::new(inner),
421        })
422    }
423
424    /// Approximate max message size (a few more bytes will not hurt), the actual limit is expected
425    /// to be a bit higher
426    pub fn approximate_max_message_size(&self) -> usize {
427        self.inner.approximate_max_message_size
428    }
429
430    /// Make request and wait for response
431    pub async fn request<Request>(
432        &self,
433        request: &Request,
434        instance: Option<&str>,
435    ) -> Result<Request::Response, RequestError>
436    where
437        Request: GenericRequest,
438    {
439        let subject = subject_with_instance(Request::SUBJECT, instance);
440        let mut maybe_retry_backoff = None;
441        let message = loop {
442            match self
443                .inner
444                .client
445                .request(subject.clone(), request.encode().into())
446                .await
447            {
448                Ok(message) => {
449                    break message;
450                }
451                Err(error) => {
452                    match error.kind() {
453                        RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
454                            // Continue with retries
455                        }
456                        RequestErrorKind::InvalidSubject
457                        | RequestErrorKind::MaxPayloadExceeded
458                        | RequestErrorKind::Other => {
459                            return Err(error);
460                        }
461                    }
462
463                    let retry_backoff = maybe_retry_backoff
464                        .get_or_insert_with(|| self.inner.request_retry_backoff_policy.build());
465
466                    if let Some(delay) = retry_backoff.next() {
467                        debug!(
468                            %subject,
469                            %error,
470                            request_type = %type_name::<Request>(),
471                            ?delay,
472                            "Failed to make request, retrying after some delay"
473                        );
474
475                        tokio::time::sleep(delay).await;
476                    } else {
477                        return Err(error);
478                    }
479                }
480            }
481        };
482
483        let response =
484            Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
485                warn!(
486                    %subject,
487                    %error,
488                    response_type = %type_name::<Request::Response>(),
489                    response = %hex::encode(message.payload),
490                    "Response decoding failed"
491                );
492
493                RequestErrorKind::Other
494            })?;
495
496        Ok(response)
497    }
498
499    /// Responds to requests from the given subject using the provided processing function.
500    ///
501    /// This will create a subscription on the subject for the given instance (if provided) and
502    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
503    /// to the `process` function to produce a response of type `Request::Response`. The response
504    /// will then be sent back on the reply subject from the original request.
505    ///
506    /// Each request is processed in a newly created async tokio task.
507    ///
508    /// # Arguments
509    ///
510    /// * `instance` - Optional instance name to use in place of the `*` in the subject
511    /// * `group` - The queue group name for the subscription
512    /// * `process` - The function to call with the decoded request to produce a response
513    pub async fn request_responder<Request, F, OP>(
514        &self,
515        instance: Option<&str>,
516        queue_group: Option<String>,
517        process: OP,
518    ) -> anyhow::Result<()>
519    where
520        Request: GenericRequest,
521        F: Future<Output = Option<Request::Response>> + Send,
522        OP: Fn(Request) -> F + Send + Sync,
523    {
524        // Initialize with pending future so it never ends
525        let mut processing = FuturesUnordered::new();
526
527        let subscription = self
528            .common_subscribe(Request::SUBJECT, instance, queue_group)
529            .await
530            .map_err(|error| {
531                anyhow!(
532                    "Failed to subscribe to {} requests for {instance:?}: {error}",
533                    type_name::<Request>(),
534                )
535            })?;
536
537        debug!(
538            request_type = %type_name::<Request>(),
539            ?subscription,
540            "Requests subscription"
541        );
542        let mut subscription = subscription.fuse();
543
544        loop {
545            select! {
546                message = subscription.select_next_some() => {
547                    // Create background task for concurrent processing
548                    processing.push(
549                        self
550                            .process_request(
551                                message,
552                                &process,
553                            )
554                            .in_current_span(),
555                    );
556                },
557                _ = processing.next() => {
558                    // Nothing to do here
559                },
560                complete => {
561                    break;
562                }
563            }
564        }
565
566        Ok(())
567    }
568
569    async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
570    where
571        Request: GenericRequest,
572        F: Future<Output = Option<Request::Response>> + Send,
573        OP: Fn(Request) -> F + Send + Sync,
574    {
575        let Some(reply_subject) = message.reply else {
576            return;
577        };
578
579        let message_payload_size = message.payload.len();
580        let request = match Request::decode(&mut message.payload.as_ref()) {
581            Ok(request) => {
582                // Free allocation early
583                drop(message.payload);
584                request
585            }
586            Err(error) => {
587                warn!(
588                    request_type = %type_name::<Request>(),
589                    %error,
590                    message = %hex::encode(message.payload),
591                    "Failed to decode request"
592                );
593                return;
594            }
595        };
596
597        // Avoid printing large messages in logs
598        if message_payload_size > 1024 {
599            trace!(
600                request_type = %type_name::<Request>(),
601                %reply_subject,
602                "Processing request"
603            );
604        } else {
605            trace!(
606                request_type = %type_name::<Request>(),
607                ?request,
608                %reply_subject,
609                "Processing request"
610            );
611        }
612
613        if let Some(response) = process(request).await
614            && let Err(error) = self.publish(reply_subject, response.encode().into()).await
615        {
616            warn!(
617                request_type = %type_name::<Request>(),
618                %error,
619                "Failed to send response"
620            );
621        }
622    }
623
624    /// Make request that expects stream response
625    pub async fn stream_request<Request>(
626        &self,
627        request: &Request,
628        instance: Option<&str>,
629    ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
630    where
631        Request: GenericStreamRequest,
632    {
633        let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
634        let stream_response_subject = format!("stream-response.{}", Ulid::new());
635
636        let subscriber = self
637            .inner
638            .client
639            .subscribe(stream_response_subject.clone())
640            .await?;
641
642        debug!(
643            request_type = %type_name::<Request>(),
644            %stream_request_subject,
645            %stream_response_subject,
646            ?subscriber,
647            "Stream request subscription"
648        );
649
650        self.inner
651            .client
652            .publish_with_reply(
653                stream_request_subject,
654                stream_response_subject.clone(),
655                request.encode().into(),
656            )
657            .await?;
658
659        Ok(StreamResponseSubscriber::new(
660            subscriber,
661            stream_response_subject,
662            self.clone(),
663        ))
664    }
665
666    /// Responds to stream requests from the given subject using the provided processing function.
667    ///
668    /// This will create a subscription on the subject for the given instance (if provided) and
669    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
670    /// to the `process` function to produce a stream response of type `Request::Response`. The
671    /// stream response will then be sent back on the reply subject from the original request.
672    ///
673    /// Each request is processed in a newly created async tokio task.
674    ///
675    /// # Arguments
676    ///
677    /// * `instance` - Optional instance name to use in place of the `*` in the subject
678    /// * `group` - The queue group name for the subscription
679    /// * `process` - The function to call with the decoded request to produce a response
680    pub async fn stream_request_responder<Request, F, S, OP>(
681        &self,
682        instance: Option<&str>,
683        queue_group: Option<String>,
684        process: OP,
685    ) -> anyhow::Result<()>
686    where
687        Request: GenericStreamRequest,
688        F: Future<Output = Option<S>> + Send,
689        S: Stream<Item = Request::Response> + Unpin,
690        OP: Fn(Request) -> F + Send + Sync,
691    {
692        // Initialize with pending future so it never ends
693        let mut processing = FuturesUnordered::new();
694
695        let subscription = self
696            .common_subscribe(Request::SUBJECT, instance, queue_group)
697            .await
698            .map_err(|error| {
699                anyhow!(
700                    "Failed to subscribe to {} stream requests for {instance:?}: {error}",
701                    type_name::<Request>(),
702                )
703            })?;
704
705        debug!(
706            request_type = %type_name::<Request>(),
707            ?subscription,
708            "Stream requests subscription"
709        );
710        let mut subscription = subscription.fuse();
711
712        loop {
713            select! {
714                message = subscription.select_next_some() => {
715                    // Create background task for concurrent processing
716                    processing.push(
717                        self
718                        .process_stream_request(
719                            message,
720                            &process,
721                        )
722                        .in_current_span(),
723                    );
724                },
725                _ = processing.next() => {
726                    // Nothing to do here
727                },
728                complete => {
729                    break;
730                }
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
738    where
739        Request: GenericStreamRequest,
740        F: Future<Output = Option<S>> + Send,
741        S: Stream<Item = Request::Response> + Unpin,
742        OP: Fn(Request) -> F + Send + Sync,
743    {
744        let Some(reply_subject) = message.reply else {
745            return;
746        };
747
748        let message_payload_size = message.payload.len();
749        let request = match Request::decode(&mut message.payload.as_ref()) {
750            Ok(request) => {
751                // Free allocation early
752                drop(message.payload);
753                request
754            }
755            Err(error) => {
756                warn!(
757                    request_type = %type_name::<Request>(),
758                    %error,
759                    message = %hex::encode(message.payload),
760                    "Failed to decode request"
761                );
762                return;
763            }
764        };
765
766        // Avoid printing large messages in logs
767        if message_payload_size > 1024 {
768            trace!(
769                request_type = %type_name::<Request>(),
770                %reply_subject,
771                "Processing request"
772            );
773        } else {
774            trace!(
775                request_type = %type_name::<Request>(),
776                ?request,
777                %reply_subject,
778                "Processing request"
779            );
780        }
781
782        if let Some(stream) = process(request).await {
783            self.stream_response::<Request, _>(reply_subject, stream)
784                .await;
785        }
786    }
787
788    /// Helper method to send responses to requests initiated with [`Self::stream_request`]
789    async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
790    where
791        Request: GenericStreamRequest,
792        S: Stream<Item = Request::Response> + Unpin,
793    {
794        type Response<Request> =
795            GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
796
797        let mut response_stream = response_stream.fuse();
798
799        // Pull the first element to measure response size
800        let Some(first_element) = response_stream.next().await else {
801            if let Err(error) = self
802                .publish(
803                    response_subject.clone(),
804                    Response::<Request>::Last {
805                        index: 0,
806                        responses: VecDeque::new(),
807                    }
808                    .encode()
809                    .into(),
810                )
811                .await
812            {
813                warn!(
814                    %response_subject,
815                    %error,
816                    request_type = %type_name::<Request>(),
817                    response_type = %type_name::<Request::Response>(),
818                    "Failed to send stream response"
819                );
820            }
821
822            return;
823        };
824        let max_message_size = self.inner.max_message_size;
825        let approximate_max_message_size = self.approximate_max_message_size();
826        let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
827
828        let ack_subject = format!("stream-response-ack.{}", Ulid::new());
829        let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
830            Ok(ack_subscription) => ack_subscription,
831            Err(error) => {
832                warn!(
833                    %response_subject,
834                    %error,
835                    request_type = %type_name::<Request>(),
836                    response_type = %type_name::<Request::Response>(),
837                    "Failed to subscribe to ack subject"
838                );
839                return;
840            }
841        };
842        debug!(
843            %response_subject,
844            request_type = %type_name::<Request>(),
845            response_type = %type_name::<Request::Response>(),
846            ?ack_subscription,
847            "Ack subscription subscription"
848        );
849        let mut index = 0;
850        // Initialize buffer that will be reused for responses
851        let mut buffer = VecDeque::with_capacity(max_responses_per_message);
852        buffer.push_back(first_element);
853        let mut overflow_buffer = VecDeque::new();
854
855        loop {
856            // Try to fill the buffer
857            if buffer.is_empty()
858                && let Some(element) = response_stream.next().await
859            {
860                buffer.push_back(element);
861            }
862            while buffer.encoded_size() < approximate_max_message_size
863                && let Some(element) = response_stream.next().now_or_never().flatten()
864            {
865                buffer.push_back(element);
866            }
867
868            loop {
869                let is_done = response_stream.is_done() && overflow_buffer.is_empty();
870                let num_messages = buffer.len();
871                let response = if is_done {
872                    Response::<Request>::Last {
873                        index,
874                        responses: buffer,
875                    }
876                } else {
877                    Response::<Request>::Continue {
878                        index,
879                        responses: buffer,
880                        ack_subject: ack_subject.clone(),
881                    }
882                };
883                let encoded_response = response.encode();
884                let encoded_response_len = encoded_response.len();
885                // When encoded response is too large, remove one of the responses from it and try
886                // again
887                if encoded_response_len > max_message_size {
888                    buffer = response.into();
889                    if let Some(element) = buffer.pop_back() {
890                        if buffer.is_empty() {
891                            error!(
892                                ?element,
893                                encoded_response_len,
894                                max_message_size,
895                                "Element was too large to fit into NATS message, this is an \
896                                implementation bug"
897                            );
898                        }
899                        overflow_buffer.push_front(element);
900                        continue;
901                    }
902
903                    error!(
904                        %response_subject,
905                        request_type = %type_name::<Request>(),
906                        response_type = %type_name::<Request::Response>(),
907                        "Empty response overflown message size, this should never happen"
908                    );
909                    return;
910                }
911
912                debug!(
913                    %response_subject,
914                    num_messages,
915                    %index,
916                    %is_done,
917                    "Publishing stream response messages",
918                );
919
920                if let Err(error) = self
921                    .publish(response_subject.clone(), encoded_response.into())
922                    .await
923                {
924                    warn!(
925                        %response_subject,
926                        %error,
927                        request_type = %type_name::<Request>(),
928                        response_type = %type_name::<Request::Response>(),
929                        "Failed to send stream response"
930                    );
931                    return;
932                }
933
934                if is_done {
935                    return;
936                }
937
938                buffer = response.into();
939                buffer.clear();
940                // Fill buffer with any overflown responses that may have been stored
941                buffer.extend(overflow_buffer.drain(..));
942
943                if index >= 1 {
944                    // Acknowledgements are received with delay
945                    let expected_index = index - 1;
946
947                    trace!(
948                        %response_subject,
949                        %expected_index,
950                        "Waiting for acknowledgement"
951                    );
952                    match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
953                        .await
954                    {
955                        Ok(Some(message)) => {
956                            if let Some(received_index) =
957                                message.payload.get(..size_of::<u32>()).map(|bytes| {
958                                    u32::from_le_bytes(
959                                        bytes.try_into().expect("Correctly chunked slice; qed"),
960                                    )
961                                })
962                            {
963                                debug!(
964                                    %response_subject,
965                                    %received_index,
966                                    "Received acknowledgement"
967                                );
968                                if received_index != expected_index {
969                                    warn!(
970                                        %response_subject,
971                                        %received_index,
972                                        %expected_index,
973                                        request_type = %type_name::<Request>(),
974                                        response_type = %type_name::<Request::Response>(),
975                                        message = %hex::encode(message.payload),
976                                        "Unexpected acknowledgement index"
977                                    );
978                                    return;
979                                }
980                            } else {
981                                warn!(
982                                    %response_subject,
983                                    request_type = %type_name::<Request>(),
984                                    response_type = %type_name::<Request::Response>(),
985                                    message = %hex::encode(message.payload),
986                                    "Unexpected acknowledgement message"
987                                );
988                                return;
989                            }
990                        }
991                        Ok(None) => {
992                            warn!(
993                                %response_subject,
994                                request_type = %type_name::<Request>(),
995                                response_type = %type_name::<Request::Response>(),
996                                "Acknowledgement stream ended unexpectedly"
997                            );
998                            return;
999                        }
1000                        Err(_error) => {
1001                            warn!(
1002                                %response_subject,
1003                                %expected_index,
1004                                request_type = %type_name::<Request>(),
1005                                response_type = %type_name::<Request::Response>(),
1006                                "Acknowledgement wait timed out"
1007                            );
1008                            return;
1009                        }
1010                    }
1011                }
1012
1013                index += 1;
1014
1015                // Unless `overflow_buffer` wasn't empty abort inner loop
1016                if buffer.is_empty() {
1017                    break;
1018                }
1019            }
1020        }
1021    }
1022
1023    /// Make notification without waiting for response
1024    pub async fn notification<Notification>(
1025        &self,
1026        notification: &Notification,
1027        instance: Option<&str>,
1028    ) -> Result<(), PublishError>
1029    where
1030        Notification: GenericNotification,
1031    {
1032        self.inner
1033            .client
1034            .publish(
1035                subject_with_instance(Notification::SUBJECT, instance),
1036                notification.encode().into(),
1037            )
1038            .await
1039    }
1040
1041    /// Send a broadcast message
1042    pub async fn broadcast<Broadcast>(
1043        &self,
1044        message: &Broadcast,
1045        instance: &str,
1046    ) -> Result<(), PublishError>
1047    where
1048        Broadcast: GenericBroadcast,
1049    {
1050        self.inner
1051            .client
1052            .publish_with_headers(
1053                Broadcast::SUBJECT.replace('*', instance),
1054                {
1055                    let mut headers = HeaderMap::new();
1056                    if let Some(message_id) = message.deterministic_message_id() {
1057                        headers.insert("Nats-Msg-Id", message_id);
1058                    }
1059                    headers
1060                },
1061                message.encode().into(),
1062            )
1063            .await
1064    }
1065
1066    /// Simple subscription that will produce decoded notifications, while skipping messages that
1067    /// fail to decode
1068    pub async fn subscribe_to_notifications<Notification>(
1069        &self,
1070        instance: Option<&str>,
1071        queue_group: Option<String>,
1072    ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
1073    where
1074        Notification: GenericNotification,
1075    {
1076        self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
1077            .await
1078    }
1079
1080    /// Simple subscription that will produce decoded broadcasts, while skipping messages that
1081    /// fail to decode
1082    pub async fn subscribe_to_broadcasts<Broadcast>(
1083        &self,
1084        instance: Option<&str>,
1085        queue_group: Option<String>,
1086    ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
1087    where
1088        Broadcast: GenericBroadcast,
1089    {
1090        self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
1091            .await
1092    }
1093
1094    /// Simple subscription that will produce decoded messages, while skipping messages that fail to
1095    /// decode
1096    async fn simple_subscribe<Message>(
1097        &self,
1098        subject: &'static str,
1099        instance: Option<&str>,
1100        queue_group: Option<String>,
1101    ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1102    where
1103        Message: Decode,
1104    {
1105        let subscriber = self
1106            .common_subscribe(subject, instance, queue_group)
1107            .await?;
1108        debug!(
1109            %subject,
1110            message_type = %type_name::<Message>(),
1111            ?subscriber,
1112            "Simple subscription"
1113        );
1114
1115        Ok(SubscriberWrapper {
1116            subscriber,
1117            _phantom: PhantomData,
1118        })
1119    }
1120
1121    /// Simple subscription that will produce decoded messages, while skipping messages that fail to
1122    /// decode
1123    async fn common_subscribe(
1124        &self,
1125        subject: &'static str,
1126        instance: Option<&str>,
1127        queue_group: Option<String>,
1128    ) -> Result<Subscriber, SubscribeError> {
1129        let subscriber = if let Some(queue_group) = queue_group {
1130            self.inner
1131                .client
1132                .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1133                .await?
1134        } else {
1135            self.inner
1136                .client
1137                .subscribe(subject_with_instance(subject, instance))
1138                .await?
1139        };
1140
1141        Ok(subscriber)
1142    }
1143}
1144
1145fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1146    if let Some(instance) = instance {
1147        Subject::from(subject.replace('*', instance))
1148    } else {
1149        Subject::from_static(subject)
1150    }
1151}