diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index f261af2d6..1e46c5420 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -41,7 +41,7 @@ impl KafkaBench { } } - fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String { + fn generate_topology_yaml(&self, host_address: String) -> String { let mut transforms = vec![]; if let Shotover::ForcedMessageParsed = self.shotover { transforms.push(Box::new(DebugForceEncodeConfig { @@ -51,7 +51,7 @@ impl KafkaBench { } transforms.push(Box::new(KafkaSinkSingleConfig { - address: kafka_address, + destination_port: 9192, connect_timeout_ms: 3000, read_timeout: None, })); @@ -70,13 +70,11 @@ impl KafkaBench { async fn run_aws_shotover( &self, instance: Arc, - kafka_ip: String, ) -> Option { let ip = instance.instance.private_ip().to_string(); match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { - let topology = - self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192")); + let topology = self.generate_topology_yaml(format!("{ip}:9092")); Some(instance.run_shotover(&topology).await) } Shotover::None => None, @@ -141,7 +139,7 @@ impl Bench for KafkaBench { let (_, running_shotover) = futures::join!( run_aws_kafka(kafka_instance.clone(), 9192), - self.run_aws_shotover(kafka_instance, kafka_ip.clone()) + self.run_aws_shotover(kafka_instance) ); let destination_address = if running_shotover.is_some() { @@ -177,18 +175,15 @@ impl Bench for KafkaBench { let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { - let topology_yaml = self.generate_topology_yaml( - "127.0.0.1:9192".to_owned(), - "127.0.0.1:9092".to_owned(), - ); + let topology_yaml = self.generate_topology_yaml("127.0.0.1:9092".to_owned()); Some(shotover_process_custom_topology(&topology_yaml, &profiler).await) } Shotover::None => None, }; let broker_address = match self.shotover { - Shotover::ForcedMessageParsed | Shotover::Standard => "127.0.0.1:9192", - Shotover::None => "127.0.0.1:9092", + Shotover::ForcedMessageParsed | Shotover::Standard => "127.0.0.1:9092", + Shotover::None => "127.0.0.1:9192", }; profiler.run(&shotover).await; diff --git a/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml index adbbf7266..c33b3a8e7 100644 --- a/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/kafka/bench/docker-compose.yaml @@ -3,8 +3,8 @@ services: kafka: image: 'bitnami/kafka:3.4.0-debian-11-r22' ports: - - '9092:9092' + - '9192:9192' environment: - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9192,CONTROLLER://:9093 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9192 - ALLOW_PLAINTEXT_LISTENER=yes diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index 82649b260..52afd3774 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -16,9 +16,10 @@ use tokio::time::timeout; #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] +/// KafkaSinkSingle is designed solely for the use case of running a shotover instance on the same machine as each kafka instance. +/// The kafka instance and shotover instance must run on seperate ports. pub struct KafkaSinkSingleConfig { - #[serde(rename = "remote_address")] - pub address: String, + pub destination_port: u16, pub connect_timeout_ms: u64, pub read_timeout: Option, } @@ -32,7 +33,7 @@ use crate::transforms::TransformConfig; impl TransformConfig for KafkaSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { Ok(Box::new(KafkaSinkSingleBuilder::new( - self.address.clone(), + self.destination_port, chain_name, self.connect_timeout_ms, self.read_timeout, @@ -42,7 +43,6 @@ impl TransformConfig for KafkaSinkSingleConfig { pub struct KafkaSinkSingleBuilder { // contains address and port - address: String, address_port: u16, connect_timeout: Duration, read_timeout: Option, @@ -50,20 +50,14 @@ pub struct KafkaSinkSingleBuilder { impl KafkaSinkSingleBuilder { pub fn new( - address: String, + address_port: u16, _chain_name: String, connect_timeout_ms: u64, timeout: Option, ) -> KafkaSinkSingleBuilder { let receive_timeout = timeout.map(Duration::from_secs); - let address_port = address - .rsplit(':') - .next() - .and_then(|str| str.parse().ok()) - .unwrap_or(9092); KafkaSinkSingleBuilder { - address, address_port, connect_timeout: Duration::from_millis(connect_timeout_ms), read_timeout: receive_timeout, @@ -75,7 +69,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { fn build(&self) -> Transforms { Transforms::KafkaSinkSingle(KafkaSinkSingle { outbound: None, - address: self.address.clone(), address_port: self.address_port, pushed_messages_tx: None, connect_timeout: self.connect_timeout, @@ -93,7 +86,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { } pub struct KafkaSinkSingle { - address: String, address_port: u16, outbound: Option, pushed_messages_tx: Option>, @@ -106,7 +98,11 @@ impl Transform for KafkaSinkSingle { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { if self.outbound.is_none() { let codec = KafkaCodecBuilder::new(Direction::Sink); - let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?; + let tcp_stream = tcp::tcp_stream( + self.connect_timeout, + (requests_wrapper.local_addr.ip(), self.address_port), + ) + .await?; let (rx, tx) = tcp_stream.into_split(); self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx)); }