Skip to content

Commit

Permalink
Make Context::request return IntoFuture builder
Browse files Browse the repository at this point in the history
Co-authored-by: Casper Beyer <caspervonb@pm.me>
  • Loading branch information
n1ghtmare and caspervonb committed Jul 17, 2023
1 parent c3152bc commit 06e8c7d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 26 deletions.
36 changes: 31 additions & 5 deletions async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub mod pull;
pub mod push;
#[cfg(feature = "server_2_10")]
use std::collections::HashMap;
use std::future::IntoFuture;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;

use super::context::RequestError;
use super::context::{RequestError, RequestErrorKind};
use super::response::Response;
use super::stream::ClusterInfo;
use super::Context;
use crate::jetstream::consumer;
Expand Down Expand Up @@ -76,14 +78,38 @@ impl<T: IntoConsumerConfig> Consumer<T> {
pub async fn info(&mut self) -> Result<&consumer::Info, RequestError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);

let info = self.context.request(subject, &json!({})).await?;
self.info = info;
Ok(&self.info)
let response: Response<consumer::Info> = self
.context
.request(subject, &json!({}))
.into_future()
.await?;

match response {
Response::Ok(info) => {
self.info = info;
Ok(&self.info)
}
Response::Err { error } => {
Err(RequestError::with_source(RequestErrorKind::Other, error))
}
}
}

async fn fetch_info(&self) -> Result<consumer::Info, RequestError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.stream_name, self.info.name);
self.context.request(subject, &json!({})).await

let response: Response<consumer::Info> = self
.context
.request(subject, &json!({}))
.into_future()
.await?;

match response {
Response::Ok(info) => Ok(info),
Response::Err { error } => {
Err(RequestError::with_source(RequestErrorKind::Other, error))
}
}
}

/// Returns cached [Info] for the [Consumer].
Expand Down
86 changes: 65 additions & 21 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::borrow::Borrow;
use std::fmt::Display;
use std::future::IntoFuture;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::from_utf8;
use std::task::Poll;
Expand Down Expand Up @@ -746,30 +747,12 @@ impl Context {
/// # Ok(())
/// # }
/// ```
pub async fn request<T, V>(&self, subject: String, payload: &T) -> Result<V, RequestError>
pub fn request<T, V>(&self, subject: String, payload: T) -> Request<T, V>
where
T: ?Sized + Serialize,
T: Sized + Serialize,
V: DeserializeOwned,
{
let request = serde_json::to_vec(&payload)
.map(Bytes::from)
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;

debug!("JetStream request sent: {:?}", request);

let message = self
.client
.request(format!("{}.{}", self.prefix, subject), request)
.await;
let message = message?;
debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;

Ok(response)
Request::new(self.clone(), subject, payload)
}

/// Creates a new object store bucket.
Expand Down Expand Up @@ -1251,6 +1234,67 @@ impl IntoFuture for Publish {
}
}

#[derive(Debug)]
pub struct Request<T: Sized + Serialize, V: DeserializeOwned> {
context: Context,
subject: String,
payload: T,
timeout: Option<Duration>,
response_type: PhantomData<V>,
}

impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
pub fn new(context: Context, subject: String, payload: T) -> Self {
Self {
context,
subject,
payload,
timeout: None,
response_type: PhantomData,
}
}

pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}

impl<T: Sized + Serialize, V: DeserializeOwned> IntoFuture for Request<T, V> {
type Output = Result<Response<V>, RequestError>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, RequestError>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from);

let prefix = self.context.prefix;
let client = self.context.client;
let subject = self.subject;
let timeout = self.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let payload = payload_result
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;

debug!("JetStream request sent: {:?}", payload);

let request = client.request(format!("{}.{}", prefix, subject), payload);
let request = request.timeout(timeout);
let message = request.await?;

debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;

Ok(response)
}))
}
}

#[derive(Debug)]
pub struct RequestError {
kind: RequestErrorKind,
Expand Down
3 changes: 3 additions & 0 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ impl Stream {
let response: Response<GetRawMessage> = self
.context
.request(subject, &payload)
.into_future()
.map_err(|err| LastRawMessageError::with_source(LastRawMessageErrorKind::Other, err))
.await?;
match response {
Expand Down Expand Up @@ -640,6 +641,7 @@ impl Stream {
let response: Response<DeleteStatus> = self
.context
.request(subject, &payload)
.into_future()
.map_err(|err| match err.kind() {
RequestErrorKind::TimedOut => {
DeleteMessageError::new(DeleteMessageErrorKind::TimedOut)
Expand Down Expand Up @@ -1577,6 +1579,7 @@ where
.stream
.context
.request(request_subject, &self.inner)
.into_future()
.map_err(|err| match err.kind() {
RequestErrorKind::TimedOut => PurgeError::new(PurgeErrorKind::TimedOut),
_ => PurgeError::with_source(PurgeErrorKind::Request, err),
Expand Down
15 changes: 15 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,21 @@ mod jetstream {
assert!(matches!(response, Response::Err { .. }));
}

#[tokio::test]
async fn request_timeout() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let response: Response<AccountInfo> = context
.request("INFO".to_string(), &())
.timeout(Duration::from_secs(1))
.await
.unwrap();

assert!(matches!(response, Response::Ok { .. }));
}

#[tokio::test]
async fn create_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 06e8c7d

Please sign in to comment.