diff --git a/shotover-proxy/benches/windsock/aws/mod.rs b/shotover-proxy/benches/windsock/aws/mod.rs index 6a54e5047..eb9b7bf56 100644 --- a/shotover-proxy/benches/windsock/aws/mod.rs +++ b/shotover-proxy/benches/windsock/aws/mod.rs @@ -109,6 +109,16 @@ curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; + let local_shotover_path = bin_path!("shotover-proxy"); + instance + .ssh() + .push_file(local_shotover_path, Path::new("shotover-bin")) + .await; + instance + .ssh() + .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) + .await; + let instance = Arc::new(Ec2InstanceWithDocker { instance }); (*self.docker_instances.write().await).push(instance.clone()); instance @@ -162,6 +172,7 @@ sudo apt-get install -y sysstat"#, } } +/// Despite the name can also run shotover pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, } @@ -239,6 +250,15 @@ sudo docker system prune -af"#, } } } + + #[cfg(feature = "rdkafka-driver-tests")] + pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { + self.instance + .ssh() + .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) + .await; + RunningShotover::new(&self.instance).await + } } fn get_compatible_instance_type() -> InstanceType { @@ -278,28 +298,34 @@ impl Ec2InstanceWithShotover { .ssh() .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) .await; + RunningShotover::new(&self.instance).await + } +} + +pub struct RunningShotover { + shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, + event_rx: tokio::sync::mpsc::UnboundedReceiver, +} +impl RunningShotover { + async fn new(instance: &Ec2Instance) -> Self { let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut receiver = instance + .ssh() + .shell_stdout_lines(r#" + killall -w shotover-bin > /dev/null || true + RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) + .await; tokio::task::spawn(async move { - let mut receiver = self - .instance - .ssh() - .shell_stdout_lines(r#" -killall -w shotover-bin > /dev/null || true -RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) - .await; loop { tokio::select! { line = receiver.recv() => { match line { Some(Ok(line)) => { let event = Event::from_json_str(&line).unwrap(); - if let Level::Warn = event.level { - tracing::error!("shotover warn:\n {event}"); - } - if let Level::Error = event.level { - tracing::error!("shotover error:\n {event}"); + if let Level::Warn | Level::Error = event.level { + println!("AWS shotover: {event}"); } if event_tx.send(event).is_err() { return @@ -310,11 +336,6 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo } }, _ = shutdown_rx.recv() => { - // shutdown_tx is dropped, instructing us to shutdown - // we MUST drop self before dropping event_tx to ensure that the Arc clone is dropped before the task indicates that it has terminated. - // Otherwise we may hit a race condition and fail the assertion that there is only one Arc clone alive. - std::mem::drop(self); - std::mem::drop(event_tx); return; }, } @@ -334,27 +355,29 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo break; } } - RunningShotover { shutdown_tx, event_rx, } } -} - -pub struct RunningShotover { - shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, - event_rx: tokio::sync::mpsc::UnboundedReceiver, -} -impl RunningShotover { pub async fn shutdown(mut self) { // dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated std::mem::drop(self.shutdown_tx); + let ignore = [ + // Occurs when shotover is under really heavy kafka load, maybe shotover isnt reading off the socket and then kafka times out and gives up? + "failed to receive message on tcp stream: Custom { kind: Other, error: \"bytes remaining on stream\" }", + ]; + while let Some(event) = self.event_rx.recv().await { if let Level::Warn | Level::Error = event.level { - panic!("Received error/warn event from shotover:\n {event}") + if !ignore + .iter() + .any(|ignore| event.fields.message.contains(ignore)) + { + panic!("Received error/warn event from shotover:\n {event}") + } } } } diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 50421a38b..f261af2d6 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -1,4 +1,4 @@ -use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; +use crate::aws::Ec2InstanceWithDocker; use crate::common::{self, Shotover}; use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; use crate::shotover::shotover_process_custom_topology; @@ -69,14 +69,14 @@ impl KafkaBench { async fn run_aws_shotover( &self, - instance: Arc, + instance: Arc, kafka_ip: String, ) -> Option { let ip = instance.instance.private_ip().to_string(); match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { let topology = - self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092")); + self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192")); Some(instance.run_shotover(&topology).await) } Shotover::None => None, @@ -123,33 +123,38 @@ impl Bench for KafkaBench { aws.create_shotover_instance() ); - let mut profiler_instances: HashMap = [ + let profiler_instances: HashMap = [ ("bencher".to_owned(), &bench_instance.instance), ("kafka".to_owned(), &kafka_instance.instance), ] .into(); - if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover { - profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); - } + + // TODO: enable when testing KafkaSinkCluster + //profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + let mut profiler = CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; let kafka_ip = kafka_instance.instance.private_ip().to_string(); - let shotover_ip = shotover_instance.instance.private_ip().to_string(); + // TODO: make use of this when we start benching KafkaSinkCluster + let _shotover_ip = shotover_instance.instance.private_ip().to_string(); let (_, running_shotover) = futures::join!( - run_aws_kafka(kafka_instance), - self.run_aws_shotover(shotover_instance, kafka_ip.clone()) + run_aws_kafka(kafka_instance.clone(), 9192), + self.run_aws_shotover(kafka_instance, kafka_ip.clone()) ); - let destination_ip = if running_shotover.is_some() { - shotover_ip + let destination_address = if running_shotover.is_some() { + format!("{kafka_ip}:9092") } else { - kafka_ip + format!("{kafka_ip}:9192") }; bench_instance - .run_bencher(&self.run_args(&destination_ip, ¶meters), &self.name()) + .run_bencher( + &self.run_args(&destination_address, ¶meters), + &self.name(), + ) .await; profiler.finish(); @@ -263,7 +268,7 @@ impl Bench for KafkaBench { } } -async fn run_aws_kafka(instance: Arc) { +async fn run_aws_kafka(instance: Arc, port: i16) { let ip = instance.instance.private_ip().to_string(); instance .run_container( @@ -272,7 +277,11 @@ async fn run_aws_kafka(instance: Arc) { ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), ( "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://{ip}:9092"), + format!("PLAINTEXT://{ip}:{port}"), + ), + ( + "KAFKA_CFG_LISTENERS".to_owned(), + format!("PLAINTEXT://:{port},CONTROLLER://:9093"), ), ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), ],