Skip to content

Commit

Permalink
[SchemaRegistry] Samples for EH integration (#13884)
Browse files Browse the repository at this point in the history
* add sample for EH integration

* add samples to readme and tweak the code

* add descriptions

* mention SR and serializer in EH

* small tweak
  • Loading branch information
yunhaoling authored Sep 21, 2020
1 parent ad2142b commit 243212a
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 2 deletions.
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ Please take a look at the [samples](https://github.com/Azure/azure-sdk-for-pytho

Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html).

### Schema Registry and Avro Serializer

The EventHubs SDK integrates nicely with the [Schema Registry][schemaregistry_service] service and [Avro][avro].
For more information, please refer to [Schema Registry SDK][schemaregistry_repo] and [Schema Registry Avro Serializer SDK][schemaregistry_avroserializer_repo].

### Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the [Issues](https://github.com/Azure/azure-sdk-for-python/issues) section of the project.
Expand All @@ -429,4 +434,9 @@ PR appropriately (e.g., label, comment). Simply follow the instructions provided
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

[avro]: http://avro.apache.org/
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[schemaregistry_avroserializer_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhub/README.png)
89 changes: 88 additions & 1 deletion sdk/schemaregistry/azure-schemaregistry-avroserializer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ The following sections provide several code snippets covering some of the most c

- [Serialization](#serialization)
- [Deserialization](#deserialization)
- [Event Hubs Sending Integration](#event-hubs-sending-integration)
- [Event Hubs Receiving Integration](#event-hubs-receiving-integration)

### Serialization

Use `SchemaRegistryAvroSerializer.serialize` method to serialize dict data with the given avro schema.
The method would automatically register the schema to the Schema Registry Service and keep the schema cached for future serialization usage.

```python
import os
from azure.schemaregistry import SchemaRegistryClient
Expand Down Expand Up @@ -110,6 +115,9 @@ with serializer:

### Deserialization

Use `SchemaRegistryAvroSerializer.deserialize` method to deserialize raw bytes into dict data.
The method would automatically retrieve the schema from the Schema Registry Service and keep the schema cached for future deserialization usage.

```python
import os
from azure.schemaregistry import SchemaRegistryClient
Expand All @@ -128,6 +136,84 @@ with serializer:
decoded_data = serializer.deserialize(encoded_bytes)
```

### Event Hubs Sending Integration

Integration with [Event Hubs][eventhubs_repo] to send serialized avro dict data as the body of EventData.

```python
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_string = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""

schema_registry_client = SchemaRegistryClient(endpoint, token_credential)
avro_serializer = SchemaRegistryAvroSerializer(schema_registry_client, schema_group)

eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=eventhub_connection_str,
eventhub_name=eventhub_name
)

with eventhub_producer, avro_serializer:
event_data_batch = eventhub_producer.create_batch()
dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
payload_bytes = avro_serializer.serialize(data=dict_data, schema=schema_string)
event_data_batch.add(EventData(body=payload_bytes))
eventhub_producer.send_batch(event_data_batch)
```

### Event Hubs Receiving Integration

Integration with [Event Hubs][eventhubs_repo] to receive `EventData` and deserialized raw bytes into avro dict data.

```python
import os
from azure.eventhub import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer
from azure.identity import DefaultAzureCredential

token_credential = DefaultAzureCredential()
endpoint = os.environ['SCHEMA_REGISTRY_ENDPOINT']
schema_group = "<your-group-name>"
eventhub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']

schema_registry_client = SchemaRegistryClient(endpoint, token_credential)
avro_serializer = SchemaRegistryAvroSerializer(schema_registry_client, schema_group)

eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=eventhub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
bytes_payload = b"".join(b for b in event.body)
deserialized_data = avro_serializer.deserialize(bytes_payload)

with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(on_event=on_event, starting_position="-1")
```

## Troubleshooting

### General
Expand Down Expand Up @@ -201,4 +287,5 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
[source_code]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer
[change_log]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md
[schemaregistry_client]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[eventhubs_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Several Schema Registry Avro Serializer Python SDK samples are available to you
* [avro_serializer.py][avro_serializer_sample] - Examples for common Schema Registry Avro Serializer tasks:
* Serialize data according to the given schema
* Deserialize data
* [eventhub_send_integration.py][eventhub_send_integration_sample] - Examples for integration with EventHub in sending tasks:
* Serialize data with the given schema and send `EventData` to Event Hubs.
* [eventhub_receive_integration.py][eventhub_receive_integration_sample] - Examples for integration with EventHub in receiving tasks:
* Receive `EventData` from Event Hubs and deserialize the received bytes.

## Prerequisites
- Python 2.7, 3.5 or later.
Expand Down Expand Up @@ -47,4 +51,6 @@ what you can do with the Azure Schema Registry Avro Serializer library.

<!-- LINKS -->
[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py
[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py
[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py
[api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avroserializer/latest/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show receiving events from EventHub with SchemaRegistryAvroSerializer integrated for data deserialization.
"""

# pylint: disable=C0111
import os
from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']


def on_event(partition_context, event):
print("Received event from partition: {}.".format(partition_context.partition_id))

bytes_payload = b"".join(b for b in event.body)
print('The received bytes of the EventData is {}.'.format(bytes_payload))

# Use the deserialize method to convert bytes to dict object.
# The deserialize method would extract the schema id from the payload, and automatically retrieve the Avro Schema
# from the Schema Registry Service. The schema would be cached locally for future usage.
deserialized_data = avro_serializer.deserialize(bytes_payload)
print('The dict data after deserialization is {}'.format(deserialized_data))


# create an EventHubConsumerClient instance
eventhub_consumer = EventHubConsumerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STR,
consumer_group='$Default',
eventhub_name=EVENTHUB_NAME,
)


# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
)


try:
with eventhub_consumer, avro_serializer:
eventhub_consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
except KeyboardInterrupt:
print('Stopped receiving.')
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

"""
Examples to show sending event to EventHub with SchemaRegistryAvroSerializer integrated for data serialization.
"""

# pylint: disable=C0111

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.serializer.avroserializer import SchemaRegistryAvroSerializer

EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

SCHEMA_REGISTRY_ENDPOINT = os.environ['SCHEMA_REGISTRY_ENDPOINT']
SCHEMA_GROUP = os.environ['SCHEMA_REGISTRY_GROUP']

SCHEMA_STRING = """
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}"""


def send_event_data_batch(producer, serializer):
event_data_batch = producer.create_batch()

dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
# Use the serialize method to convert dict object to bytes with the given avro schema.
# The serialize method would automatically register the schema into the Schema Registry Service and
# schema would be cached locally for future usage.
payload_bytes = serializer.serialize(data=dict_data, schema=SCHEMA_STRING)
print('The bytes of serialized dict data is {}.'.format(payload_bytes))

event_data = EventData(body=payload_bytes) # pass the bytes data to the body of an EventData
event_data_batch.add(event_data)
producer.send_batch(event_data_batch)
print('Send is done.')


# create an EventHubProducerClient instance
eventhub_producer = EventHubProducerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)


# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
)


with eventhub_producer, avro_serializer:
send_event_data_batch(eventhub_producer, avro_serializer)
18 changes: 17 additions & 1 deletion sdk/schemaregistry/azure-schemaregistry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ The following sections provide several code snippets covering some of the most c

### Register a schema

Use `SchemaRegistryClient.register_schema` method to register a schema.

```python
import os

Expand Down Expand Up @@ -83,6 +85,8 @@ with schema_registry_client:

### Get the schema by id

Get the schema content and its properties by schema id.

```python
import os

Expand All @@ -101,6 +105,8 @@ with schema_registry_client:

### Get the id of a schema

Get the schema id of a schema by schema content and its properties.

```python
import os

Expand Down Expand Up @@ -175,6 +181,13 @@ schema_registry_client.get_schema(schema_id, logging_enable=True)

Please take a look at the [samples][sr_samples] directory for detailed examples of how to use this library to register and retrieve schema to/from Schema Registry.

### Event Hubs and Avro Serializer

We provide [azure-schemaregistry-avroserializer][schemaregistry_avroserializer_pypi] library as serializer
implementation to serialize/deserialize avro data integrated with `azure-schemaregistry` for automatic schema registration and retrieval.
It integrates nicely with the [EventHubs SDK][eventhubs_repo].
For more information and sample codes, please refer to the [Azure Schema Registry Avro Serializer SDK][schemaregistry_avroserializer_repo].

## Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a
Expand All @@ -200,4 +213,7 @@ contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additio
[api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry/latest/index.html
[source_code]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry
[change_log]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry/CHANGELOG.md
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_service]: https://aka.ms/schemaregistry
[schemaregistry_avroserializer_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/schemaregistry/azure-schemaregistry-avroserializer
[schemaregistry_avroserializer_pypi]: https://pypi.org/project/azure-schemaregistry-avroserializer/
[eventhubs_repo]: https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub

0 comments on commit 243212a

Please sign in to comment.