Skip to content

Commit

Permalink
feat: add pause consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
  • Loading branch information
yordis committed Mar 19, 2024
1 parent 182efc2 commit 115e501
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
5 changes: 5 additions & 0 deletions async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ pub struct Info {
/// Indicates if any client is connected and receiving messages from a push consumer
#[serde(default, skip_serializing_if = "is_default")]
pub push_bound: bool,
/// Indicates if the consumer is paused
pub paused: bool,
/// The remaining time the consumer is paused
#[serde(with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}

/// Information about a consumer and the stream it is consuming
Expand Down
77 changes: 77 additions & 0 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,64 @@ impl Stream {
}
}

/// Pause a [Consumer] until the given time.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// use futures::StreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let pause_until = time::OffsetDateTime::now_utc() + time::Duration::from_secs(60);
///
/// jetstream
/// .get_stream("events")
/// .await?
/// .pause_consumer("my_consumer", pause_until)
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn pause_consumer(&self, name: &str, pause_until: OffsetDateTime) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, Some(pause_until)).await
}

/// Resume a paused [Consumer].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// use futures::StreamExt;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// jetstream
/// .get_stream("events")
/// .await?
/// .resume_consumer("my_consumer")
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn resume_consumer(&self, name: &str) -> Result<PauseResponse, ConsumerError> {
self.request_pause_consumer(name, None).await
}

async fn request_pause_consumer(&self, name: &str, pause_until: Option<OffsetDateTime>) -> Result<PauseResponse, ConsumerError> {
let subject = format!("CONSUMER.PAUSE.{}.{}", self.info.config.name, name);
let payload = &PauseConsumerRequest{ pause_until };
match self.context.request(subject, payload).await? {
Response::Ok::<PauseResponse>(resp) => { Ok(resp) }
Response::Err { error } => Err(error.into()),
}
}

/// Lists names of all consumers for current stream.
///
/// # Examples
Expand Down Expand Up @@ -1024,6 +1082,10 @@ pub struct Config {
/// Sets the first sequence for the stream.
#[serde(default, skip_serializing_if = "Option::is_none", rename = "first_seq")]
pub first_sequence: Option<u64>,

/// PauseUntil is for suspending the consumer until the deadline.
#[sende(with = "rfc3339")]
pub pause_until: Option<OffsetDateTime>,
}

impl From<&Config> for Config {
Expand Down Expand Up @@ -1167,6 +1229,21 @@ pub struct DeleteStatus {
pub success: bool,
}

#[derive(Deserialize)]
pub struct PauseResponse {
pub paused: bool,
#[sende(with = "rfc3339")]
pub pause_until: Option<OffsetDateTime>,
#[sende(with = "serde_nanos")]
pub pause_remaining: Option<Duration>,
}

#[derive(Serialize)]
struct PauseConsumerRequest {
#[serde(with = "rfc3339", skip_serializing_if = "Option::is_none")]
pause_until: Option<OffsetDateTime>,
}

/// information about the given stream.
#[derive(Debug, Deserialize, Clone, Copy)]
pub struct State {
Expand Down
1 change: 1 addition & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3475,6 +3475,7 @@ mod jetstream {
max_ack_pending: 150,
}),
first_sequence: Some(505),
pause_until: None,
};

let stream = jetstream.create_stream(config.clone()).await.unwrap();
Expand Down

0 comments on commit 115e501

Please sign in to comment.