Skip to content

Commit

Permalink
[EventGrid] Receive Functions (#13428)
Browse files Browse the repository at this point in the history
* initial changes

* comments

* str

* static

* from_json

* Update sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py
  • Loading branch information
Rakshith Bhyravabhotla authored Sep 2, 2020
1 parent 27d6421 commit d32cfe4
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 149 deletions.
4 changes: 2 additions & 2 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
from ._publisher_client import EventGridPublisherClient
from ._consumer import EventGridConsumer
from ._helpers import generate_shared_access_signature
from ._models import CloudEvent, CustomEvent, EventGridEvent, DeserializedEvent, StorageBlobCreatedEventData
from ._models import CloudEvent, CustomEvent, EventGridEvent, StorageBlobCreatedEventData
from ._shared_access_signature_credential import EventGridSharedAccessSignatureCredential
from ._version import VERSION

__all__ = ['EventGridPublisherClient', 'EventGridConsumer',
'CloudEvent', 'CustomEvent', 'DeserializedEvent', 'EventGridEvent', 'StorageBlobCreatedEventData',
'CloudEvent', 'CustomEvent', 'EventGridEvent', 'StorageBlobCreatedEventData',
'generate_shared_access_signature', 'EventGridSharedAccessSignatureCredential']
__version__ = VERSION
53 changes: 36 additions & 17 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,50 @@

_LOGGER = logging.getLogger(__name__)

from ._models import DeserializedEvent
from ._models import CloudEvent, EventGridEvent

class EventGridConsumer(object):
"""
A consumer responsible for deserializing event handler messages, to allow for access to strongly typed Event objects.
"""
def decode_cloud_event(self, cloud_event, **kwargs):
# type: (Union[str, dict, bytes], Any) -> CloudEvent
"""Single event following CloudEvent schema will be parsed and returned as Deserialized Event.
:param cloud_event: The event to be deserialized.
:type cloud_event: Union[str, dict, bytes]
:rtype: CloudEvent
def deserialize_event(self, event, **kwargs):
# type: (Union[str, dict, bytes]) -> models.DeserializedEvent
"""Single event following CloudEvent/EventGridEvent schema will be parsed and returned as DeserializedEvent.
:param event: The event to be deserialized.
:type event: Union[str, dict, bytes]
:keyword str encoding: The encoding that should be used. Defaults to 'utf-8'
:rtype: models.DeserializedEvent
:raises: :class:`ValueError`, when events do not follow CloudEvent or EventGridEvent schema.
:raise: :class:`ValueError`, when events do not follow CloudEvent schema.
"""
encode = kwargs.pop('encoding', 'utf-8')
try:
if isinstance(event, six.binary_type):
event = json.loads(event.decode(encode))
elif isinstance(event, six.string_types):
event = json.loads(event)
return DeserializedEvent(event)
cloud_event = CloudEvent._from_json(cloud_event, encode)
deserialized_event = CloudEvent.deserialize(cloud_event)
CloudEvent._deserialize_data(deserialized_event, deserialized_event.type)
return deserialized_event
except Exception as err:
_LOGGER.error('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent/EventGridEvent schema.')
_LOGGER.error('Your event: {}'.format(event))
_LOGGER.error('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent schema.')
_LOGGER.error('Your event: {}'.format(cloud_event))
_LOGGER.error(err)
raise ValueError('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent/EventGridEvent schema.')
raise ValueError('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent schema.')

def decode_eventgrid_event(self, eventgrid_event, **kwargs):
# type: (Union[str, dict, bytes], Any) -> EventGridEvent
"""Single event following EventGridEvent schema will be parsed and returned as Deserialized Event.
:param eventgrid_event: The event to be deserialized.
:type eventgrid_event: Union[str, dict, bytes]
:rtype: EventGridEvent
:raise: :class:`ValueError`, when events do not follow EventGridEvent schema.
"""
encode = kwargs.pop('encoding', 'utf-8')
try:
eventgrid_event = EventGridEvent._from_json(eventgrid_event, encode)
deserialized_event = EventGridEvent.deserialize(eventgrid_event)
EventGridEvent._deserialize_data(deserialized_event, deserialized_event.event_type)
return deserialized_event
except Exception as err:
_LOGGER.error('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent schema.')
_LOGGER.error('Your event: {}'.format(eventgrid_event))
_LOGGER.error(err)
raise ValueError('Error: cannot deserialize event. Event does not have a valid format. Event must be a string, dict, or bytes following the CloudEvent schema.')
1 change: 1 addition & 0 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._shared_access_signature_credential import EventGridSharedAccessSignatureCredential
from ._signature_credential_policy import EventGridSharedAccessSignatureCredentialPolicy
from . import _constants as constants
from ._event_mappings import _event_mappings

def generate_shared_access_signature(topic_hostname, shared_access_key, expiration_date_utc, **kwargs):
# type: (str, str, datetime.Datetime, Any) -> str
Expand Down
95 changes: 37 additions & 58 deletions sdk/eventgrid/azure-eventgrid/azure/eventgrid/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,49 @@
import datetime as dt
import uuid
import json
import six
from ._generated import models
from ._generated.models import StorageBlobCreatedEventData, \
EventGridEvent as InternalEventGridEvent, \
CloudEvent as InternalCloudEvent
from ._shared.mixins import DictMixin
from ._event_mappings import _event_mappings

class CloudEvent(InternalCloudEvent): #pylint:disable=too-many-instance-attributes

class EventMixin(object):
"""
Mixin for the event models comprising of some helper methods.
"""
@staticmethod
def _deserialize_data(event, event_type):
"""
Sets the data of the desrialized event to strongly typed event object if event type exists in _event_mappings.
Otherwise, sets it to None.
:param str event_type: The event_type of the EventGridEvent object or the type of the CloudEvent object.
"""
# if system event type defined, set event.data to system event object
try:
event.data = (_event_mappings[event_type]).deserialize(event.data)
except KeyError: # else, if custom event, then event.data is dict and should be set to None
event.data = None

@staticmethod
def _from_json(event, encode):
"""
Load the event into the json
:param dict eventgrid_event: The event to be deserialized.
:type eventgrid_event: Union[str, dict, bytes]
:param str encode: The encoding to be used. Defaults to 'utf-8'
"""
if isinstance(event, six.binary_type):
event = json.loads(event.decode(encode))
elif isinstance(event, six.string_types):
event = json.loads(event)
return event


class CloudEvent(InternalCloudEvent, EventMixin): #pylint:disable=too-many-instance-attributes
"""Properties of an event published to an Event Grid topic using the CloudEvent 1.0 Schema.
All required parameters must be populated in order to send to Azure.
Expand Down Expand Up @@ -71,7 +106,7 @@ def __init__(self, source, type, **kwargs):
super(CloudEvent, self).__init__(**kwargs)


class EventGridEvent(InternalEventGridEvent):
class EventGridEvent(InternalEventGridEvent, EventMixin):
"""Properties of an event published to an Event Grid topic using the EventGrid Schema.
Variables are only populated by the server, and will be ignored when sending a request.
Expand Down Expand Up @@ -130,62 +165,6 @@ def __init__(self, subject, event_type, **kwargs):
super(EventGridEvent, self).__init__(**kwargs)


class DeserializedEvent():
"""The container for the deserialized event model and mapping of event envelope properties.
:param dict event: dict
"""

def __init__(self, event):
# type: (Any) -> None
self._model = None
self._event_dict = event

def to_json(self):
# type: () -> dict
return self._event_dict

@property
def model(self):
# type: () -> Union[CloudEvent, EventGridEvent]
"""
Returns strongly typed EventGridEvent/CloudEvent object defined by the format of the properties.
All properties of the model are strongly typed (ie. for an EventGridEvent, event_time property will return a datetime.datetime object).
model.data: Returns a system event type(StorageBlobCreated, StorageBlobDeleted, etc.). If model.type/model.event_type is not defined in the
system registry, returns None.
:raise: :class:`ValueError`, when events do not follow CloudEvent or EventGridEvent schema.
:rtype: Union[CloudEvent, EventGridEvent]
"""
if not self._model:
try:
if 'specversion' in self._event_dict.keys():
self._model = CloudEvent.deserialize(self._event_dict)
event_type = self._model.type
else:
self._model = EventGridEvent.deserialize(self._event_dict)
event_type = self._model.event_type
except:
raise ValueError("Event is not correctly formatted CloudEvent or EventGridEvent.")

self._deserialize_data(event_type)

return self._model

def _deserialize_data(self, event_type):
"""
Sets self._model.data to strongly typed event object if event type exists in _event_mappings.
Otherwise, sets self._model.data to None.
:param str event_type: The event_type of the EventGridEvent object or the type of the CloudEvent object.
"""
# if system event type defined, set model.data to system event object
try:
self._model.data = (_event_mappings[event_type]).deserialize(self._model.data)
except KeyError: # else, if custom event, then model.data is dict and should be set to None
self._model.data = None

class CustomEvent(DictMixin):
"""The wrapper class for a CustomEvent, to be used when publishing events.
:param dict args: dict
Expand Down
117 changes: 45 additions & 72 deletions sdk/eventgrid/azure-eventgrid/tests/test_eg_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,111 +32,84 @@
class EventGridConsumerTests(AzureMgmtTestCase):

# Cloud Event tests
@pytest.mark.liveTest
def test_eg_consumer_cloud_storage_dict(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_storage_dict)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_storage_dict)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData

@pytest.mark.liveTest
def test_eg_consumer_cloud_storage_string(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_storage_string)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_storage_string)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData

@pytest.mark.liveTest
def test_eg_consumer_cloud_storage_bytes(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_storage_bytes)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_storage_bytes)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData

@pytest.mark.liveTest

def test_eg_consumer_cloud_custom_dict(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_custom_dict)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_custom_dict)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data is None


@pytest.mark.liveTest
def test_eg_consumer_cloud_custom_string(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_custom_string)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_custom_string)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data is None


@pytest.mark.liveTest
def test_eg_consumer_cloud_custom_bytes(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(cloud_custom_bytes)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == CloudEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_cloud_event(cloud_custom_bytes)
assert deserialized_event.__class__ == CloudEvent
assert deserialized_event.data is None

# EG Event tests
@pytest.mark.liveTest

def test_eg_consumer_eg_storage_dict(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_storage_dict)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_storage_dict)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData


@pytest.mark.liveTest
def test_eg_consumer_eg_storage_string(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_storage_string)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_storage_string)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData


@pytest.mark.liveTest
def test_eg_consumer_eg_storage_bytes(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_storage_bytes)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data.__class__ == StorageBlobCreatedEventData
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_storage_bytes)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data.__class__ == StorageBlobCreatedEventData

@pytest.mark.liveTest

def test_eg_consumer_eg_custom_dict(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_custom_dict)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_custom_dict)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data is None


@pytest.mark.liveTest
def test_eg_consumer_eg_custom_string(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_custom_string)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_custom_string)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data is None


@pytest.mark.liveTest
def test_eg_consumer_eg_custom_bytes(self, **kwargs):
client = EventGridConsumer()
deserialized_event = client.deserialize_event(eg_custom_bytes)
event_json = deserialized_event.to_json()
assert deserialized_event.model.__class__ == EventGridEvent
assert deserialized_event.model.data is None
assert event_json.__class__ == dict
deserialized_event = client.decode_eventgrid_event(eg_custom_bytes)
assert deserialized_event.__class__ == EventGridEvent
assert deserialized_event.data is None

0 comments on commit d32cfe4

Please sign in to comment.