diff --git a/protocol/src/commands/superstream_partitions.rs b/protocol/src/commands/superstream_partitions.rs
index 452ab2d..bee07a7 100644
--- a/protocol/src/commands/superstream_partitions.rs
+++ b/protocol/src/commands/superstream_partitions.rs
@@ -2,10 +2,7 @@ use std::io::Write;
 
 #[cfg(test)]
 use fake::Fake;
-<<<<<<< HEAD
-=======
->>>>>>> 4a7cb45 (refactoring tests)
 
 use super::Command;
 use crate::{
     codec::{Decoder, Encoder},
diff --git a/src/environment.rs b/src/environment.rs
index fca0086..ccd94ed 100644
--- a/src/environment.rs
+++ b/src/environment.rs
@@ -12,8 +12,8 @@ use crate::{
     producer::ProducerBuilder,
     stream_creator::StreamCreator,
     superstream::RoutingStrategy,
-    superstream_producer::SuperStreamProducerBuilder,
     superstream_consumer::SuperStreamConsumerBuilder,
+    superstream_producer::SuperStreamProducerBuilder,
     RabbitMQStreamResult,
 };
 
diff --git a/src/lib.rs b/src/lib.rs
index 16c7910..2fe8c66 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -99,8 +99,8 @@ pub mod types {
     pub use crate::stream_creator::StreamCreator;
     pub use crate::superstream::HashRoutingMurmurStrategy;
     pub use crate::superstream::RoutingKeyRoutingStrategy;
-    pub use crate::superstream_consumer::SuperStreamConsumer;
     pub use crate::superstream::RoutingStrategy;
+    pub use crate::superstream_consumer::SuperStreamConsumer;
     pub use rabbitmq_stream_protocol::message::Message;
     pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};
 
diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs
index 9d96daf..a0344e2 100644
--- a/src/superstream_consumer.rs
+++ b/src/superstream_consumer.rs
@@ -1,23 +1,23 @@
 use std::sync::Arc;
 
-use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification, message::Message};
+use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification};
 
 use crate::superstream::DefaultSuperStreamMetadata;
 use crate::{error::ConsumerCreateError, Client, Consumer, Environment};
 
-type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
+//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
 
 /// API for consuming RabbitMQ stream messages
 #[derive(Clone)]
 pub struct SuperStreamConsumer {
-    pub internal: Arc<SuperStreamConsumerInternal>,
+    internal: Arc<SuperStreamConsumerInternal>,
 }
 
 struct SuperStreamConsumerInternal {
     client: Client,
     super_stream: String,
     offset_specification: OffsetSpecification,
-    pub consumers: Vec<Consumer>,
+    consumers: Vec<Consumer>,
 }
 
 /// Builder for [`Consumer`]
@@ -68,15 +68,14 @@ impl SuperStreamConsumerBuilder {
         })
     }
 
-    pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
+    pub async fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
         self.offset_specification = offset_specification;
         self
     }
 }
 
-impl SuperStreamConsumer {
-
-    pub async fn get_consumers(&self) -> &Vec<Consumer> {
-        return &self.internal.consumers
+impl SuperStreamConsumer {
+    pub async fn get_consumers(&self) -> &Vec<Consumer> {
+        return &self.internal.consumers;
     }
 }
diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs
index 0b45f22..4b01309 100644
--- a/tests/integration/consumer_test.rs
+++ b/tests/integration/consumer_test.rs
@@ -3,7 +3,6 @@ use std::time::Duration;
 use crate::common::TestEnvironment;
 use fake::{Fake, Faker};
 use futures::StreamExt;
-use tokio::task;
 use rabbitmq_stream_client::{
     error::{
         ClientError, ConsumerCloseError, ConsumerDeliveryError, ConsumerStoreOffsetError,
@@ -12,11 +11,12 @@ use rabbitmq_stream_client::{
     types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
     Consumer, FilterConfiguration, NoDedup, Producer,
 };
+use tokio::task;
+use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
 use rabbitmq_stream_protocol::ResponseCode;
-use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, Ordering};
-use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
+use std::sync::Arc;
 
 #[tokio::test(flavor = "multi_thread")]
 async fn consumer_test() {
@@ -79,13 +79,14 @@ async fn super_stream_consumer_test() {
         .await
         .unwrap();
 
-    static super_stream_consumer: SuperStreamConsumer = env
-        .env
-        .super_stream_consumer()
-        .offset(OffsetSpecification::Next)
-        .build(&env.stream)
-        .await
-        .unwrap();
+    let super_stream_consumer: Arc<SuperStreamConsumer> = Arc::new(
+        env.env
+            .super_stream_consumer()
+            //.offset(OffsetSpecification::Next)
+            .build(&env.stream)
+            .await
+            .unwrap(),
+    );
 
     for n in 0..message_count {
         let msg = Message::builder().body(format!("message{}", n)).build();
@@ -97,22 +98,19 @@ async fn super_stream_consumer_test() {
             .unwrap();
     }
 
-
     let received_messages = Arc::new(AtomicU32::new(0));
-    let consumers = super_stream_consumer.get_consumers().await;
     let mut tasks = Vec::new();
 
-    for mut consumer in consumers.into_iter() {
-        let received_messages_outer = received_messages.clone();
-        tasks.push(task::spawn(async move {
-            let inner_received_messages = received_messages_outer.clone();
-            let delivery = consumer.next().await.unwrap();
-            let _ = String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
-            inner_received_messages.fetch_add(1, Ordering::Relaxed);
-
-        }));
-    }
-
+    for mut consumer in super_stream_consumer.get_consumers().await.into_iter() {
+        let received_messages_outer = received_messages.clone();
+        tasks.push(task::spawn(async move {
+            let inner_received_messages = received_messages_outer.clone();
+            let delivery = consumer.next().await.unwrap();
+            let _ =
+                String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
+            inner_received_messages.fetch_add(1, Ordering::Relaxed);
+        }));
+    }
     futures::future::join_all(tasks).await;
 
     assert!(received_messages.fetch_add(1, Ordering::Relaxed) == message_count);
@@ -398,7 +396,7 @@ async fn consumer_test_with_store_offset() {
         // Store an offset
        if i == offset_to_store {
             //Store the 5th element produced
-            let result = consumer_store
+            let _ = consumer_store
                 .store_offset(delivery.unwrap().offset())
                 .await;
         }
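Usage sketch for the super stream consumer API touched by this patch, mirroring what the integration test above exercises. The broker address, the super stream name ("invoices"), and the error handling are illustrative assumptions; only Environment::builder(), super_stream_consumer(), build(), and get_consumers() come from the crate and this change set.

use rabbitmq_stream_client::Environment;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed connection parameters; adjust to the broker under test.
    let environment = Environment::builder()
        .host("localhost")
        .port(5552)
        .build()
        .await?;

    // Build a consumer over an existing super stream (the name is hypothetical).
    let super_stream_consumer = environment
        .super_stream_consumer()
        .build("invoices")
        .await?;

    // get_consumers() exposes one Consumer per partition stream.
    let consumers = super_stream_consumer.get_consumers().await;
    println!("consuming from {} partition streams", consumers.len());

    Ok(())
}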