Skip to content

Commit

Permalink
fix kafka unmarshaller args typing and defaults (#240)
Browse files Browse the repository at this point in the history
* fix kafka unmarshaller args typing and defaults

Signed-off-by: Christoph Hösler <christoph.hoesler@inovex.de>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: Christoph Hösler <christoph.hoesler@inovex.de>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
hoesler and pre-commit-ci[bot] authored Oct 30, 2024
1 parent c6c7e8c commit efca352
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions cloudevents/kafka/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
from cloudevents.kafka.exceptions import KeyMapperError
from cloudevents.sdk import types

DEFAULT_MARSHALLER: types.MarshallerType = json.dumps
DEFAULT_UNMARSHALLER: types.MarshallerType = json.loads
DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = lambda x: x
JSON_MARSHALLER: types.MarshallerType = json.dumps
JSON_UNMARSHALLER: types.UnmarshallerType = json.loads
IDENTITY_MARSHALLER = IDENTITY_UNMARSHALLER = lambda x: x

DEFAULT_MARSHALLER: types.MarshallerType = JSON_MARSHALLER
DEFAULT_UNMARSHALLER: types.UnmarshallerType = JSON_UNMARSHALLER
DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = IDENTITY_MARSHALLER
DEFAULT_EMBEDDED_DATA_UNMARSHALLER: types.UnmarshallerType = IDENTITY_UNMARSHALLER


class KafkaMessage(typing.NamedTuple):
Expand Down Expand Up @@ -109,7 +114,7 @@ def to_binary(
def from_binary(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
"""
Returns a CloudEvent from a KafkaMessage in binary format.
Expand Down Expand Up @@ -208,7 +213,7 @@ def to_structured(
def from_structured(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
"""
Expand All @@ -222,7 +227,7 @@ def from_structured(
:returns: CloudEvent
"""

data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_UNMARSHALLER
envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
try:
structure = envelope_unmarshaller(message.value)
Expand Down

0 comments on commit efca352

Please sign in to comment.