Skip to content

Commit

Permalink
some refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Oct 17, 2024
1 parent df6a3e9 commit f90a2df
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 38 deletions.
3 changes: 0 additions & 3 deletions protocol/src/commands/superstream_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::io::Write;

#[cfg(test)]
use fake::Fake;
<<<<<<< HEAD
=======

>>>>>>> 4a7cb45 (refactoring tests)
use super::Command;
use crate::{
codec::{Decoder, Encoder},
Expand Down
2 changes: 1 addition & 1 deletion src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::{
producer::ProducerBuilder,
stream_creator::StreamCreator,
superstream::RoutingStrategy,
superstream_producer::SuperStreamProducerBuilder,
superstream_consumer::SuperStreamConsumerBuilder,
superstream_producer::SuperStreamProducerBuilder,
RabbitMQStreamResult,
};

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ pub mod types {
pub use crate::stream_creator::StreamCreator;
pub use crate::superstream::HashRoutingMurmurStrategy;
pub use crate::superstream::RoutingKeyRoutingStrategy;
pub use crate::superstream_consumer::SuperStreamConsumer;
pub use crate::superstream::RoutingStrategy;
pub use crate::superstream_consumer::SuperStreamConsumer;
pub use rabbitmq_stream_protocol::message::Message;
pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};

Expand Down
17 changes: 8 additions & 9 deletions src/superstream_consumer.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use std::sync::Arc;

use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification, message::Message};
use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification};

use crate::superstream::DefaultSuperStreamMetadata;
use crate::{error::ConsumerCreateError, Client, Consumer, Environment};

type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;

/// API for consuming RabbitMQ stream messages
#[derive(Clone)]
pub struct SuperStreamConsumer {
pub internal: Arc<SuperStreamConsumerInternal>,
internal: Arc<SuperStreamConsumerInternal>,
}

struct SuperStreamConsumerInternal {
client: Client,
super_stream: String,
offset_specification: OffsetSpecification,
pub consumers: Vec<Consumer>,
consumers: Vec<Consumer>,
}

/// Builder for [`Consumer`]
Expand Down Expand Up @@ -68,15 +68,14 @@ impl SuperStreamConsumerBuilder {
})
}

pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
pub async fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
self.offset_specification = offset_specification;
self
}
}

impl SuperStreamConsumer {

pub async fn get_consumers(&self) -> &Vec<Consumer> {
return &self.internal.consumers
impl SuperStreamConsumer {
pub async fn get_consumers(&self) -> &Vec<Consumer> {
return &self.internal.consumers;
}
}
46 changes: 22 additions & 24 deletions tests/integration/consumer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;
use crate::common::TestEnvironment;
use fake::{Fake, Faker};
use futures::StreamExt;
use tokio::task;
use rabbitmq_stream_client::{
error::{
ClientError, ConsumerCloseError, ConsumerDeliveryError, ConsumerStoreOffsetError,
Expand All @@ -12,11 +11,12 @@ use rabbitmq_stream_client::{
types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
Consumer, FilterConfiguration, NoDedup, Producer,
};
use tokio::task;

use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
use rabbitmq_stream_protocol::ResponseCode;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
use std::sync::Arc;

#[tokio::test(flavor = "multi_thread")]
async fn consumer_test() {
Expand Down Expand Up @@ -79,13 +79,14 @@ async fn super_stream_consumer_test() {
.await
.unwrap();

static super_stream_consumer: SuperStreamConsumer = env
.env
.super_stream_consumer()
.offset(OffsetSpecification::Next)
.build(&env.stream)
.await
.unwrap();
let super_stream_consumer: Arc<SuperStreamConsumer> = Arc::new(
env.env
.super_stream_consumer()
//.offset(OffsetSpecification::Next)
.build(&env.stream)
.await
.unwrap(),
);

for n in 0..message_count {
let msg = Message::builder().body(format!("message{}", n)).build();
Expand All @@ -97,22 +98,19 @@ async fn super_stream_consumer_test() {
.unwrap();
}


let received_messages = Arc::new(AtomicU32::new(0));
let consumers = super_stream_consumer.get_consumers().await;

let mut tasks = Vec::new();
for mut consumer in consumers.into_iter() {
let received_messages_outer = received_messages.clone();
tasks.push(task::spawn(async move {
let inner_received_messages = received_messages_outer.clone();
let delivery = consumer.next().await.unwrap();
let _ = String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
inner_received_messages.fetch_add(1, Ordering::Relaxed);

}));
}

for mut consumer in super_stream_consumer.get_consumers().await.into_iter() {
let received_messages_outer = received_messages.clone();
tasks.push(task::spawn(async move {
let inner_received_messages = received_messages_outer.clone();
let delivery = consumer.next().await.unwrap();
let _ =
String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
inner_received_messages.fetch_add(1, Ordering::Relaxed);
}));
}
futures::future::join_all(tasks).await;

assert!(received_messages.fetch_add(1, Ordering::Relaxed) == message_count);
Expand Down Expand Up @@ -398,7 +396,7 @@ async fn consumer_test_with_store_offset() {
// Store an offset
if i == offset_to_store {
//Store the 5th element produced
let result = consumer_store
let _ = consumer_store
.store_offset(delivery.unwrap().offset())
.await;
}
Expand Down

0 comments on commit f90a2df

Please sign in to comment.