ab_farmer/cluster/
nats_client.rs

1//! NATS client
2//!
3//! [`NatsClient`] provided here is a wrapper around [`Client`] that provides convenient methods
4//! using domain-specific traits.
5//!
6//! Before reading code, make sure to familiarize yourself with NATS documentation, especially with
7//! [subjects](https://docs.nats.io/nats-concepts/subjects) and
8//! [Core NATS](https://docs.nats.io/nats-concepts/core-nats) features.
9//!
10//! Abstractions provided here cover a few use cases:
11//! * request/response (for example piece request)
12//! * request/stream of responses (for example a stream of plotted sectors of the farmer)
13//! * notifications (typically targeting a particular instance of an app) and corresponding
14//!   subscriptions (for example solution notification)
15//! * broadcasts and corresponding subscriptions (for example slot info broadcast)
16
17use crate::utils::AsyncJoinOnDrop;
18use anyhow::anyhow;
19use async_nats::{
20    Client, ConnectOptions, HeaderMap, HeaderValue, Message, PublishError, RequestError,
21    RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs,
22};
23use backoff::ExponentialBackoff;
24use backoff::backoff::Backoff;
25use futures::channel::mpsc;
26use futures::stream::FuturesUnordered;
27use futures::{FutureExt, Stream, StreamExt, select};
28use parity_scale_codec::{Decode, Encode};
29use std::any::type_name;
30use std::collections::VecDeque;
31use std::fmt;
32use std::future::Future;
33use std::marker::PhantomData;
34use std::ops::Deref;
35use std::pin::Pin;
36use std::sync::Arc;
37use std::task::{Context, Poll};
38use std::time::Duration;
39use thiserror::Error;
40use tracing::{Instrument, debug, error, trace, warn};
41use ulid::Ulid;
42
/// Smallest `max_payload` (2 MiB) the NATS server has to advertise for
/// [`NatsClient::from_client`] to accept the connection
const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
/// Acknowledgement timeout of one minute.
///
/// NOTE(review): not referenced in this part of the file; presumably bounds the wait for stream
/// response acknowledgements, confirm at the usage site.
const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_secs(60);
/// Request timeout configured on connections created by [`NatsClient::new`].
///
/// Requests have to time out eventually, but the timeout is deliberately generous so that spikes
/// in load are absorbed gracefully.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5 * 60);
48
49/// Generic request with associated response.
50///
51/// Used for cases where request/response pattern is needed and response contains a single small
52/// message. For large messages or multiple messages chunking with [`GenericStreamRequest`] can be
53/// used instead.
54pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
55    /// Request subject with optional `*` in place of application instance to receive the request
56    const SUBJECT: &'static str;
57    /// Response type that corresponds to this request
58    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
59}
60
61/// Generic stream request where response is streamed using
62/// [`NatsClient::stream_request_responder`].
63///
64/// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or
65/// there is a very large number of messages to send. For simple request/response patten
66/// [`GenericRequest`] can be used instead.
67pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static {
68    /// Request subject with optional `*` in place of application instance to receive the request
69    const SUBJECT: &'static str;
70    /// Response type that corresponds to this stream request.
71    ///
72    /// These responses are send as a stream of messages, each message must fit into NATS message,
73    /// [`NatsClient::approximate_max_message_size()`] can be used to estimate appropriate message
74    /// size in case chunking is needed.
75    type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static;
76}
77
78/// Messages sent in response to [`GenericStreamRequest`].
79///
80/// Empty list of responses means the end of the stream.
81#[derive(Debug, Encode, Decode)]
82enum GenericStreamResponses<Response> {
83    /// Some responses, but the stream didn't end yet
84    Continue {
85        /// Monotonically increasing index of responses in a stream
86        index: u32,
87        /// Individual responses
88        responses: VecDeque<Response>,
89        /// Subject where to send acknowledgement of received stream response indices, which acts
90        /// as a backpressure mechanism
91        ack_subject: String,
92    },
93    /// Remaining responses and this is the end of the stream.
94    Last {
95        /// Monotonically increasing index of responses in a stream
96        index: u32,
97        /// Individual responses
98        responses: VecDeque<Response>,
99    },
100}
101
102impl<Response> From<GenericStreamResponses<Response>> for VecDeque<Response> {
103    #[inline]
104    fn from(value: GenericStreamResponses<Response>) -> Self {
105        match value {
106            GenericStreamResponses::Continue { responses, .. } => responses,
107            GenericStreamResponses::Last { responses, .. } => responses,
108        }
109    }
110}
111
112impl<Response> GenericStreamResponses<Response> {
113    fn next(&mut self) -> Option<Response> {
114        match self {
115            GenericStreamResponses::Continue { responses, .. } => responses.pop_front(),
116            GenericStreamResponses::Last { responses, .. } => responses.pop_front(),
117        }
118    }
119
120    fn index(&self) -> u32 {
121        match self {
122            GenericStreamResponses::Continue { index, .. } => *index,
123            GenericStreamResponses::Last { index, .. } => *index,
124        }
125    }
126
127    fn ack_subject(&self) -> Option<&str> {
128        if let GenericStreamResponses::Continue { ack_subject, .. } = self {
129            Some(ack_subject)
130        } else {
131            None
132        }
133    }
134
135    fn is_last(&self) -> bool {
136        matches!(self, Self::Last { .. })
137    }
138}
139
140/// Stream request error
141#[derive(Debug, Error)]
142pub enum StreamRequestError {
143    /// Subscribe error
144    #[error("Subscribe error: {0}")]
145    Subscribe(#[from] SubscribeError),
146    /// Publish error
147    #[error("Publish error: {0}")]
148    Publish(#[from] PublishError),
149}
150
151/// Wrapper around subscription that transforms stream of wrapped response messages into a normal
152/// `Response` stream.
153#[derive(Debug)]
154#[pin_project::pin_project]
155pub struct StreamResponseSubscriber<Response> {
156    #[pin]
157    subscriber: Subscriber,
158    response_subject: String,
159    buffered_responses: Option<GenericStreamResponses<Response>>,
160    next_index: u32,
161    acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
162    _background_task: AsyncJoinOnDrop<()>,
163    _phantom: PhantomData<Response>,
164}
165
166impl<Response> Stream for StreamResponseSubscriber<Response>
167where
168    Response: Decode,
169{
170    type Item = Response;
171
172    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
173        if let Some(buffered_responses) = self.buffered_responses.as_mut() {
174            if let Some(response) = buffered_responses.next() {
175                return Poll::Ready(Some(response));
176            } else if buffered_responses.is_last() {
177                return Poll::Ready(None);
178            }
179
180            self.buffered_responses.take();
181            self.next_index += 1;
182        }
183
184        let mut projected = self.project();
185        match projected.subscriber.poll_next_unpin(cx) {
186            Poll::Ready(Some(message)) => {
187                match GenericStreamResponses::<Response>::decode(&mut message.payload.as_ref()) {
188                    Ok(mut responses) => {
189                        if responses.index() != *projected.next_index {
190                            warn!(
191                                actual_index = %responses.index(),
192                                expected_index = %*projected.next_index,
193                                message_type = %type_name::<Response>(),
194                                response_subject = %projected.response_subject,
195                                "Received unexpected response stream index, aborting stream"
196                            );
197
198                            return Poll::Ready(None);
199                        }
200
201                        if let Some(ack_subject) = responses.ack_subject() {
202                            let index = responses.index();
203                            let ack_subject = ack_subject.to_string();
204
205                            if let Err(error) = projected
206                                .acknowledgement_sender
207                                .unbounded_send((ack_subject.clone(), index))
208                            {
209                                warn!(
210                                    %error,
211                                    %index,
212                                    message_type = %type_name::<Response>(),
213                                    response_subject = %projected.response_subject,
214                                    %ack_subject,
215                                    "Failed to send acknowledgement for stream response"
216                                );
217                            }
218                        }
219
220                        if let Some(response) = responses.next() {
221                            *projected.buffered_responses = Some(responses);
222                            Poll::Ready(Some(response))
223                        } else {
224                            Poll::Ready(None)
225                        }
226                    }
227                    Err(error) => {
228                        warn!(
229                            %error,
230                            response_type = %type_name::<Response>(),
231                            response_subject = %projected.response_subject,
232                            message = %hex::encode(message.payload),
233                            "Failed to decode stream response"
234                        );
235
236                        Poll::Ready(None)
237                    }
238                }
239            }
240            Poll::Ready(None) => Poll::Ready(None),
241            Poll::Pending => Poll::Pending,
242        }
243    }
244}
245
246impl<Response> StreamResponseSubscriber<Response> {
247    fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
248        let (acknowledgement_sender, mut acknowledgement_receiver) =
249            mpsc::unbounded::<(String, u32)>();
250
251        let ack_publisher_fut = {
252            let response_subject = response_subject.clone();
253
254            async move {
255                while let Some((subject, index)) = acknowledgement_receiver.next().await {
256                    trace!(
257                        %subject,
258                        %index,
259                        %response_subject,
260                        %index,
261                        "Sending stream response acknowledgement"
262                    );
263                    if let Err(error) = nats_client
264                        .publish(subject.clone(), index.to_le_bytes().to_vec().into())
265                        .await
266                    {
267                        warn!(
268                            %error,
269                            %subject,
270                            %index,
271                            %response_subject,
272                            %index,
273                            "Failed to send stream response acknowledgement"
274                        );
275                        return;
276                    }
277                }
278            }
279        };
280        let background_task =
281            AsyncJoinOnDrop::new(tokio::spawn(ack_publisher_fut.in_current_span()), true);
282
283        Self {
284            response_subject,
285            subscriber,
286            buffered_responses: None,
287            next_index: 0,
288            acknowledgement_sender,
289            _background_task: background_task,
290            _phantom: PhantomData,
291        }
292    }
293}
294
295/// Generic one-off notification
296pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static {
297    /// Notification subject with optional `*` in place of application instance receiving the
298    /// request
299    const SUBJECT: &'static str;
300}
301
302/// Generic broadcast message.
303///
304/// Broadcast messages are sent by an instance to (potentially) an instance-specific subject that
305/// any other app can subscribe to. The same broadcast message can also originate from multiple
306/// places and be de-duplicated using [`Self::deterministic_message_id`].
307pub trait GenericBroadcast: Encode + Decode + fmt::Debug + Send + Sync + 'static {
308    /// Broadcast subject with optional `*` in place of application instance sending broadcast
309    const SUBJECT: &'static str;
310
311    /// Deterministic message ID that is used for de-duplicating messages broadcast by different
312    /// instances
313    fn deterministic_message_id(&self) -> Option<HeaderValue> {
314        None
315    }
316}
317
318/// Subscriber wrapper that decodes messages automatically and skips messages that can't be decoded
319#[derive(Debug)]
320#[pin_project::pin_project]
321pub struct SubscriberWrapper<Message> {
322    #[pin]
323    subscriber: Subscriber,
324    _phantom: PhantomData<Message>,
325}
326
327impl<Message> Stream for SubscriberWrapper<Message>
328where
329    Message: Decode,
330{
331    type Item = Message;
332
333    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
334        match self.project().subscriber.poll_next_unpin(cx) {
335            Poll::Ready(Some(message)) => match Message::decode(&mut message.payload.as_ref()) {
336                Ok(message) => Poll::Ready(Some(message)),
337                Err(error) => {
338                    warn!(
339                        %error,
340                        message_type = %type_name::<Message>(),
341                        message = %hex::encode(message.payload),
342                        "Failed to decode stream message"
343                    );
344
345                    Poll::Pending
346                }
347            },
348            Poll::Ready(None) => Poll::Ready(None),
349            Poll::Pending => Poll::Pending,
350        }
351    }
352}
353
354#[derive(Debug)]
355struct Inner {
356    client: Client,
357    request_retry_backoff_policy: ExponentialBackoff,
358    approximate_max_message_size: usize,
359    max_message_size: usize,
360}
361
362/// NATS client wrapper that can be used to interact with other cluster-specific clients
363#[derive(Debug, Clone)]
364pub struct NatsClient {
365    inner: Arc<Inner>,
366}
367
368impl Deref for NatsClient {
369    type Target = Client;
370
371    #[inline]
372    fn deref(&self) -> &Self::Target {
373        &self.inner.client
374    }
375}
376
377impl NatsClient {
378    /// Create a new instance by connecting to specified addresses
379    pub async fn new<A: ToServerAddrs>(
380        addrs: A,
381        request_retry_backoff_policy: ExponentialBackoff,
382    ) -> Result<Self, async_nats::Error> {
383        let servers = addrs.to_server_addrs()?.collect::<Vec<_>>();
384        Self::from_client(
385            async_nats::connect_with_options(
386                &servers,
387                ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)),
388            )
389            .await?,
390            request_retry_backoff_policy,
391        )
392    }
393
394    /// Create new client from existing NATS instance
395    pub fn from_client(
396        client: Client,
397        request_retry_backoff_policy: ExponentialBackoff,
398    ) -> Result<Self, async_nats::Error> {
399        let max_payload = client.server_info().max_payload;
400        if max_payload < EXPECTED_MESSAGE_SIZE {
401            return Err(format!(
402                "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \
403                increase it by specifying max_payload = 2MB or higher number in NATS configuration"
404            )
405            .into());
406        }
407
408        let inner = Inner {
409            client,
410            request_retry_backoff_policy,
411            // Allow up to 90%, the rest will be wrapper data structures, etc.
412            approximate_max_message_size: max_payload * 9 / 10,
413            // Allow up to 90%, the rest will be wrapper data structures, etc.
414            max_message_size: max_payload,
415        };
416
417        Ok(Self {
418            inner: Arc::new(inner),
419        })
420    }
421
422    /// Approximate max message size (a few more bytes will not hurt), the actual limit is expected
423    /// to be a bit higher
424    pub fn approximate_max_message_size(&self) -> usize {
425        self.inner.approximate_max_message_size
426    }
427
428    /// Make request and wait for response
429    pub async fn request<Request>(
430        &self,
431        request: &Request,
432        instance: Option<&str>,
433    ) -> Result<Request::Response, RequestError>
434    where
435        Request: GenericRequest,
436    {
437        let subject = subject_with_instance(Request::SUBJECT, instance);
438        let mut maybe_retry_backoff = None;
439        let message = loop {
440            match self
441                .inner
442                .client
443                .request(subject.clone(), request.encode().into())
444                .await
445            {
446                Ok(message) => {
447                    break message;
448                }
449                Err(error) => {
450                    match error.kind() {
451                        RequestErrorKind::TimedOut | RequestErrorKind::NoResponders => {
452                            // Continue with retries
453                        }
454                        RequestErrorKind::Other => {
455                            return Err(error);
456                        }
457                    }
458
459                    let retry_backoff = maybe_retry_backoff.get_or_insert_with(|| {
460                        let mut retry_backoff = self.inner.request_retry_backoff_policy.clone();
461                        retry_backoff.reset();
462                        retry_backoff
463                    });
464
465                    if let Some(delay) = retry_backoff.next_backoff() {
466                        debug!(
467                            %subject,
468                            %error,
469                            request_type = %type_name::<Request>(),
470                            ?delay,
471                            "Failed to make request, retrying after some delay"
472                        );
473
474                        tokio::time::sleep(delay).await;
475                        continue;
476                    } else {
477                        return Err(error);
478                    }
479                }
480            }
481        };
482
483        let response =
484            Request::Response::decode(&mut message.payload.as_ref()).map_err(|error| {
485                warn!(
486                    %subject,
487                    %error,
488                    response_type = %type_name::<Request::Response>(),
489                    response = %hex::encode(message.payload),
490                    "Response decoding failed"
491                );
492
493                RequestErrorKind::Other
494            })?;
495
496        Ok(response)
497    }
498
499    /// Responds to requests from the given subject using the provided processing function.
500    ///
501    /// This will create a subscription on the subject for the given instance (if provided) and
502    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
503    /// to the `process` function to produce a response of type `Request::Response`. The response
504    /// will then be sent back on the reply subject from the original request.
505    ///
506    /// Each request is processed in a newly created async tokio task.
507    ///
508    /// # Arguments
509    ///
510    /// * `instance` - Optional instance name to use in place of the `*` in the subject
511    /// * `group` - The queue group name for the subscription
512    /// * `process` - The function to call with the decoded request to produce a response
513    pub async fn request_responder<Request, F, OP>(
514        &self,
515        instance: Option<&str>,
516        queue_group: Option<String>,
517        process: OP,
518    ) -> anyhow::Result<()>
519    where
520        Request: GenericRequest,
521        F: Future<Output = Option<Request::Response>> + Send,
522        OP: Fn(Request) -> F + Send + Sync,
523    {
524        // Initialize with pending future so it never ends
525        let mut processing = FuturesUnordered::new();
526
527        let subscription = self
528            .common_subscribe(Request::SUBJECT, instance, queue_group)
529            .await
530            .map_err(|error| {
531                anyhow!(
532                    "Failed to subscribe to {} requests for {instance:?}: {error}",
533                    type_name::<Request>(),
534                )
535            })?;
536
537        debug!(
538            request_type = %type_name::<Request>(),
539            ?subscription,
540            "Requests subscription"
541        );
542        let mut subscription = subscription.fuse();
543
544        loop {
545            select! {
546                message = subscription.select_next_some() => {
547                    // Create background task for concurrent processing
548                    processing.push(
549                        self
550                            .process_request(
551                                message,
552                                &process,
553                            )
554                            .in_current_span(),
555                    );
556                },
557                _ = processing.next() => {
558                    // Nothing to do here
559                },
560                complete => {
561                    break;
562                }
563            }
564        }
565
566        Ok(())
567    }
568
569    async fn process_request<Request, F, OP>(&self, message: Message, process: OP)
570    where
571        Request: GenericRequest,
572        F: Future<Output = Option<Request::Response>> + Send,
573        OP: Fn(Request) -> F + Send + Sync,
574    {
575        let Some(reply_subject) = message.reply else {
576            return;
577        };
578
579        let message_payload_size = message.payload.len();
580        let request = match Request::decode(&mut message.payload.as_ref()) {
581            Ok(request) => {
582                // Free allocation early
583                drop(message.payload);
584                request
585            }
586            Err(error) => {
587                warn!(
588                    request_type = %type_name::<Request>(),
589                    %error,
590                    message = %hex::encode(message.payload),
591                    "Failed to decode request"
592                );
593                return;
594            }
595        };
596
597        // Avoid printing large messages in logs
598        if message_payload_size > 1024 {
599            trace!(
600                request_type = %type_name::<Request>(),
601                %reply_subject,
602                "Processing request"
603            );
604        } else {
605            trace!(
606                request_type = %type_name::<Request>(),
607                ?request,
608                %reply_subject,
609                "Processing request"
610            );
611        }
612
613        if let Some(response) = process(request).await
614            && let Err(error) = self.publish(reply_subject, response.encode().into()).await
615        {
616            warn!(
617                request_type = %type_name::<Request>(),
618                %error,
619                "Failed to send response"
620            );
621        }
622    }
623
624    /// Make request that expects stream response
625    pub async fn stream_request<Request>(
626        &self,
627        request: &Request,
628        instance: Option<&str>,
629    ) -> Result<StreamResponseSubscriber<Request::Response>, StreamRequestError>
630    where
631        Request: GenericStreamRequest,
632    {
633        let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
634        let stream_response_subject = format!("stream-response.{}", Ulid::new());
635
636        let subscriber = self
637            .inner
638            .client
639            .subscribe(stream_response_subject.clone())
640            .await?;
641
642        debug!(
643            request_type = %type_name::<Request>(),
644            %stream_request_subject,
645            %stream_response_subject,
646            ?subscriber,
647            "Stream request subscription"
648        );
649
650        self.inner
651            .client
652            .publish_with_reply(
653                stream_request_subject,
654                stream_response_subject.clone(),
655                request.encode().into(),
656            )
657            .await?;
658
659        Ok(StreamResponseSubscriber::new(
660            subscriber,
661            stream_response_subject,
662            self.clone(),
663        ))
664    }
665
666    /// Responds to stream requests from the given subject using the provided processing function.
667    ///
668    /// This will create a subscription on the subject for the given instance (if provided) and
669    /// queue group. Incoming messages will be deserialized as the request type `Request` and passed
670    /// to the `process` function to produce a stream response of type `Request::Response`. The
671    /// stream response will then be sent back on the reply subject from the original request.
672    ///
673    /// Each request is processed in a newly created async tokio task.
674    ///
675    /// # Arguments
676    ///
677    /// * `instance` - Optional instance name to use in place of the `*` in the subject
678    /// * `group` - The queue group name for the subscription
679    /// * `process` - The function to call with the decoded request to produce a response
680    pub async fn stream_request_responder<Request, F, S, OP>(
681        &self,
682        instance: Option<&str>,
683        queue_group: Option<String>,
684        process: OP,
685    ) -> anyhow::Result<()>
686    where
687        Request: GenericStreamRequest,
688        F: Future<Output = Option<S>> + Send,
689        S: Stream<Item = Request::Response> + Unpin,
690        OP: Fn(Request) -> F + Send + Sync,
691    {
692        // Initialize with pending future so it never ends
693        let mut processing = FuturesUnordered::new();
694
695        let subscription = self
696            .common_subscribe(Request::SUBJECT, instance, queue_group)
697            .await
698            .map_err(|error| {
699                anyhow!(
700                    "Failed to subscribe to {} stream requests for {instance:?}: {error}",
701                    type_name::<Request>(),
702                )
703            })?;
704
705        debug!(
706            request_type = %type_name::<Request>(),
707            ?subscription,
708            "Stream requests subscription"
709        );
710        let mut subscription = subscription.fuse();
711
712        loop {
713            select! {
714                message = subscription.select_next_some() => {
715                    // Create background task for concurrent processing
716                    processing.push(
717                        self
718                        .process_stream_request(
719                            message,
720                            &process,
721                        )
722                        .in_current_span(),
723                    );
724                },
725                _ = processing.next() => {
726                    // Nothing to do here
727                },
728                complete => {
729                    break;
730                }
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn process_stream_request<Request, F, S, OP>(&self, message: Message, process: OP)
738    where
739        Request: GenericStreamRequest,
740        F: Future<Output = Option<S>> + Send,
741        S: Stream<Item = Request::Response> + Unpin,
742        OP: Fn(Request) -> F + Send + Sync,
743    {
744        let Some(reply_subject) = message.reply else {
745            return;
746        };
747
748        let message_payload_size = message.payload.len();
749        let request = match Request::decode(&mut message.payload.as_ref()) {
750            Ok(request) => {
751                // Free allocation early
752                drop(message.payload);
753                request
754            }
755            Err(error) => {
756                warn!(
757                    request_type = %type_name::<Request>(),
758                    %error,
759                    message = %hex::encode(message.payload),
760                    "Failed to decode request"
761                );
762                return;
763            }
764        };
765
766        // Avoid printing large messages in logs
767        if message_payload_size > 1024 {
768            trace!(
769                request_type = %type_name::<Request>(),
770                %reply_subject,
771                "Processing request"
772            );
773        } else {
774            trace!(
775                request_type = %type_name::<Request>(),
776                ?request,
777                %reply_subject,
778                "Processing request"
779            );
780        }
781
782        if let Some(stream) = process(request).await {
783            self.stream_response::<Request, _>(reply_subject, stream)
784                .await;
785        }
786    }
787
788    /// Helper method to send responses to requests initiated with [`Self::stream_request`]
789    async fn stream_response<Request, S>(&self, response_subject: Subject, response_stream: S)
790    where
791        Request: GenericStreamRequest,
792        S: Stream<Item = Request::Response> + Unpin,
793    {
794        type Response<Request> =
795            GenericStreamResponses<<Request as GenericStreamRequest>::Response>;
796
797        let mut response_stream = response_stream.fuse();
798
799        // Pull the first element to measure response size
800        let first_element = match response_stream.next().await {
801            Some(first_element) => first_element,
802            None => {
803                if let Err(error) = self
804                    .publish(
805                        response_subject.clone(),
806                        Response::<Request>::Last {
807                            index: 0,
808                            responses: VecDeque::new(),
809                        }
810                        .encode()
811                        .into(),
812                    )
813                    .await
814                {
815                    warn!(
816                        %response_subject,
817                        %error,
818                        request_type = %type_name::<Request>(),
819                        response_type = %type_name::<Request::Response>(),
820                        "Failed to send stream response"
821                    );
822                }
823
824                return;
825            }
826        };
827        let max_message_size = self.inner.max_message_size;
828        let approximate_max_message_size = self.approximate_max_message_size();
829        let max_responses_per_message = approximate_max_message_size / first_element.encoded_size();
830
831        let ack_subject = format!("stream-response-ack.{}", Ulid::new());
832        let mut ack_subscription = match self.subscribe(ack_subject.clone()).await {
833            Ok(ack_subscription) => ack_subscription,
834            Err(error) => {
835                warn!(
836                    %response_subject,
837                    %error,
838                    request_type = %type_name::<Request>(),
839                    response_type = %type_name::<Request::Response>(),
840                    "Failed to subscribe to ack subject"
841                );
842                return;
843            }
844        };
845        debug!(
846            %response_subject,
847            request_type = %type_name::<Request>(),
848            response_type = %type_name::<Request::Response>(),
849            ?ack_subscription,
850            "Ack subscription subscription"
851        );
852        let mut index = 0;
853        // Initialize buffer that will be reused for responses
854        let mut buffer = VecDeque::with_capacity(max_responses_per_message);
855        buffer.push_back(first_element);
856        let mut overflow_buffer = VecDeque::new();
857
858        loop {
859            // Try to fill the buffer
860            if buffer.is_empty()
861                && let Some(element) = response_stream.next().await
862            {
863                buffer.push_back(element);
864            }
865            while buffer.encoded_size() < approximate_max_message_size
866                && let Some(element) = response_stream.next().now_or_never().flatten()
867            {
868                buffer.push_back(element);
869            }
870
871            loop {
872                let is_done = response_stream.is_done() && overflow_buffer.is_empty();
873                let num_messages = buffer.len();
874                let response = if is_done {
875                    Response::<Request>::Last {
876                        index,
877                        responses: buffer,
878                    }
879                } else {
880                    Response::<Request>::Continue {
881                        index,
882                        responses: buffer,
883                        ack_subject: ack_subject.clone(),
884                    }
885                };
886                let encoded_response = response.encode();
887                let encoded_response_len = encoded_response.len();
888                // When encoded response is too large, remove one of the responses from it and try
889                // again
890                if encoded_response_len > max_message_size {
891                    buffer = response.into();
892                    if let Some(element) = buffer.pop_back() {
893                        if buffer.is_empty() {
894                            error!(
895                                ?element,
896                                encoded_response_len,
897                                max_message_size,
898                                "Element was too large to fit into NATS message, this is an \
899                                implementation bug"
900                            );
901                        }
902                        overflow_buffer.push_front(element);
903                        continue;
904                    } else {
905                        error!(
906                            %response_subject,
907                            request_type = %type_name::<Request>(),
908                            response_type = %type_name::<Request::Response>(),
909                            "Empty response overflown message size, this should never happen"
910                        );
911                        return;
912                    }
913                }
914
915                debug!(
916                    %response_subject,
917                    num_messages,
918                    %index,
919                    %is_done,
920                    "Publishing stream response messages",
921                );
922
923                if let Err(error) = self
924                    .publish(response_subject.clone(), encoded_response.into())
925                    .await
926                {
927                    warn!(
928                        %response_subject,
929                        %error,
930                        request_type = %type_name::<Request>(),
931                        response_type = %type_name::<Request::Response>(),
932                        "Failed to send stream response"
933                    );
934                    return;
935                }
936
937                if is_done {
938                    return;
939                } else {
940                    buffer = response.into();
941                    buffer.clear();
942                    // Fill buffer with any overflown responses that may have been stored
943                    buffer.extend(overflow_buffer.drain(..));
944                }
945
946                if index >= 1 {
947                    // Acknowledgements are received with delay
948                    let expected_index = index - 1;
949
950                    trace!(
951                        %response_subject,
952                        %expected_index,
953                        "Waiting for acknowledgement"
954                    );
955                    match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next())
956                        .await
957                    {
958                        Ok(Some(message)) => {
959                            if let Some(received_index) =
960                                message.payload.get(..size_of::<u32>()).map(|bytes| {
961                                    u32::from_le_bytes(
962                                        bytes.try_into().expect("Correctly chunked slice; qed"),
963                                    )
964                                })
965                            {
966                                debug!(
967                                    %response_subject,
968                                    %received_index,
969                                    "Received acknowledgement"
970                                );
971                                if received_index != expected_index {
972                                    warn!(
973                                        %response_subject,
974                                        %received_index,
975                                        %expected_index,
976                                        request_type = %type_name::<Request>(),
977                                        response_type = %type_name::<Request::Response>(),
978                                        message = %hex::encode(message.payload),
979                                        "Unexpected acknowledgement index"
980                                    );
981                                    return;
982                                }
983                            } else {
984                                warn!(
985                                    %response_subject,
986                                    request_type = %type_name::<Request>(),
987                                    response_type = %type_name::<Request::Response>(),
988                                    message = %hex::encode(message.payload),
989                                    "Unexpected acknowledgement message"
990                                );
991                                return;
992                            }
993                        }
994                        Ok(None) => {
995                            warn!(
996                                %response_subject,
997                                request_type = %type_name::<Request>(),
998                                response_type = %type_name::<Request::Response>(),
999                                "Acknowledgement stream ended unexpectedly"
1000                            );
1001                            return;
1002                        }
1003                        Err(_error) => {
1004                            warn!(
1005                                %response_subject,
1006                                %expected_index,
1007                                request_type = %type_name::<Request>(),
1008                                response_type = %type_name::<Request::Response>(),
1009                                "Acknowledgement wait timed out"
1010                            );
1011                            return;
1012                        }
1013                    }
1014                }
1015
1016                index += 1;
1017
1018                // Unless `overflow_buffer` wasn't empty abort inner loop
1019                if buffer.is_empty() {
1020                    break;
1021                }
1022            }
1023        }
1024    }
1025
1026    /// Make notification without waiting for response
1027    pub async fn notification<Notification>(
1028        &self,
1029        notification: &Notification,
1030        instance: Option<&str>,
1031    ) -> Result<(), PublishError>
1032    where
1033        Notification: GenericNotification,
1034    {
1035        self.inner
1036            .client
1037            .publish(
1038                subject_with_instance(Notification::SUBJECT, instance),
1039                notification.encode().into(),
1040            )
1041            .await
1042    }
1043
1044    /// Send a broadcast message
1045    pub async fn broadcast<Broadcast>(
1046        &self,
1047        message: &Broadcast,
1048        instance: &str,
1049    ) -> Result<(), PublishError>
1050    where
1051        Broadcast: GenericBroadcast,
1052    {
1053        self.inner
1054            .client
1055            .publish_with_headers(
1056                Broadcast::SUBJECT.replace('*', instance),
1057                {
1058                    let mut headers = HeaderMap::new();
1059                    if let Some(message_id) = message.deterministic_message_id() {
1060                        headers.insert("Nats-Msg-Id", message_id);
1061                    }
1062                    headers
1063                },
1064                message.encode().into(),
1065            )
1066            .await
1067    }
1068
1069    /// Simple subscription that will produce decoded notifications, while skipping messages that
1070    /// fail to decode
1071    pub async fn subscribe_to_notifications<Notification>(
1072        &self,
1073        instance: Option<&str>,
1074        queue_group: Option<String>,
1075    ) -> Result<SubscriberWrapper<Notification>, SubscribeError>
1076    where
1077        Notification: GenericNotification,
1078    {
1079        self.simple_subscribe(Notification::SUBJECT, instance, queue_group)
1080            .await
1081    }
1082
1083    /// Simple subscription that will produce decoded broadcasts, while skipping messages that
1084    /// fail to decode
1085    pub async fn subscribe_to_broadcasts<Broadcast>(
1086        &self,
1087        instance: Option<&str>,
1088        queue_group: Option<String>,
1089    ) -> Result<SubscriberWrapper<Broadcast>, SubscribeError>
1090    where
1091        Broadcast: GenericBroadcast,
1092    {
1093        self.simple_subscribe(Broadcast::SUBJECT, instance, queue_group)
1094            .await
1095    }
1096
1097    /// Simple subscription that will produce decoded messages, while skipping messages that fail to
1098    /// decode
1099    async fn simple_subscribe<Message>(
1100        &self,
1101        subject: &'static str,
1102        instance: Option<&str>,
1103        queue_group: Option<String>,
1104    ) -> Result<SubscriberWrapper<Message>, SubscribeError>
1105    where
1106        Message: Decode,
1107    {
1108        let subscriber = self
1109            .common_subscribe(subject, instance, queue_group)
1110            .await?;
1111        debug!(
1112            %subject,
1113            message_type = %type_name::<Message>(),
1114            ?subscriber,
1115            "Simple subscription"
1116        );
1117
1118        Ok(SubscriberWrapper {
1119            subscriber,
1120            _phantom: PhantomData,
1121        })
1122    }
1123
1124    /// Simple subscription that will produce decoded messages, while skipping messages that fail to
1125    /// decode
1126    async fn common_subscribe(
1127        &self,
1128        subject: &'static str,
1129        instance: Option<&str>,
1130        queue_group: Option<String>,
1131    ) -> Result<Subscriber, SubscribeError> {
1132        let subscriber = if let Some(queue_group) = queue_group {
1133            self.inner
1134                .client
1135                .queue_subscribe(subject_with_instance(subject, instance), queue_group)
1136                .await?
1137        } else {
1138            self.inner
1139                .client
1140                .subscribe(subject_with_instance(subject, instance))
1141                .await?
1142        };
1143
1144        Ok(subscriber)
1145    }
1146}
1147
1148fn subject_with_instance(subject: &'static str, instance: Option<&str>) -> Subject {
1149    if let Some(instance) = instance {
1150        Subject::from(subject.replace('*', instance))
1151    } else {
1152        Subject::from_static(subject)
1153    }
1154}