From 9ab9d5efd5d327757cfcd07f0591f32515c4108d Mon Sep 17 00:00:00 2001 From: l-7-l Date: Thu, 11 Jan 2024 14:03:17 +0800 Subject: [PATCH] feat: add archive_batch --- pgmq-rs/src/lib.rs | 53 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/pgmq-rs/src/lib.rs b/pgmq-rs/src/lib.rs index 8162da84..627087c0 100644 --- a/pgmq-rs/src/lib.rs +++ b/pgmq-rs/src/lib.rs @@ -804,15 +804,60 @@ impl PGMQueue { /// Ok(()) /// } pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result { + self.archive_batch(queue_name, &[msg_id]).await + } + + /// Moves multiple messages, by message id, from the queue table to archive table + /// View messages on the archive table with sql: + /// ```sql + /// SELECT * FROM pgmq__archive; + /// ``` + /// + /// Example: + /// + /// ```rust + /// use pgmq::{PgmqError, PGMQueue}; + /// use serde::Serialize; + /// use serde_json::Value; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), PgmqError> { + /// + /// println!("Connecting to Postgres"); + /// let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned()) + /// .await + /// .expect("Failed to connect to postgres"); + /// let my_queue = "my_queue".to_owned(); + /// queue.create(&my_queue) + /// .await + /// .expect("Failed to create queue"); + /// + /// let msgs = vec![ + /// serde_json::json!({"foo": "bar1"}), + /// serde_json::json!({"foo": "bar2"}), + /// serde_json::json!({"foo": "bar3"}), + /// ]; + /// + /// let msg_ids: Vec = queue + /// .send_batch(&my_queue, &msgs) + /// .await + /// .expect("Failed to enqueue messages"); + /// + /// queue.archive_batch(&my_queue, &msg_ids).await.expect("failed to archive messages"); + /// + /// Ok(()) + /// } + pub async fn archive_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result { let query = core_query::archive_batch(queue_name)?; let row = sqlx::query(&query) - .bind(vec![msg_id]) + .bind(msg_ids) .execute(&self.connection) .await?; - let num_deleted = row.rows_affected(); - Ok(num_deleted) - } + let num_achived = row.rows_affected(); + + Ok(num_achived) + } /// Reads single message from the queue and delete it at the same time. /// Similar to [read](#method.read) and [read_batch](#method.read_batch), /// if no messages are available, [`Option::None`] is returned. Unlike these methods,