Skip to content

Commit

Permalink
Merge pull request #134 from kaskada-ai/pulsar-add-tls-configs
Browse files Browse the repository at this point in the history
Add cert chain field to pulsar
  • Loading branch information
rltvty authored Mar 21, 2023
2 parents 1dd4072 + b4277be commit 29fb802
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 33 deletions.
89 changes: 56 additions & 33 deletions crates/sparrow-runtime/src/execute/output/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use arrow::datatypes::{ArrowPrimitiveType, DataType, SchemaRef, TimestampMicrose
use arrow::record_batch::RecordBatch;
use futures::stream::BoxStream;
use futures::StreamExt;
use pulsar::authentication::token::TokenAuthentication;
use pulsar::compression::Compression;
use pulsar::Authentication;
use sparrow_api::kaskada::v1alpha::output_to;
Expand Down Expand Up @@ -89,38 +88,7 @@ pub(super) async fn write(
.into_report()
.change_context(Error::ProgressUpdate)?;

// Currently, we only support auth with jwt tokens
// https://pulsar.apache.org/docs/2.4.0/security-token-client/
error_stack::ensure!(
pulsar.auth_plugin == "org.apache.pulsar.client.impl.auth.AuthenticationToken",
Error::PulsarAuth {
context: format!("unsupported auth plugin: {}", pulsar.auth_plugin)
}
);
// Additionally, only the string format is supported
let auth_token = if let Some(token) = pulsar.auth_params.strip_prefix("token:") {
token
} else {
error_stack::bail!(Error::PulsarAuth {
context: format!(
"expected \"token:\" style prefix. Saw {}",
pulsar.auth_params
)
})
};

let pulsar_auth = Authentication {
name: "token".to_owned(),
data: auth_token.as_bytes().to_vec(),
};

let client = Pulsar::builder(broker_url, TokioExecutor)
.with_auth(pulsar_auth)
.build()
.await
.into_report()
.change_context(Error::JsonSerialization)?;

let client = build_client(broker_url, &pulsar).await?;
let mut producer = client
.producer()
.with_topic(topic_url.clone())
Expand Down Expand Up @@ -191,6 +159,61 @@ pub(super) async fn write(
Ok(())
}

// Builds the pulsar client
async fn build_client(
broker_url: &str,
pulsar: &PulsarDestination,
) -> error_stack::Result<Pulsar<TokioExecutor>, Error> {
let mut client_builder = Pulsar::builder(broker_url, TokioExecutor);

// Add authorization
if !pulsar.auth_plugin.is_empty() {
// Currently, we only support auth with jwt tokens
// https://pulsar.apache.org/docs/2.4.0/security-token-client/
error_stack::ensure!(
pulsar.auth_plugin == "org.apache.pulsar.client.impl.auth.AuthenticationToken",
Error::PulsarAuth {
context: format!("unsupported auth plugin: {}", pulsar.auth_plugin)
}
);
// Additionally, only the string format is supported
let auth_token = if let Some(token) = pulsar.auth_params.strip_prefix("token:") {
token
} else {
error_stack::bail!(Error::PulsarAuth {
context: format!(
"expected \"token:\" style prefix. Saw {}",
pulsar.auth_params
)
})
};

let pulsar_auth = Authentication {
name: "token".to_owned(),
data: auth_token.as_bytes().to_vec(),
};
client_builder = client_builder.with_auth(pulsar_auth);
};

// Add TLS encryption
if !pulsar.certificate_chain.is_empty() {
// The default values for the other configs are explicitly show here for
// clarity. We can allow the user to configure these if requested.
client_builder = client_builder
.with_allow_insecure_connection(false)
.with_tls_hostname_verification_enabled(true)
.with_certificate_chain(pulsar.certificate_chain.as_bytes().to_vec());
};

let client = client_builder
.build()
.await
.into_report()
.change_context(Error::JsonSerialization)?;

Ok(client)
}

// Drops columns to match the given output schema
fn get_output_batch(
output_schema: SchemaRef,
Expand Down
3 changes: 3 additions & 0 deletions proto/kaskada/kaskada/v1alpha/destinations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,7 @@ message PulsarDestination {
//
// This field is populated in the output response.
string topic_url = 7 [(google.api.field_behavior) = OUTPUT_ONLY];

// A custom certificate chain to authenticate the server in TLS connections.
string certificate_chain = 8;
}

0 comments on commit 29fb802

Please sign in to comment.