Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of into_future for requests and messages #1003

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 147 additions & 111 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ use crate::ServerInfo;
use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
use crate::error::Error;
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::{Future, TryFutureExt};
use once_cell::sync::Lazy;
use regex::Regex;
use std::fmt::Display;
use std::future::IntoFuture;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::trace;

static VERSION_RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r#"\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?"#).unwrap());
Expand All @@ -44,6 +45,63 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
}
}

#[must_use]
pub struct Publish {
n1ghtmare marked this conversation as resolved.
Show resolved Hide resolved
sender: mpsc::Sender<Command>,
subject: String,
payload: Bytes,
headers: Option<HeaderMap>,
respond: Option<String>,
}

impl Publish {
pub(crate) fn new(sender: mpsc::Sender<Command>, subject: String, payload: Bytes) -> Publish {
Publish {
sender,
subject,
payload,
headers: None,
respond: None,
}
}

pub fn headers(mut self, headers: HeaderMap) -> Publish {
self.headers = Some(headers);
self
}

pub fn reply(mut self, subject: String) -> Publish {
self.respond = Some(subject);
self
}
}

impl IntoFuture for Publish {
type Output = Result<(), PublishError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also BoxFuture here


fn into_future(self) -> Self::IntoFuture {
let sender = self.sender;
let subject = self.subject;
let payload = self.payload;
let respond = self.respond;
let headers = self.headers;

Box::pin(async move {
sender
.send(Command::Publish {
subject,
payload,
respond,
headers,
})
.await?;

Ok(())
})
}
}

/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
Expand Down Expand Up @@ -149,16 +207,8 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn publish(&self, subject: String, payload: Bytes) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: None,
})
.await?;
Ok(())
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.sender.clone(), subject, payload)
}

/// Publish a [Message] with headers to a given subject.
Expand Down Expand Up @@ -186,14 +236,7 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: None,
headers: Some(headers),
})
.await?;
self.publish(subject, payload).headers(headers).await?;
Ok(())
}

Expand Down Expand Up @@ -223,14 +266,7 @@ impl Client {
reply: String,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: None,
})
.await?;
self.publish(subject, payload).reply(reply).await?;
Ok(())
}

Expand Down Expand Up @@ -264,13 +300,9 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<(), PublishError> {
self.sender
.send(Command::Publish {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
})
self.publish(subject, payload)
.headers(headers)
.reply(reply)
.await?;
Ok(())
}
Expand All @@ -286,10 +318,8 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn request(&self, subject: String, payload: Bytes) -> Result<Message, RequestError> {
trace!("request sent to subject: {} ({})", subject, payload.len());
let request = Request::new().payload(payload);
self.send_request(subject, request).await
pub fn request(&self, subject: String, payload: Bytes) -> Request {
Request::new(self.clone(), subject, payload)
}

/// Sends the request with headers.
Expand All @@ -313,65 +343,11 @@ impl Client {
headers: HeaderMap,
payload: Bytes,
) -> Result<Message, RequestError> {
let request = Request::new().headers(headers).payload(payload);
self.send_request(subject, request).await
}
let message = Request::new(self.clone(), subject, payload)
.headers(headers)
.await?;

/// Sends the request created by the [Request].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// let response = client.send_request("service".into(), request).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_request(
&self,
subject: String,
request: Request,
) -> Result<Message, RequestError> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
}
Ok(message)
}
None => Err(RequestError::with_source(
RequestErrorKind::Other,
"broken pipe",
)),
}
Ok(message)
}

/// Create a new globally unique inbox which can be used for replies.
Expand Down Expand Up @@ -503,17 +479,26 @@ impl Client {
}

/// Used for building customized requests.
#[derive(Default)]
#[derive(Debug)]
pub struct Request {
client: Client,
subject: String,
payload: Option<Bytes>,
headers: Option<HeaderMap>,
timeout: Option<Option<Duration>>,
inbox: Option<String>,
}

impl Request {
pub fn new() -> Request {
Default::default()
pub fn new(client: Client, subject: String, payload: Bytes) -> Request {
Request {
client,
subject,
payload: Some(payload),
headers: None,
timeout: None,
inbox: None,
}
}

/// Sets the payload of the request. If not used, empty payload will be sent.
Expand All @@ -523,8 +508,7 @@ impl Request {
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new().payload("data".into());
/// client.send_request("service".into(), request).await?;
/// client.request("service".into(), "data".into()).await?;
/// # Ok(())
/// # }
/// ```
Expand All @@ -546,10 +530,11 @@ impl Request {
/// "X-Example",
/// async_nats::HeaderValue::from_str("Value").unwrap(),
/// );
/// let request = async_nats::Request::new()
/// client
/// .request("subject".into(), "data".into())
/// .headers(headers)
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
///
/// # Ok(())
/// # }
/// ```
Expand All @@ -567,10 +552,11 @@ impl Request {
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// client
/// .request("service".into(), "data".into())
/// .timeout(Some(std::time::Duration::from_secs(15)))
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
///
/// # Ok(())
/// # }
/// ```
Expand All @@ -587,17 +573,67 @@ impl Request {
/// # async fn main() -> Result<(), async_nats::Error> {
/// use std::str::FromStr;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let request = async_nats::Request::new()
/// client
/// .request("subject".into(), "data".into())
/// .inbox("custom_inbox".into())
/// .payload("data".into());
/// client.send_request("service".into(), request).await?;
/// .await?;
///
/// # Ok(())
/// # }
/// ```
pub fn inbox(mut self, inbox: String) -> Request {
self.inbox = Some(inbox);
self
}

async fn send(self) -> Result<Message, RequestError> {
let inbox = self.inbox.unwrap_or_else(|| self.client.new_inbox());
let mut subscriber = self.client.subscribe(inbox.clone()).await?;
let mut publish = self
.client
.publish(self.subject, self.payload.unwrap_or_else(Bytes::new));

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

publish = publish.reply(inbox);
publish.into_future().await?;

self.client
.flush()
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
.await?;

let period = self.timeout.unwrap_or(self.client.request_timeout);
let message = match period {
Some(period) => {
tokio::time::timeout(period, subscriber.next())
.map_err(|_| RequestError::new(RequestErrorKind::TimedOut))
.await?
}
None => subscriber.next().await,
};

match message {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::new(RequestErrorKind::NoResponders));
}
Ok(message)
}
None => Err(RequestError::new(RequestErrorKind::Other)),
}
}
}

impl IntoFuture for Request {
type Output = Result<Message, RequestError>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<Message, RequestError>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(self.send())
}
}

#[derive(Error, Debug)]
Expand Down
Loading