Skip to content

Commit

Permalink
KafkaSinkSingle: remove dest host config, rely solely on dest port
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 20, 2023
1 parent 312f287 commit f47bc89
Showing 1 changed file with 10 additions and 14 deletions.
24 changes: 10 additions & 14 deletions shotover/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use tokio::time::timeout;

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
/// KafkaSinkSingle is designed solely for the use case of running a shotover instance on the same machine as each kafka instance.
/// The kafka instance and shotover instance must run on seperate ports.
pub struct KafkaSinkSingleConfig {
#[serde(rename = "remote_address")]
pub address: String,
pub destination_port: u16,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
}
Expand All @@ -32,7 +33,7 @@ use crate::transforms::TransformConfig;
impl TransformConfig for KafkaSinkSingleConfig {
async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(KafkaSinkSingleBuilder::new(
self.address.clone(),
self.destination_port,
chain_name,
self.connect_timeout_ms,
self.read_timeout,
Expand All @@ -42,28 +43,21 @@ impl TransformConfig for KafkaSinkSingleConfig {

pub struct KafkaSinkSingleBuilder {
// contains address and port
address: String,
address_port: u16,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}

impl KafkaSinkSingleBuilder {
pub fn new(
address: String,
address_port: u16,
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> KafkaSinkSingleBuilder {
let receive_timeout = timeout.map(Duration::from_secs);
let address_port = address
.rsplit(':')
.next()
.and_then(|str| str.parse().ok())
.unwrap_or(9092);

KafkaSinkSingleBuilder {
address,
address_port,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
Expand All @@ -75,7 +69,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
fn build(&self) -> Transforms {
Transforms::KafkaSinkSingle(KafkaSinkSingle {
outbound: None,
address: self.address.clone(),
address_port: self.address_port,
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
Expand All @@ -93,7 +86,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
}

pub struct KafkaSinkSingle {
address: String,
address_port: u16,
outbound: Option<Connection>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
Expand All @@ -106,7 +98,11 @@ impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
if self.outbound.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink);
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let tcp_stream = tcp::tcp_stream(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
}
Expand Down

0 comments on commit f47bc89

Please sign in to comment.