Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SchemaRegistry] update positional args to req kwargs #20763

Merged
merged 8 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Features Added

- `auto_register_schemas` keyword argument has been added to `SchemaRegistryAvroSerializer`, which will allow for automatically registering schemas passed in to the `serialize`.
- `value` parameter in `serialize` on `SchemaRegistryAvroSerializer` takes type `Mapping` rather than `Dict`.

### Breaking Changes

Expand All @@ -13,6 +14,9 @@
- `data` parameter in the `serialize` and `deserialize` methods on `SchemaRegistryAvroSerializer` has been renamed `value`.
- `schema` parameter in the `serialize` method on `SchemaRegistryAvroSerializer` no longer accepts argument of type `bytes`.
- `SchemaRegistryAvroSerializer` constructor no longer takes in the `codec` keyword argument.
- The following positional arguments are now required keyword arguments:
- `client` and `group_name` in `SchemaRegistryAvroSerializer` constructor
- `schema` in `serialize` on `SchemaRegistryAvroSerializer`

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#
# --------------------------------------------------------------------------
from io import BytesIO
from typing import Any, Dict, Union
from typing import Any, Dict, Mapping
import avro

from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX
Expand All @@ -36,20 +36,23 @@ class SchemaRegistryAvroSerializer(object):
SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.

:param client: The schema registry client
:keyword client: Required. The schema registry client
which is used to register schema and retrieve schema from the service.
:type client: ~azure.schemaregistry.SchemaRegistryClient
:param str group_name: Schema group under which schema should be registered.
:paramtype client: ~azure.schemaregistry.SchemaRegistryClient
:keyword str group_name: Required. Schema group under which schema should be registered.
:keyword bool auto_register_schemas: When true, register new schemas passed to serialize.
Otherwise, and by default, fail if it has not been pre-registered in the registry.

"""

def __init__(self, client, group_name, **kwargs):
# type: ("SchemaRegistryClient", str, Any) -> None
self._schema_group = group_name
def __init__(self, **kwargs):
# type: (Any) -> None
try:
self._schema_group = kwargs.pop("group_name")
self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient"
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
self._schema_registry_client = client # type: "SchemaRegistryClient"
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
self._auto_register_schema_func = (
self._schema_registry_client.register_schema
Expand Down Expand Up @@ -119,20 +122,23 @@ def _get_schema(self, schema_id, **kwargs):
self._schema_to_id[schema_str] = schema_id
return schema_str

def serialize(self, value, schema, **kwargs):
# type: (Dict[str, Any], Union[str, bytes], Any) -> bytes
def serialize(self, value, **kwargs):
# type: (Mapping[str, Any], Any) -> bytes
"""
Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes
denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry
service. The remaining bytes are the real data payload.

:param value: The data to be encoded.
:type value: Dict[str, Any]
:param schema: The schema used to encode the data.
:type schema: str
:type value: Mapping[str, Any]
:keyword schema: Required. The schema used to encode the data.
:paramtype schema: str
:rtype: bytes
"""
raw_input_schema = schema
try:
raw_input_schema = kwargs.pop("schema")
except KeyError as e:
raise TypeError("'{}' is a required keyword.".format(e.args[0]))
try:
cached_schema = self._user_input_schema_cache[raw_input_schema]
except KeyError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
CLIENT_ID=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_ID']
CLIENT_SECRET=os.environ['SCHEMA_REGISTRY_AZURE_CLIENT_SECRET']

SCHEMA_REGISTRY_ENDPOINT=os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME=os.environ['SCHEMA_REGISTRY_GROUP']
SCHEMA_STRING = """
{"namespace": "example.avro",
Expand All @@ -59,9 +59,9 @@ def serialize(serializer):
dict_data_alice = {"name": u"Alice", "favorite_number": 15, "favorite_color": u"green"}

# Schema would be automatically registered into Schema Registry and cached locally.
payload_ben = serializer.serialize(dict_data_ben, SCHEMA_STRING)
payload_ben = serializer.serialize(dict_data_ben, schema=SCHEMA_STRING)
# The second call won't trigger a service call.
payload_alice = serializer.serialize(dict_data_alice, SCHEMA_STRING)
payload_alice = serializer.serialize(dict_data_alice, schema=SCHEMA_STRING)

print('Encoded bytes are: ', payload_ben)
print('Encoded bytes are: ', payload_alice)
Expand All @@ -79,8 +79,8 @@ def deserialize(serializer, bytes_payload):


if __name__ == '__main__':
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_ENDPOINT, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(schema_registry, GROUP_NAME)
schema_registry = SchemaRegistryClient(endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential)
serializer = SchemaRegistryAvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True)
bytes_data_ben, bytes_data_alice = serialize(serializer)
dict_data_ben = deserialize(serializer, bytes_data_ben)
dict_data_alice = deserialize(serializer, bytes_data_alice)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']


Expand All @@ -45,12 +45,14 @@ def on_event(partition_context, event):


# create a SchemaRegistryAvroSerializer instance
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, this comment is nice.

(small suggestion that we could put it as a tasks of the issue #20628 so that we can't forget it)

Copy link
Member Author

@swathipil swathipil Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea! I can add it to the issue as well as leaving this comment for a double reminder. (Even though the tests will definitely fail anyway even if I didn't have this reminder.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good!

avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME
group_name=GROUP_NAME,
auto_register_schemas=True
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE']
GROUP_NAME = os.environ['SCHEMA_REGISTRY_GROUP']

SCHEMA_STRING = """
Expand Down Expand Up @@ -59,12 +59,14 @@ def send_event_data_batch(producer, serializer):


# create a SchemaRegistryAvroSerializer instance
# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace'
avro_serializer = SchemaRegistryAvroSerializer(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
endpoint=SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE,
credential=DefaultAzureCredential()
),
group_name=GROUP_NAME
group_name=GROUP_NAME,
auto_register_schemas=True
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
headers:
content-type:
- application/json
date:
- Wed, 08 Sep 2021 22:17:05 GMT
- Fri, 24 Sep 2021 19:54:45 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 041afcdb34a546faa3aa26a991567e32
- fc61e4d3e31b46f6a758fa1b67f35cc5
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
string: '{"id":"fc61e4d3e31b46f6a758fa1b67f35cc5"}'
headers:
content-type:
- application/json
date:
- Wed, 08 Sep 2021 22:17:06 GMT
- Fri, 24 Sep 2021 19:54:47 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 041afcdb34a546faa3aa26a991567e32
- fc61e4d3e31b46f6a758fa1b67f35cc5
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/fc61e4d3e31b46f6a758fa1b67f35cc5?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
)


SCHEMA_REGISTRY_ENDPOINT_PARAM = "schemaregistry_endpoint"
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM = "schemaregistry_fully_qualified_namespace"
SCHEMA_REGISTRY_GROUP_PARAM = "schemaregistry_group"
SCHEMA_REGISTRY_ENDPOINT_ENV_KEY_NAME = 'SCHEMA_REGISTRY_ENDPOINT'
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_ENV_KEY_NAME = 'SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE'
SCHEMA_REGISTRY_GROUP_ENV_KEY_NAME = 'SCHEMA_REGISTRY_GROUP'


Expand All @@ -59,13 +59,13 @@ def create_resource(self, name, **kwargs):
# TODO: right now the endpoint/group is fixed, as there is no way to create/delete resources using api, in the future we should be able to dynamically create and remove resources
if self.is_live:
return {
SCHEMA_REGISTRY_ENDPOINT_PARAM: os.environ[SCHEMA_REGISTRY_ENDPOINT_ENV_KEY_NAME],
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM: os.environ[SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_ENV_KEY_NAME],
SCHEMA_REGISTRY_GROUP_PARAM: os.environ[SCHEMA_REGISTRY_GROUP_ENV_KEY_NAME]
}

else:
return {
SCHEMA_REGISTRY_ENDPOINT_PARAM: "sr-playground.servicebus.windows.net",
SCHEMA_REGISTRY_FULLY_QUALIFIED_NAMESPACE_PARAM: "sr-playground.servicebus.windows.net",
SCHEMA_REGISTRY_GROUP_PARAM: "azsdk_python_test_group"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from devtools_testutils import AzureTestCase, PowerShellPreparer

SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_endpoint="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")
SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")

class SchemaRegistryAvroSerializerTests(AzureTestCase):

Expand Down Expand Up @@ -75,15 +75,16 @@ def test_raw_avro_serializer_negative(self):
raw_avro_object_serializer.serialize(dict_data_missing_required_field, schema)

@SchemaRegistryPowerShellPreparer()
def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group, auto_register_schemas=True)
def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs):
# TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace'
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_fully_qualified_namespace)
sr_avro_serializer = SchemaRegistryAvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True)

schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema = avro.schema.parse(schema_str)

dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema_str)
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id
Expand All @@ -102,15 +103,16 @@ def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistr
sr_avro_serializer.close()

@SchemaRegistryPowerShellPreparer()
def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group)
def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs):
# TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace'
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_fully_qualified_namespace)
sr_avro_serializer = SchemaRegistryAvroSerializer(client=sr_client, group_name=schemaregistry_group)

schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""
schema = avro.schema.parse(schema_str)

dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"}
encoded_data = sr_avro_serializer.serialize(dict_data, schema_str)
encoded_data = sr_avro_serializer.serialize(dict_data, schema=schema_str)

assert schema_str in sr_avro_serializer._user_input_schema_cache
assert str(avro.schema.parse(schema_str)) in sr_avro_serializer._schema_to_id
Expand Down
2 changes: 2 additions & 0 deletions sdk/schemaregistry/azure-schemaregistry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
- `schema_definition`, which has been renamed from `schema_content`
- `format`, which has been renamed from `serialization_type`
- `endpoint` parameter in `SchemaRegistryClient` constructor has been renamed `fully_qualified_namespace`
- `location` instance variable in `SchemaProperties` has been removed.
- `Schema` and `SchemaProperties` no longer have positional parameters, as they will not be constructed by the user.

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from typing import Any, Optional
from typing import Any


class SchemaProperties(object):
Expand All @@ -50,13 +50,12 @@ class SchemaProperties(object):

def __init__(
self,
id=None, # pylint:disable=redefined-builtin
**kwargs
):
# type: (Optional[str], Any) -> None
self.id = id
# type: (Any) -> None
self.id = kwargs.pop('id')
self.format = kwargs.get('format')
swathipil marked this conversation as resolved.
Show resolved Hide resolved
self.version = kwargs.get('version')
self.version = kwargs.pop('version')


class Schema(object):
Expand All @@ -81,9 +80,8 @@ class Schema(object):

def __init__(
self,
schema_definition,
properties,
**kwargs
):
# type: (str, SchemaProperties) -> None
self.schema_definition = schema_definition
self.properties = properties
# type: (Any) -> None
self.schema_definition = kwargs.pop("schema_definition")
self.properties = kwargs.pop("properties")
Loading