Skip to content

Commit

Permalink
refactor avro deserializer
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jan 17, 2025
1 parent cf7acb9 commit 12c29b9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
9 changes: 3 additions & 6 deletions kaskade/deserializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
from abc import abstractmethod, ABC
from enum import Enum, auto
from io import BytesIO
from struct import unpack
from typing import Any, Type

Expand All @@ -14,15 +13,13 @@
ProtobufDeserializer as ConfluentProtobufDeserializer,
)
from confluent_kafka.serialization import MessageField, SerializationContext
from fastavro import schemaless_reader
from fastavro.schema import load_schema
from google.protobuf.descriptor_pb2 import FileDescriptorSet
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message
from google.protobuf.message_factory import GetMessages

from kaskade.configs import SCHEMA_REGISTRY_MAGIC_BYTE
from kaskade.utils import unpack_bytes, file_to_bytes
from kaskade.utils import unpack_bytes, file_to_bytes, avro_to_py


class Deserialization(Enum):
Expand Down Expand Up @@ -195,9 +192,9 @@ def deserialize(
if magic == SCHEMA_REGISTRY_MAGIC_BYTE:
# The payload is framed with the Confluent Schema Registry wire format:
# skip the 5-byte header (magic byte + schema id) before decoding the Avro body.
# https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
return schemaless_reader(BytesIO(data[5:]), load_schema(schema_path), None)
return avro_to_py(schema_path, data[5:])

return schemaless_reader(BytesIO(data), load_schema(schema_path), None)
return avro_to_py(schema_path, data)


class ProtobufDeserializer(Deserializer):
Expand Down
14 changes: 10 additions & 4 deletions kaskade/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Callable, Any

from confluent_kafka import KafkaException
from fastavro import schemaless_writer
from fastavro import schemaless_writer, schemaless_reader
from fastavro.schema import load_schema
from textual.app import App

Expand Down Expand Up @@ -66,6 +66,12 @@ def load_properties(file_path: str, sep: str = "=", comment_char: str = "#") ->

def py_to_avro(schema_path: str, data: dict[str, Any] | MappingProxyType[str, Any]) -> bytes:
    """Serialize *data* to schemaless Avro binary.

    The Avro schema is loaded from ``schema_path`` with fastavro's
    ``load_schema`` on every call, and the record is written with
    ``schemaless_writer`` (no container-file header, just the encoded datum).

    Args:
        schema_path: Path to the Avro schema file handed to ``load_schema``.
        data: Mapping holding the record to encode.

    Returns:
        The encoded bytes accumulated in the in-memory buffer.
    """
    schema = load_schema(schema_path)
    # Encode into memory; the caller only needs the resulting bytes.
    buffer = BytesIO()
    schemaless_writer(buffer, schema, data)
    return buffer.getvalue()


def avro_to_py(schema_path: str, data: bytes) -> Any:
    """Decode one schemaless Avro datum from *data*.

    The schema is loaded from ``schema_path`` with fastavro's ``load_schema``
    on every call and used as the writer schema for ``schemaless_reader``.

    Args:
        schema_path: Path to the Avro schema file handed to ``load_schema``.
        data: Raw Avro-encoded bytes, without any framing header.

    Returns:
        Whatever ``schemaless_reader`` produces for the decoded datum.
    """
    # Load the schema before touching the payload, as the original did.
    writer_schema = load_schema(schema_path)
    payload = BytesIO(data)
    # The third positional argument (reader schema) is explicitly None.
    return schemaless_reader(payload, writer_schema, None)

0 comments on commit 12c29b9

Please sign in to comment.