Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SchemaRegistry] Samples for EH integration #13884

Merged
merged 5 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ Please take a look at the [samples](https://github.com/Azure/azure-sdk-for-pytho

Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html).

### Schema Registry and Avro Serializer

The EventHubs SDK integrates nicely with the [Schema Registry][schemaregistry_service] service and [Avro][avro].
For more information, please refer to [Schema Registry SDK][schemaregistry_repo] and [Schema Registry Avro Serializer SDK][schemaregistry_avroserializer_repo].

### Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project.
Expand All @@ -429,4 +434,9 @@ PR appropriately (e.g., label, comment). Simply follow the instructions provided
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

[avro]: http://avro.apache.org/
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[schemaregistry_avroserializer_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhub/README.png)
89 changes: 88 additions & 1 deletion sdk/schemaregistry/azure-schemaregistry-avroserializer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ The following sections provide several code snippets covering some of the most c

- [Serialization](#serialization)
- [Deserialization](#deserialization)
- [Event Hubs Sending Integration](#event-hubs-sending-integration)
- [Event Hubs Receiving Integration](#event-hubs-receiving-integration)

### Serialization

Use `SchemaRegistryAvroSerializer.serialize` method to serialize dict data with the given avro schema.
The method would automatically register the schema to the Schema Registry Service and keep the schema cached for future serialization usage.

```python
import os
from azure.schemaregistry import SchemaRegistryClient
Expand Down Expand Up @@ -110,6 +115,9 @@ with serializer:

### Deserialization

Use `SchemaRegistryAvroSerializer.deserialize` method to deserialize raw bytes into dict data.
The method would automatically retrieve the schema from the Schema Registry Service and keep the schema cached for future deserialization usage.

```python
import os
from azure.schemaregistry import SchemaRegistryClient
Expand All @@ -128,6 +136,84 @@ with serializer:
decoded_data = serializer.deserialize(encoded_bytes)
```

### Event Hubs Sending Integration

Integration with [Event Hubs][eventhubs_repo] to send serialized avro dict data as the body of EventData.

```python
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_string = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""

schema_registry_client = SchemaRegistryClient(endpoint, token_credential)
avro_serializer = SchemaRegistryAvroSerializer(schema_registry_client, schema_group)

eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(data=dict_data, schema=schema_string)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
```

### Event Hubs Receiving Integration

Integration with [Event Hubs][eventhubs_repo] to receive `EventData` and deserialized raw bytes into avro dict data.

```python
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(endpoint, token_credential)
avro_serializer = SchemaRegistryAvroSerializer(schema_registry_client, schema_group)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
```

## Troubleshooting

### General
Expand Down Expand Up @@ -201,4 +287,5 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
[source_code]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer
[change_log]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md
[schemaregistry_client]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[eventhubs_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Several Schema Registry Avro Serializer Python SDK samples are available to you
* [avro_serializer.py][avro_serializer_sample] - Examples for common Schema Registry Avro Serializer tasks:
* Serialize data according to the given schema
* Deserialize data
* [eventhub_send_integration.py][eventhub_send_integration_sample] - Examples for integration with EventHub in sending tasks:
* Serialize data with the given schema and send `EventData` to Event Hubs.
* [eventhub_receive_integration.py][eventhub_receive_integration_sample] - Examples for integration with EventHub in receiving tasks:
* Receive `EventData` from Event Hubs and deserialize the received bytes.

## Prerequisites
- Python 2.7, 3.5 or later.
Expand Down Expand Up @@ -47,4 +51,6 @@ what you can do with the Azure Schema Registry Avro Serializer library.

<!-- LINKS -->
[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py
[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py
[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py
[api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avroserializer/latest/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show receiving events from EventHub with SchemaRegistryAvroSerializer integrated for data deserialization.
"""

# pylint: disable=C0111
import os
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']


def on_event(partition_context, event):
print("Received event from partition: {}.".format(partition_context.partition_id))

bytes_payload = b"".join(b for b in event.body)
print('The received bytes of the EventData is {}.'.format(bytes_payload))

# Use the deserialize method to convert bytes to dict object.
# The deserialize method would extract the schema id from the payload, and automatically retrieve the Avro Schema
# from the Schema Registry Service. The schema would be cached locally for future usage.
deserialized_data = avro_serializer.deserialize(bytes_payload)
print('The dict data after deserialization is {}'.format(deserialized_data))


# create an EventHubConsumerClient instance
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)


# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
)


try:
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
print('Stopped receiving.')
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show sending event to EventHub with SchemaRegistryAvroSerializer integrated for data serialization.
"""

# pylint: disable=C0111

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']

SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""


def send_event_data_batch(producer, serializer):
event_data_batch = producer.create_batch()

dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
# Use the serialize method to convert dict object to bytes with the given avro schema.
# The serialize method would automatically register the schema into the Schema Registry Service and
# schema would be cached locally for future usage.
payload_bytes = serializer.serialize(data=dict_data, schema=SCHEMA_STRING)
print('The bytes of serialized dict data is {}.'.format(payload_bytes))

event_data = EventData(body=payload_bytes) # pass the bytes data to the body of an EventData
event_data_batch.add(event_data)
producer.send_batch(event_data_batch)
print('Send is done.')


# create an EventHubProducerClient instance
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)


# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
)


with eventhub_producer, avro_serializer:
send_event_data_batch(eventhub_producer, avro_serializer)
18 changes: 17 additions & 1 deletion sdk/schemaregistry/azure-schemaregistry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ The following sections provide several code snippets covering some of the most c

### Register a schema

Use `SchemaRegistryClient.register_schema` method to register a schema.

```python
import os

Expand Down Expand Up @@ -83,6 +85,8 @@ with schema_registry_client:

### Get the schema by id

Get the schema content and its properties by schema id.

```python
import os

Expand All @@ -101,6 +105,8 @@ with schema_registry_client:

### Get the id of a schema

Get the schema id of a schema by schema content and its properties.

```python
import os

Expand Down Expand Up @@ -175,6 +181,13 @@ schema_registry_client.get_schema(schema_id, logging_enable=True)

Please take a look at the [samples][sr_samples] directory for detailed examples of how to use this library to register and retrieve schema to/from Schema Registry.

### Event Hubs and Avro Serializer

We provide [azure-schemaregistry-avroserializer][schemaregistry_avroserializer_pypi] library as serializer
implementation to serialize/deserialize avro data integrated with `azure-schemaregistry` for automatic schema registration and retrieval.
It integrates nicely with the [EventHubs SDK][eventhubs_repo].
For more information and sample codes, please refer to the [Azure Schema Registry Avro Serializer SDK][schemaregistry_avroserializer_repo].

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Expand All @@ -200,4 +213,7 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
[api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry/latest/index.html
[source_code]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[change_log]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry/CHANGELOG.md
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_avroserializer_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer
[schemaregistry_avroserializer_pypi]: https://pypi.org/project/azure-schemaregistry-avroserializer/
[eventhubs_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub