feat: add archive_batch #180

Merged: 3 commits, Jan 12, 2024
53 changes: 49 additions & 4 deletions pgmq-rs/src/lib.rs
@@ -804,15 +804,60 @@ impl PGMQueue {
/// Ok(())
/// }
/// ```
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<u64, PgmqError> {
self.archive_batch(queue_name, &[msg_id]).await
}

/// Moves multiple messages, by message id, from the queue table to the archive table.
/// View messages on the archive table with SQL:
/// ```sql
/// SELECT * FROM pgmq_<queue_name>_archive;
/// ```
///
/// Example:
///
/// ```rust
/// use pgmq::{PgmqError, PGMQueue};
/// use serde::Serialize;
/// use serde_json::Value;
///
/// #[tokio::main]
/// async fn main() -> Result<(), PgmqError> {
///
/// println!("Connecting to Postgres");
/// let queue: PGMQueue = PGMQueue::new("postgres://postgres:postgres@0.0.0.0:5432".to_owned())
/// .await
/// .expect("Failed to connect to postgres");
/// let my_queue = "my_queue".to_owned();
/// queue.create(&my_queue)
/// .await
/// .expect("Failed to create queue");
///
/// let msgs = vec![
/// serde_json::json!({"foo": "bar1"}),
/// serde_json::json!({"foo": "bar2"}),
/// serde_json::json!({"foo": "bar3"}),
/// ];
///
/// let msg_ids: Vec<i64> = queue
/// .send_batch(&my_queue, &msgs)
/// .await
/// .expect("Failed to enqueue messages");
///
/// queue.archive_batch(&my_queue, &msg_ids).await.expect("failed to archive messages");
///
/// Ok(())
/// }
/// ```
pub async fn archive_batch(&self, queue_name: &str, msg_ids: &[i64]) -> Result<u64, PgmqError> {
let query = core_query::archive_batch(queue_name)?;
let row = sqlx::query(&query)
.bind(msg_ids)
.execute(&self.connection)
.await?;
let num_deleted = row.rows_affected();
Ok(num_deleted)
}

/// Reads single message from the queue and delete it at the same time.
/// Similar to [read](#method.read) and [read_batch](#method.read_batch),
/// if no messages are available, [`Option::None`] is returned. Unlike these methods,
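Since `archive` now delegates to `archive_batch` with a single-element slice, batch consumers can archive everything they have processed in one statement instead of one call per message. A rough, hypothetical sketch of that pattern (the `read_batch` call and its parameters are assumptions based on the crate's existing API, not part of this diff):

```rust
use pgmq::{PGMQueue, PgmqError};
use serde_json::Value;

// Hypothetical consumer loop: read a batch, process it, then move every
// processed message to the archive table in a single round trip.
async fn drain_once(queue: &PGMQueue, queue_name: &str) -> Result<(), PgmqError> {
    // Assumed read_batch parameters: visibility timeout in seconds, batch size.
    if let Some(batch) = queue.read_batch::<Value>(queue_name, Some(30), 10).await? {
        // ... handle each message here ...
        let ids: Vec<i64> = batch.iter().map(|m| m.msg_id).collect();

        // One archive_batch call instead of N archive calls.
        queue.archive_batch(queue_name, &ids).await?;
    }
    Ok(())
}
```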
23 changes: 23 additions & 0 deletions pgmq-rs/tests/integration_test.rs
@@ -606,6 +606,29 @@ async fn test_archive() {
assert_eq!(num_rows_archive, 1);
}

#[tokio::test]
async fn test_archive_batch() {
let test_queue = "test_archive_batch_queue".to_owned();
let queue = init_queue(&test_queue).await;
let msg = MyMessage::default();

let msg_1 = queue.send(&test_queue, &msg).await.unwrap();
let msg_2 = queue.send(&test_queue, &msg).await.unwrap();
let msg_3 = queue.send(&test_queue, &msg).await.unwrap();

let num_moved = queue
.archive_batch(&test_queue, &[msg_1, msg_2, msg_3])
.await
.unwrap();
assert_eq!(num_moved, 3);

let num_rows_queue = rowcount(&test_queue, &queue.connection).await;
assert_eq!(num_rows_queue, 0);

let num_rows_archive = archive_rowcount(&test_queue, &queue.connection).await;
assert_eq!(num_rows_archive, 3);
}

/// test db operations that should produce errors
#[tokio::test]
async fn test_database_error_modes() {
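The test references a few helpers defined elsewhere in integration_test.rs (`init_queue`, `rowcount`, `archive_rowcount`, `MyMessage`). A hypothetical sketch of what the row-count helpers might look like, assuming the `pgmq_<queue_name>` / `pgmq_<queue_name>_archive` table naming from the doc comments; the real definitions may differ:

```rust
use sqlx::{Pool, Postgres};

// Count rows still sitting in the queue table (hypothetical helper).
async fn rowcount(queue_name: &str, pool: &Pool<Postgres>) -> i64 {
    let q = format!("SELECT count(*) FROM pgmq_{queue_name}");
    sqlx::query_scalar(&q).fetch_one(pool).await.unwrap()
}

// Count rows moved into the matching archive table (hypothetical helper).
async fn archive_rowcount(queue_name: &str, pool: &Pool<Postgres>) -> i64 {
    let q = format!("SELECT count(*) FROM pgmq_{queue_name}_archive");
    sqlx::query_scalar(&q).fetch_one(pool).await.unwrap()
}
```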