diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index 82649b260..52afd3774 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -16,9 +16,10 @@ use tokio::time::timeout; #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] +/// KafkaSinkSingle is designed solely for the use case of running a shotover instance on the same machine as each kafka instance. +/// The kafka instance and shotover instance must run on seperate ports. pub struct KafkaSinkSingleConfig { - #[serde(rename = "remote_address")] - pub address: String, + pub destination_port: u16, pub connect_timeout_ms: u64, pub read_timeout: Option, } @@ -32,7 +33,7 @@ use crate::transforms::TransformConfig; impl TransformConfig for KafkaSinkSingleConfig { async fn get_builder(&self, chain_name: String) -> Result> { Ok(Box::new(KafkaSinkSingleBuilder::new( - self.address.clone(), + self.destination_port, chain_name, self.connect_timeout_ms, self.read_timeout, @@ -42,7 +43,6 @@ impl TransformConfig for KafkaSinkSingleConfig { pub struct KafkaSinkSingleBuilder { // contains address and port - address: String, address_port: u16, connect_timeout: Duration, read_timeout: Option, @@ -50,20 +50,14 @@ pub struct KafkaSinkSingleBuilder { impl KafkaSinkSingleBuilder { pub fn new( - address: String, + address_port: u16, _chain_name: String, connect_timeout_ms: u64, timeout: Option, ) -> KafkaSinkSingleBuilder { let receive_timeout = timeout.map(Duration::from_secs); - let address_port = address - .rsplit(':') - .next() - .and_then(|str| str.parse().ok()) - .unwrap_or(9092); KafkaSinkSingleBuilder { - address, address_port, connect_timeout: Duration::from_millis(connect_timeout_ms), read_timeout: receive_timeout, @@ -75,7 +69,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { fn build(&self) -> Transforms { Transforms::KafkaSinkSingle(KafkaSinkSingle { outbound: None, - address: self.address.clone(), address_port: self.address_port, pushed_messages_tx: None, connect_timeout: self.connect_timeout, @@ -93,7 +86,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder { } pub struct KafkaSinkSingle { - address: String, address_port: u16, outbound: Option, pushed_messages_tx: Option>, @@ -106,7 +98,11 @@ impl Transform for KafkaSinkSingle { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { if self.outbound.is_none() { let codec = KafkaCodecBuilder::new(Direction::Sink); - let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?; + let tcp_stream = tcp::tcp_stream( + self.connect_timeout, + (requests_wrapper.local_addr.ip(), self.address_port), + ) + .await?; let (rx, tx) = tcp_stream.into_split(); self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx)); }