Skip to content

Commit

Permalink
feat: add archive_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
l-7-l committed Jan 11, 2024
1 parent 2e6159b commit 9ab9d5e
Showing 1 changed file with 49 additions and 4 deletions.
53 changes: 49 additions & 4 deletions pgmq-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,60 @@ impl PGMQueue {
/// Ok(())
/// }
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<u64, PgmqError> {
self.archive_batch(queue_name, &[msg_id]).await
}

/// Moves multiple messages, by message id, from the queue table to archive table
/// View messages on the archive table with sql:
/// ```sql
/// SELECT * FROM pgmq_<queue_name>_archive;
/// ```
///
/// Example:
///
/// ```rust
/// use pgmq::{PgmqError, PGMQueue};
/// use serde::Serialize;
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), PgmqError> {
///
/// println!("Connecting to Postgres");
/// let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
/// .await
/// .expect("Failed to connect to postgres");
/// let my_queue = "my_queue".to_owned();
/// queue.create(&my_queue)
/// .await
/// .expect("Failed to create queue");
///
/// let msgs = vec![
/// serde_json::json!({"foo": "bar1"}),
/// serde_json::json!({"foo": "bar2"}),
/// serde_json::json!({"foo": "bar3"}),
/// ];
///
/// let msg_ids: Vec<i64> = queue
/// .send_batch(&my_queue, &msgs)
/// .await
/// .expect("Failed to enqueue messages");
///
/// queue.archive_batch(&my_queue, &msg_ids).await.expect("failed to archive messages");
///
/// Ok(())
/// }
pub async fn archive_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, PgmqError> {
let query = core_query::archive_batch(queue_name)?;
let row = sqlx::query(&query)
.bind(vec![msg_id])
.bind(msg_ids)
.execute(&self.connection)
.await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
}

let num_achived = row.rows_affected();

Ok(num_achived)
}
/// Reads single message from the queue and delete it at the same time.
/// Similar to [read](#method.read) and [read_batch](#method.read_batch),
/// if no messages are available, [`Option::None`] is returned. Unlike these methods,
Expand Down

0 comments on commit 9ab9d5e

Please sign in to comment.