Skip to content

Commit

Permalink
[EventHubs] Updated blob checkpointing API (#8721)
Browse files Browse the repository at this point in the history
* Started storage updates

* Updated async samples + tests

* Updated sync samples + tests

* Fix merge

* Reverted module name for now

* Updated baseclass

* Fix tests

* Fix pylint

* Update checkpoint

* Update checkpoint

* Some CI fixes

* Fixed changelog

* Fixed test

* Claim ownership fix

* Reverted version bump

* Feedback
  • Loading branch information
annatisch authored Nov 22, 2019
1 parent c8f46d4 commit a804add
Show file tree
Hide file tree
Showing 28 changed files with 612 additions and 473 deletions.
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Release History

## 2019-12-04 1.0.0b6

**Breaking changes**

- Renamed `BlobPartitionManager` to `BlobCheckpointStore`.
- Constructor of `BlobCheckpointStore` has been updated to take the storage container details directly rather than an instance of `ContainerClient`.
- A `from_connection_string` constructor has been added for Blob Storage connection strings.
- Module `blobstoragepmaio` is now internal, all imports should be directly from `azure.eventhub.extensions.checkpointstoreblobaio`.
- `BlobCheckpointStore` now has a `close()` function for shutting down an HTTP connection pool, additionally the object can be used in a context manager to manage the connection.

## 2019-11-04 1.0.0b5

**New features**
Expand Down
42 changes: 18 additions & 24 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This Checkpoint Store package works as a plug-in package to `EventHubConsumerCli

Please note that this is an async library, for sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b6/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

Expand All @@ -24,6 +24,7 @@ $ pip install --pre azure-eventhub-checkpointstoreblob-aio

- **Azure Storage Account:** You'll need to have an Azure Storage Account and create a Azure Blob Storage Block Container to store the checkpoint data with blobs. You may follow the guide [creating an Azure Block Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-create-account-block-blob).


## Key concepts

### Checkpointing
Expand All @@ -48,17 +49,8 @@ storing their own offset values outside of the Event Hubs service. Within a part
sequence number and the timestamp of when it was enqueued.

## Examples
- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient)
- [Create an Azure EventHubs `EventHubConsumerClient`](#create-an-eventhubconsumerclient)
- [Consume events using a `BlobPartitionManager`](#consume-events-using-a-blobpartitionmanager-to-do-checkpoint)

### Create an Azure Storage Blobs `ContainerClient`
The easiest way to create a `ContainerClient` is to use a connection string.
```python
from azure.storage.blob.aio import ContainerClient
container_client = ContainerClient.from_connection_string("my_storageacount_connection_string", "mycontainer")
```
For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob) for more details.
- [Consume events using a `BlobCheckpointStore`](#consume-events-using-a-blobcheckpointstore-to-do-checkpoint)

### Create an `EventHubConsumerClient`
The easiest way to create a `EventHubConsumerClient` is to use a connection string.
Expand All @@ -68,33 +60,35 @@ eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_nam
```
For other ways of creating a `EventHubConsumerClient`, refer to [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.

### Consume events using a `BlobPartitionManager` to do checkpoint
### Consume events using a `BlobCheckpointStore` to do checkpoint
```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.storage.blob.aio import ContainerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME>>'

async def do_operation(events):
async def process_event(partition_context, event):
# do some operations to the events.
pass

async def process_events(partition_context, events):
await do_operation(events)
await partition_context.update_checkpoint(events[-1])
await partition_context.update_checkpoint(event)

async def main():
storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name)
partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save
client = EventHubConsumerClient.from_connection_string(eventhub_connection_str, partition_manager=partition_manager, receive_timeout=5, retry_total=3)
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_container_connection_str,
storage_container_name
)
client = EventHubConsumerClient.from_connection_string(
eventhub_connection_str,
checkpoint_store=checkpoint_store,
retry_total=3
)

try:
await client.receive(process_events, "$default")
await client.receive(process_event, "$default")
except KeyboardInterrupt:
await client.close()

Expand All @@ -116,7 +110,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries.

### Documentation

Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager)
Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b6/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore)

### Logging

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ._version import VERSION
__version__ = VERSION

from .blobstoragecsaio import BlobCheckpointStore
from ._blobstoragecsaio import BlobCheckpointStore

__all__ = [
"BlobCheckpointStore",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any, Optional
import logging
from collections import defaultdict
import asyncio
from azure.eventhub import OwnershipLostError # type: ignore #pylint:disable=no-name-in-module
from azure.eventhub.aio import CheckpointStore # type: ignore
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore
from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""


class BlobCheckpointStore(CheckpointStore):
"""A CheckpointStore that uses Azure Blob Storage to store the partition ownership and checkpoint data.
This class implements methods list_ownership, claim_ownership, update_checkpoint and list_checkpoints that are
defined in class azure.eventhub.aio.CheckpointStore of package azure-eventhub.
:param str blob_account_url:
The URI to the storage account.
:param container_name:
The name of the container for the blobs.
:type container_name: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token. The value can be a SAS token string, an account
shared access key, or an instance of a TokenCredentials class from azure.identity.
If the URL already has a SAS token, specifying an explicit credential will take priority.
"""
def __init__(self, blob_account_url, container_name, *, credential=None, **kwargs):
# type(str, str, Optional[Any], Any) -> None
container_client = kwargs.pop('container_client', None)
self._container_client = container_client or ContainerClient(
blob_account_url, container_name, credential=credential, **kwargs
)
self._cached_blob_clients = defaultdict() # type: Dict[str, BlobClient]

@classmethod
def from_connection_string(cls, conn_str, container_name, *, credential=None, **kwargs):
# type: (str, str, Optional[Any], str) -> BlobCheckpointStore
"""Create BlobCheckpointStore from a storage connection string.
:param str conn_str:
A connection string to an Azure Storage account.
:param container_name:
The container name for the blobs.
:type container_name: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token, or the connection string already has shared
access key values. The value can be a SAS token string, an account shared access
key, or an instance of a TokenCredentials class from azure.identity.
Credentials provided here will take precedence over those in the connection string.
"""
container_client = ContainerClient.from_connection_string(
conn_str,
container_name,
credential=credential,
**kwargs
)
return cls(None, None, container_client=container_client)

async def __aenter__(self):
await self._container_client.__aenter__()
return self

async def __aexit__(self, *args):
await self._container_client.__aexit__(*args)

def _get_blob_client(self, blob_name: str) -> BlobClient:
result = self._cached_blob_clients.get(blob_name)
if not result:
result = self._container_client.get_blob_client(blob_name)
self._cached_blob_clients[blob_name] = result
return result

async def _upload_ownership(self, ownership: Dict[str, Any], metadata: Dict[str, str]) -> None:
etag = ownership.get("etag")
if etag:
etag_match = {"if_match": etag}
else:
etag_match = {"if_none_match": '*'}
blob_name = "{}/{}/{}/ownership/{}".format(
ownership["fully_qualified_namespace"],
ownership["eventhub_name"],
ownership["consumer_group"],
ownership["partition_id"])
blob_name = blob_name.lower()
uploaded_blob_properties = await self._get_blob_client(blob_name).upload_blob(
data=UPLOAD_DATA,
overwrite=True,
metadata=metadata,
**etag_match
)
ownership["etag"] = uploaded_blob_properties["etag"]
ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp()
ownership.update(metadata)

async def list_ownership(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str) \
-> Iterable[Dict[str, Any]]:
try:
blob_prefix = "{}/{}/{}/ownership".format(
fully_qualified_namespace,
eventhub_name,
consumer_group)
blobs = self._container_client.list_blobs(
name_starts_with=blob_prefix.lower(),
include=['metadata'])
result = []
async for blob in blobs:
ownership = {
"fully_qualified_namespace": fully_qualified_namespace,
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": blob.name.split("/")[-1],
"owner_id": blob.metadata["ownerid"],
"etag": blob.etag,
"last_modified_time": blob.last_modified.timestamp() if blob.last_modified else None
}
result.append(ownership)
return result
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred during list_ownership for "
"namespace %r eventhub %r consumer group %r. "
"Exception is %r", fully_qualified_namespace, eventhub_name, consumer_group, error
)
raise

async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]:
partition_id = ownership["partition_id"]
namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
try:
await self._upload_ownership(ownership, metadata)
return ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id, namespace, eventhub_name, consumer_group, partition_id
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred when EventProcessor instance %r claim_ownership for "
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r", owner_id, namespace, eventhub_name, consumer_group, partition_id, error
)
return ownership # Keep the ownership if an unexpected error happens

async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
results = await asyncio.gather(
*[self._claim_one_partition(x) for x in ownership_list],
return_exceptions=True
)
return [ownership for ownership in results if not isinstance(ownership, Exception)]

async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
metadata = {
"offset": checkpoint['offset'],
"sequencenumber": str(checkpoint['sequence_number']),
}
blob_name = "{}/{}/{}/checkpoint/{}".format(
checkpoint['fully_qualified_namespace'],
checkpoint['eventhub_name'],
checkpoint['consumer_group'],
checkpoint['partition_id'])
blob_name = blob_name.lower()
await self._get_blob_client(blob_name).upload_blob(
data=UPLOAD_DATA,
overwrite=True,
metadata=metadata
)

async def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group):
blob_prefix = "{}/{}/{}/checkpoint".format(
fully_qualified_namespace,
eventhub_name,
consumer_group)
blobs = self._container_client.list_blobs(
name_starts_with=blob_prefix.lower(),
include=['metadata'])
result = []
async for blob in blobs:
metadata = blob.metadata
checkpoint = {
"fully_qualified_namespace": fully_qualified_namespace,
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": blob.name.split("/")[-1],
"offset": metadata["offset"],
"sequence_number": metadata["sequencenumber"]
}
result.append(checkpoint)
return result

async def close(self) -> None:
"""Close an open HTTP session and connection."""
return await self.__aexit__()
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "1.0.0b5"
VERSION = "1.0.0b6"
Loading

0 comments on commit a804add

Please sign in to comment.