From c8ec59a321634da6d7d3bc1175c255e9cf00ee78 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 21 Nov 2023 11:17:06 +1100 Subject: [PATCH] Fix kafka benches --- shotover-proxy/benches/windsock/aws/mod.rs | 58 ++++++++++++++-------- shotover-proxy/benches/windsock/kafka.rs | 32 +++++++----- 2 files changed, 57 insertions(+), 33 deletions(-) diff --git a/shotover-proxy/benches/windsock/aws/mod.rs b/shotover-proxy/benches/windsock/aws/mod.rs index 6a54e5047..acbf6dc84 100644 --- a/shotover-proxy/benches/windsock/aws/mod.rs +++ b/shotover-proxy/benches/windsock/aws/mod.rs @@ -109,6 +109,17 @@ curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; + let local_shotover_path = bin_path!("shotover-proxy"); + + instance + .ssh() + .push_file(local_shotover_path, Path::new("shotover-bin")) + .await; + + instance + .ssh() + .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) + .await; let instance = Arc::new(Ec2InstanceWithDocker { instance }); (*self.docker_instances.write().await).push(instance.clone()); instance @@ -162,6 +173,7 @@ sudo apt-get install -y sysstat"#, } } +/// Despite the name also supports running shotover on these instances. pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, } @@ -239,6 +251,15 @@ sudo docker system prune -af"#, } } } + + #[cfg(feature = "rdkafka-driver-tests")] + pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { + self.instance + .ssh() + .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) + .await; + RunningShotover::new(&self.instance).await + } } fn get_compatible_instance_type() -> InstanceType { @@ -278,17 +299,26 @@ impl Ec2InstanceWithShotover { .ssh() .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) .await; + RunningShotover::new(&self.instance).await + } +} + +pub struct RunningShotover { + shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, + event_rx: tokio::sync::mpsc::UnboundedReceiver, +} +impl RunningShotover { + async fn new(instance: &Ec2Instance) -> Self { let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut receiver = instance + .ssh() + .shell_stdout_lines(r#" + killall -w shotover-bin > /dev/null || true + RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) + .await; tokio::task::spawn(async move { - let mut receiver = self - .instance - .ssh() - .shell_stdout_lines(r#" -killall -w shotover-bin > /dev/null || true -RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) - .await; loop { tokio::select! { line = receiver.recv() => { @@ -310,17 +340,11 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo } }, _ = shutdown_rx.recv() => { - // shutdown_tx is dropped, instructing us to shutdown - // we MUST drop self before dropping event_tx to ensure that the Arc clone is dropped before the task indicates that it has terminated. - // Otherwise we may hit a race condition and fail the assertion that there is only one Arc clone alive. - std::mem::drop(self); - std::mem::drop(event_tx); return; }, } } }); - // wait for shotover to startup loop { let event = event_rx @@ -334,20 +358,12 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo break; } } - RunningShotover { shutdown_tx, event_rx, } } -} -pub struct RunningShotover { - shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, - event_rx: tokio::sync::mpsc::UnboundedReceiver, -} - -impl RunningShotover { pub async fn shutdown(mut self) { // dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated std::mem::drop(self.shutdown_tx); diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 50421a38b..6e749edbf 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -1,4 +1,4 @@ -use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; +use crate::aws::Ec2InstanceWithDocker; use crate::common::{self, Shotover}; use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; use crate::shotover::shotover_process_custom_topology; @@ -69,14 +69,14 @@ impl KafkaBench { async fn run_aws_shotover( &self, - instance: Arc, + instance: Arc, kafka_ip: String, ) -> Option { let ip = instance.instance.private_ip().to_string(); match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { let topology = - self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092")); + self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192")); Some(instance.run_shotover(&topology).await) } Shotover::None => None, @@ -135,21 +135,25 @@ impl Bench for KafkaBench { CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; let kafka_ip = kafka_instance.instance.private_ip().to_string(); - let shotover_ip = shotover_instance.instance.private_ip().to_string(); + // TODO: make use of this when we start benching KafkaSinkCluster + let _shotover_ip = shotover_instance.instance.private_ip().to_string(); let (_, running_shotover) = futures::join!( - run_aws_kafka(kafka_instance), - self.run_aws_shotover(shotover_instance, kafka_ip.clone()) + run_aws_kafka(kafka_instance.clone(), 9192), + self.run_aws_shotover(kafka_instance, kafka_ip.clone()) ); - let destination_ip = if running_shotover.is_some() { - shotover_ip + let destination_address = if running_shotover.is_some() { + format!("{kafka_ip}:9192") } else { - kafka_ip + format!("{kafka_ip}:9092") }; bench_instance - .run_bencher(&self.run_args(&destination_ip, ¶meters), &self.name()) + .run_bencher( + &self.run_args(&destination_address, ¶meters), + &self.name(), + ) .await; profiler.finish(); @@ -263,7 +267,7 @@ impl Bench for KafkaBench { } } -async fn run_aws_kafka(instance: Arc) { +async fn run_aws_kafka(instance: Arc, port: i16) { let ip = instance.instance.private_ip().to_string(); instance .run_container( @@ -272,7 +276,11 @@ async fn run_aws_kafka(instance: Arc) { ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), ( "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://{ip}:9092"), + format!("PLAINTEXT://{ip}:{port}"), + ), + ( + "KAFKA_CFG_LISTENERS".to_owned(), + format!("PLAINTEXT://:{port},CONTROLLER://:9093"), ), ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), ],