Skip to content

Commit

Permalink
best
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed Sep 19, 2024
1 parent 4657f2c commit ea7bb1a
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 295 deletions.
28 changes: 15 additions & 13 deletions safir/src/safir/kafka/_aiokafka_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from .config import (
KafkaConnectionSettings,
KafkaPlaintextSettings,
KafkaSaslSettings,
KafkaSecurityProtocol,
KafkaSaslPlaintextSettings,
KafkaSaslSslSettings,
KafkaSslSettings,
)

Expand Down Expand Up @@ -34,11 +34,11 @@ def make_kafka_admin_client(
security_protocol="SSL",
ssl_context=auth.ssl_context,
)
case KafkaSaslSettings():
case KafkaSaslSslSettings() | KafkaSaslPlaintextSettings():
return _sasl(
client_id=client_id,
bootstrap_servers=config.bootstrap_servers,
auth_config=auth,
config=auth,
)
case KafkaPlaintextSettings():
return AIOKafkaAdminClient(
Expand All @@ -49,21 +49,23 @@ def make_kafka_admin_client(


def _sasl(
client_id: str, bootstrap_servers: str, auth_config: KafkaSaslSettings
client_id: str,
bootstrap_servers: str,
config: KafkaSaslSslSettings | KafkaSaslPlaintextSettings,
) -> AIOKafkaAdminClient:
"""Construct an admin client from SASL auth settings."""
match auth_config.security_protocol:
case KafkaSecurityProtocol.SASL_PLAINTEXT:
match config:
case KafkaSaslPlaintextSettings():
ssl_context = None
case KafkaSecurityProtocol.SASL_SSL:
ssl_context = auth_config.ssl_context
case KafkaSaslSslSettings():
ssl_context = config.ssl_context

return AIOKafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
security_protocol=auth_config.security_protocol,
sasl_mechanism=auth_config.sasl_mechanism,
sasl_plain_username=auth_config.sasl_username,
sasl_plain_password=auth_config.sasl_password.get_secret_value(),
security_protocol=config.security_protocol,
sasl_mechanism=config.sasl_mechanism,
sasl_plain_username=config.sasl_username,
sasl_plain_password=config.sasl_password.get_secret_value(),
ssl_context=ssl_context,
)
26 changes: 13 additions & 13 deletions safir/src/safir/kafka/_aiokafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from .config import (
KafkaConnectionSettings,
KafkaPlaintextSettings,
KafkaSaslSettings,
KafkaSecurityProtocol,
KafkaSaslPlaintextSettings,
KafkaSaslSslSettings,
KafkaSslSettings,
)

Expand Down Expand Up @@ -37,12 +37,12 @@ def make_kafka_consumer(
security_protocol="SSL",
ssl_context=auth.ssl_context,
)
case KafkaSaslSettings():
case KafkaSaslSslSettings() | KafkaSaslPlaintextSettings():
return _sasl(
client_id=client_id,
group_id=group_id,
bootstrap_servers=config.bootstrap_servers,
auth_config=auth,
config=auth,
)
case KafkaPlaintextSettings():
return AIOKafkaConsumer(
Expand All @@ -56,23 +56,23 @@ def make_kafka_consumer(
def _sasl(
client_id: str,
bootstrap_servers: str,
auth_config: KafkaSaslSettings,
config: KafkaSaslSslSettings | KafkaSaslPlaintextSettings,
group_id: str | None = None,
) -> AIOKafkaConsumer:
"""Construct a consumer from SASL auth settings."""
match auth_config.security_protocol:
case KafkaSecurityProtocol.SASL_PLAINTEXT:
match config:
case KafkaSaslSslSettings():
ssl_context = config.ssl_context
case KafkaSaslPlaintextSettings():
ssl_context = None
case KafkaSecurityProtocol.SASL_SSL:
ssl_context = auth_config.ssl_context

return AIOKafkaConsumer(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
group_id=group_id,
security_protocol=auth_config.security_protocol,
sasl_mechanism=auth_config.sasl_mechanism,
sasl_plain_username=auth_config.sasl_username,
sasl_plain_password=auth_config.sasl_password.get_secret_value(),
security_protocol=config.security_protocol,
sasl_mechanism=config.sasl_mechanism,
sasl_plain_username=config.sasl_username,
sasl_plain_password=config.sasl_password.get_secret_value(),
ssl_context=ssl_context,
)
20 changes: 11 additions & 9 deletions safir/src/safir/kafka/_faststream_kafka_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
KafkaConnectionSettings,
KafkaPlaintextSettings,
KafkaSaslMechanism,
KafkaSaslSettings,
KafkaSecurityProtocol,
KafkaSaslPlaintextSettings,
KafkaSaslSslSettings,
KafkaSslSettings,
)

Expand All @@ -36,7 +36,7 @@ def make_kafka_broker(
match auth:
case KafkaSslSettings():
security = BaseSecurity(ssl_context=auth.ssl_context)
case KafkaSaslSettings():
case KafkaSaslSslSettings() | KafkaSaslPlaintextSettings():
security = _sasl(auth)
case KafkaPlaintextSettings():
security = BaseSecurity()
Expand All @@ -49,9 +49,9 @@ def make_kafka_broker(


def _sasl(
config: KafkaSaslSettings,
config: KafkaSaslSslSettings | KafkaSaslPlaintextSettings,
) -> SASLScram512 | SASLScram256 | SASLPlaintext:
"""Create a Faststream Security for SASL authentication."""
"""Create a FastStream Security for SASL authentication."""
cls: type[SASLScram512 | SASLScram256 | SASLPlaintext]
match config.sasl_mechanism:
case KafkaSaslMechanism.SCRAM_SHA_512:
Expand All @@ -61,10 +61,12 @@ def _sasl(
case KafkaSaslMechanism.PLAIN:
cls = SASLPlaintext

if config.security_protocol == KafkaSecurityProtocol.SASL_PLAINTEXT:
ssl_context = None
else:
ssl_context = config.ssl_context
match config:
case KafkaSaslSslSettings():
ssl_context = config.ssl_context
case KafkaSaslPlaintextSettings():
ssl_context = None

return cls(
username=config.sasl_username,
password=config.sasl_password.get_secret_value(),
Expand Down
48 changes: 26 additions & 22 deletions safir/src/safir/kafka/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def ssl_context(self) -> ssl.SSLContext:
)


class KafkaSaslSettings(BaseModel):
"""Subset of settings required for SASL auth."""
class KafkaSaslPlaintextSettings(BaseModel):
"""Subset of settings required for SASL SSLauth."""

security_protocol: Literal[
KafkaSecurityProtocol.SASL_PLAINTEXT, KafkaSecurityProtocol.SASL_SSL
Expand All @@ -88,31 +88,33 @@ class KafkaSaslSettings(BaseModel):

sasl_password: SecretStr

cluster_ca_path: FilePath | None

client_cert_path: FilePath | None
class KafkaSaslSslSettings(BaseModel):
"""Subset of settings required for SASL PLAINTEXT auth."""

client_key_path: FilePath | None
security_protocol: Literal[
KafkaSecurityProtocol.SASL_PLAINTEXT, KafkaSecurityProtocol.SASL_SSL
]

sasl_mechanism: KafkaSaslMechanism

sasl_username: str

sasl_password: SecretStr

cluster_ca_path: FilePath | None

@property
def ssl_context(self) -> ssl.SSLContext:
"""An SSL context for connecting to Kafka, if the Kafka connection is
configured to use TLS authentication.
"""
cafile = None
certfile = None
keyfile = None

if self.cluster_ca_path:
cafile = str(self.cluster_ca_path)
if self.client_cert_path:
certfile = str(self.client_cert_path)
if self.client_key_path:
keyfile = str(self.client_key_path)
return helpers.create_ssl_context(
cafile=cafile,
certfile=certfile,
keyfile=keyfile,
)


Expand All @@ -129,9 +131,7 @@ class KafkaConnectionSettings(BaseSettings):
have different sets of required settings. All of these settings can be
provided in ``KAFKA_`` prefixed environment variables. The
``auth_settings`` property enforces at runtime that the correct settings
were provided for the desired authentication method, and provides
non-optional attributes to access those settings::
were provided for the desired authentication method, and provides non-optional attributes to access those settings::
An instance of this model can be passed directly to the various client
constructor functions in this module. This allows for succinct kafka setup
in applications::
Expand Down Expand Up @@ -238,7 +238,12 @@ def check_auth_settings(self) -> Self:
@property
def auth_settings(
self,
) -> KafkaSslSettings | KafkaSaslSettings | KafkaPlaintextSettings:
) -> (
KafkaSslSettings
| KafkaSaslSslSettings
| KafkaSaslPlaintextSettings
| KafkaPlaintextSettings
):
"""Return a model representing a paricular Kafka auth method.
This method will fail with a ValidationError if an invalid set of
Expand All @@ -247,10 +252,9 @@ def auth_settings(
match self.security_protocol:
case KafkaSecurityProtocol.PLAINTEXT:
return KafkaPlaintextSettings(**self.model_dump())
case (
KafkaSecurityProtocol.SASL_SSL
| KafkaSecurityProtocol.SASL_PLAINTEXT
):
return KafkaSaslSettings(**self.model_dump())
case KafkaSecurityProtocol.SASL_SSL:
return KafkaSaslSslSettings(**self.model_dump())
case KafkaSecurityProtocol.SASL_PLAINTEXT:
return KafkaSaslPlaintextSettings(**self.model_dump())
case KafkaSecurityProtocol.SSL:
return KafkaSslSettings(**self.model_dump())
2 changes: 1 addition & 1 deletion safir/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from safir.testing.kubernetes import MockKubernetesApi, patch_kubernetes
from safir.testing.slack import MockSlackWebhook, mock_slack_webhook

from .support.experiment.kafka import FullKafkaContainer
from .support.kafka.container import FullKafkaContainer
from .support.schema_registry import SchemaRegistryContainer


Expand Down
Loading

0 comments on commit ea7bb1a

Please sign in to comment.