Skip to content

Commit

Permalink
KafkaSinkSingle: remove dest host config, rely solely on dest port (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 27, 2023
1 parent 977c6f6 commit f4fde8a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 31 deletions.
19 changes: 7 additions & 12 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl KafkaBench {
}
}

fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String {
fn generate_topology_yaml(&self, host_address: String) -> String {
let mut transforms = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
Expand All @@ -51,7 +51,7 @@ impl KafkaBench {
}

transforms.push(Box::new(KafkaSinkSingleConfig {
address: kafka_address,
destination_port: 9192,
connect_timeout_ms: 3000,
read_timeout: None,
}));
Expand All @@ -70,13 +70,11 @@ impl KafkaBench {
async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithDocker>,
kafka_ip: String,
) -> Option<crate::aws::RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192"));
let topology = self.generate_topology_yaml(format!("{ip}:9092"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
Expand Down Expand Up @@ -141,7 +139,7 @@ impl Bench for KafkaBench {

let (_, running_shotover) = futures::join!(
run_aws_kafka(kafka_instance.clone(), 9192),
self.run_aws_shotover(kafka_instance, kafka_ip.clone())
self.run_aws_shotover(kafka_instance)
);

let destination_address = if running_shotover.is_some() {
Expand Down Expand Up @@ -177,18 +175,15 @@ impl Bench for KafkaBench {
let mut profiler = ProfilerRunner::new(self.name(), profiling);
let shotover = match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology_yaml = self.generate_topology_yaml(
"127.0.0.1:9192".to_owned(),
"127.0.0.1:9092".to_owned(),
);
let topology_yaml = self.generate_topology_yaml("127.0.0.1:9092".to_owned());
Some(shotover_process_custom_topology(&topology_yaml, &profiler).await)
}
Shotover::None => None,
};

let broker_address = match self.shotover {
Shotover::ForcedMessageParsed | Shotover::Standard => "127.0.0.1:9192",
Shotover::None => "127.0.0.1:9092",
Shotover::ForcedMessageParsed | Shotover::Standard => "127.0.0.1:9092",
Shotover::None => "127.0.0.1:9192",
};

profiler.run(&shotover).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ services:
kafka:
image: 'bitnami/kafka:3.4.0-debian-11-r22'
ports:
- '9092:9092'
- '9192:9192'
environment:
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9192,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9192
- ALLOW_PLAINTEXT_LISTENER=yes
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ sources:
encode_requests: true
encode_responses: true
- KafkaSinkSingle:
remote_address: "127.0.0.1:9092"
destination_port: 9092
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkSingle:
remote_address: "127.0.0.1:9092"
destination_port: 9092
connect_timeout_ms: 3000
24 changes: 10 additions & 14 deletions shotover/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use tokio::time::timeout;

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
/// KafkaSinkSingle is designed solely for the use case of running a shotover instance on the same machine as each kafka instance.
/// The kafka instance and shotover instance must run on seperate ports.
pub struct KafkaSinkSingleConfig {
#[serde(rename = "remote_address")]
pub address: String,
pub destination_port: u16,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
}
Expand All @@ -32,7 +33,7 @@ use crate::transforms::TransformConfig;
impl TransformConfig for KafkaSinkSingleConfig {
async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(KafkaSinkSingleBuilder::new(
self.address.clone(),
self.destination_port,
chain_name,
self.connect_timeout_ms,
self.read_timeout,
Expand All @@ -42,28 +43,21 @@ impl TransformConfig for KafkaSinkSingleConfig {

pub struct KafkaSinkSingleBuilder {
// contains address and port
address: String,
address_port: u16,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}

impl KafkaSinkSingleBuilder {
pub fn new(
address: String,
address_port: u16,
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> KafkaSinkSingleBuilder {
let receive_timeout = timeout.map(Duration::from_secs);
let address_port = address
.rsplit(':')
.next()
.and_then(|str| str.parse().ok())
.unwrap_or(9092);

KafkaSinkSingleBuilder {
address,
address_port,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
Expand All @@ -75,7 +69,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
fn build(&self) -> Transforms {
Transforms::KafkaSinkSingle(KafkaSinkSingle {
outbound: None,
address: self.address.clone(),
address_port: self.address_port,
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
Expand All @@ -93,7 +86,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
}

pub struct KafkaSinkSingle {
address: String,
address_port: u16,
outbound: Option<Connection>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
Expand All @@ -106,7 +98,11 @@ impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
if self.outbound.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink);
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let tcp_stream = tcp::tcp_stream(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
}
Expand Down

0 comments on commit f4fde8a

Please sign in to comment.