Skip to content

Commit

Permalink
Fix annotation get (#174)
Browse files Browse the repository at this point in the history
Add the implementation for get function.
Add tests for complex messages

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Jul 19, 2023
1 parent 1c3a803 commit 6b50429
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 7 deletions.
2 changes: 1 addition & 1 deletion protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ byteorder = "1"
ordered-float = "3.0.0"
uuid = "1"
chrono = "0.4.19"
num_enum = "0.5.7"
num_enum = "0.6.1"
derive_more = "0.99"

[dev-dependencies]
Expand Down
7 changes: 7 additions & 0 deletions protocol/src/message/amqp/types/annotations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ impl Annotations {
{
self.0.insert(key.into(), value.into())
}

pub fn get<K>(&self, key: K) -> Option<&Value>
where
K: Into<AnnonationKey>,
{
self.0.get(&key.into())
}
}

impl From<&str> for AnnonationKey {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/environment_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async fn environment_create_streams_with_parameters() {
.stream_creator()
.max_age(Duration::from_secs(10))
.max_length(ByteCapacity::B(1))
.max_segment_size(ByteCapacity::B(1))
.max_segment_size(ByteCapacity::GB(1))
.create(&stream_to_test)
.await;
assert_eq!(response.is_ok(), true);
Expand Down
95 changes: 90 additions & 5 deletions tests/integration/producer_test.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::Utc;
use fake::{Fake, Faker};
use futures::StreamExt;
use rabbitmq_stream_client::types::{Message, OffsetSpecification};
use tokio::sync::mpsc::channel;

use rabbitmq_stream_client::types::{Message, OffsetSpecification};

use crate::common::TestEnvironment;

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -86,6 +88,7 @@ async fn producer_send_name_with_deduplication_ok() {

consumer.handle().close().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn producer_send_with_callback() {
let env = TestEnvironment::create().await;
Expand Down Expand Up @@ -185,18 +188,33 @@ async fn producer_send_with_complex_message_ok() {
.build(&env.stream)
.await
.unwrap();

let now = Utc::now();
let _ = producer
.send_with_confirm(
Message::builder()
.body(b"message".to_vec())
.application_properties()
.insert("test_key", "test_value")
.message_builder()
.properties()
.user_id("test_user")
.correlation_id(444u64)
.absolute_expiry_time(now)
.content_encoding("deflate")
.content_type("application/json")
.group_id("test_group")
.group_sequence(1u32)
.reply_to("test_reply")
.subject("test_subject")
.to("test_to")
.creation_time(Utc::now())
.reply_to_group_id("test_reply_group")
.message_id(32u64)
.message_builder()
.message_annotations()
.insert("test", "test")
.insert("test", true)
.insert("test", 3u8)
.insert("test_string", "string_value")
.insert("test_bool", true)
.insert("test_number", 3u8)
.message_builder()
.build(),
)
Expand All @@ -217,5 +235,72 @@ async fn producer_send_with_complex_message_ok() {
properties.and_then(|properties| properties.message_id.clone())
);

assert_eq!(
Some("test_user".into()),
properties.and_then(|properties| properties.user_id.clone())
);

assert_eq!(
Some("test_group".into()),
properties.and_then(|properties| properties.group_id.clone())
);

assert_eq!(
Some("test_reply".into()),
properties.and_then(|properties| properties.reply_to.clone())
);

assert_eq!(
Some("test_subject".into()),
properties.and_then(|properties| properties.subject.clone())
);

assert_eq!(
Some("test_to".into()),
properties.and_then(|properties| properties.to.clone())
);

assert_eq!(
Some("test_reply_group".into()),
properties.and_then(|properties| properties.reply_to_group_id.clone())
);

assert_eq!(
Some(444u64.into()),
properties.and_then(|properties| properties.correlation_id.clone())
);

assert_eq!(
Some(1u32.into()),
properties.and_then(|properties| properties.group_sequence.clone())
);

assert_eq!(
Some("deflate".into()),
properties.and_then(|properties| properties.content_encoding.clone())
);

assert_eq!(
Some("application/json".into()),
properties.and_then(|properties| properties.content_type.clone())
);

let message_annotations = message.message_annotations();

assert_eq!(
Some("string_value".into()),
message_annotations.and_then(|annotations| annotations.get("test_string").cloned())
);

assert_eq!(
Some(true.into()),
message_annotations.and_then(|annotations| annotations.get("test_bool").cloned())
);

assert_eq!(
Some(3u8.into()),
message_annotations.and_then(|annotations| annotations.get("test_number").cloned())
);

consumer.handle().close().await.unwrap();
}

0 comments on commit 6b50429

Please sign in to comment.