[EventHubs] Updated blob checkpointing API #8721

Merged · 18 commits · Nov 22, 2019
10 changes: 10 additions & 0 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/HISTORY.md
@@ -1,5 +1,15 @@
# Release History

## 2019-12-04 1.0.0b6

**Breaking changes**

- Renamed `BlobPartitionManager` to `BlobCheckpointStore`.
- The constructor of `BlobCheckpointStore` has been updated to take the storage container details directly rather than an instance of `ContainerClient`.
- A `from_connection_string` constructor has been added for Blob Storage connection strings.
- Module `blobstoragepmaio` is now internal; all imports should be directly from `azure.eventhub.extensions.checkpointstoreblobaio`.
- `BlobCheckpointStore` now has a `close()` function for shutting down an HTTP connection pool. Additionally, the object can be used as an async context manager to manage the connection (a usage sketch follows below).
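
A minimal usage sketch of the renamed API described above, assuming placeholder connection strings and container name (the placeholder values are not from this changelog):

```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def on_event(partition_context, event):
    # Persist progress for this partition to the blob container.
    await partition_context.update_checkpoint(event)

async def main():
    # "<storage conn str>" and "<container>" are placeholders, not values from this PR.
    checkpoint_store = BlobCheckpointStore.from_connection_string("<storage conn str>", "<container>")
    client = EventHubConsumerClient.from_connection_string(
        "<event hubs conn str>", checkpoint_store=checkpoint_store
    )
    try:
        await client.receive(on_event, "$default")
    finally:
        await client.close()
        await checkpoint_store.close()  # new in this release: shuts down the HTTP connection pool

asyncio.get_event_loop().run_until_complete(main())
```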

## 2019-11-04 1.0.0b5

**New features**
42 changes: 18 additions & 24 deletions sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/README.md
@@ -5,7 +5,7 @@ This Checkpoint Store package works as a plug-in package to `EventHubConsumerCli

Please note that this is an async library; for the sync version of the Azure EventHubs Checkpoint Store client library, please refer to [azure-eventhubs-checkpointstoreblob](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob).

[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)
[Source code](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio) | [Package (PyPi)](https://pypi.org/project/azure-eventhub-checkpointstoreblob-aio/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b6/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore) | [Azure Eventhubs documentation](https://docs.microsoft.com/en-us/azure/event-hubs/) | [Azure Storage documentation](https://docs.microsoft.com/en-us/azure/storage/)

## Getting started

@@ -24,6 +24,7 @@ $ pip install --pre azure-eventhub-checkpointstoreblob-aio

- **Azure Storage Account:** You'll need to have an Azure Storage Account and create an Azure Blob Storage block container to store the checkpoint data in blobs. You may follow the guide [creating an Azure Block Blob Storage Account](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-create-account-block-blob).


## Key concepts

### Checkpointing
@@ -48,17 +49,8 @@ storing their own offset values outside of the Event Hubs service. Within a part
sequence number and the timestamp of when it was enqueued.
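
For concreteness, the implementation added later in this diff persists each checkpoint as a zero-length blob whose name encodes the namespace, event hub, consumer group, and partition, with the offset and sequence number kept in blob metadata. A sketch of that layout (the values are illustrative placeholders):

```python
# Blob name written by BlobCheckpointStore.update_checkpoint (lower-cased):
#   "<fully_qualified_namespace>/<eventhub_name>/<consumer_group>/checkpoint/<partition_id>"
# Metadata attached to that otherwise empty blob:
checkpoint_metadata = {
    "offset": "12345",        # placeholder offset, stored as a string
    "sequencenumber": "67",   # placeholder sequence number, stored as a string
}
```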

## Examples
- [Create an Azure Storage Blobs `ContainerClient`](#create-an-azure-storage-blobs-containerclient)
- [Create an Azure EventHubs `EventHubConsumerClient`](#create-an-eventhubconsumerclient)
- [Consume events using a `BlobPartitionManager`](#consume-events-using-a-blobpartitionmanager-to-do-checkpoint)

### Create an Azure Storage Blobs `ContainerClient`
The easiest way to create a `ContainerClient` is to use a connection string.
```python
from azure.storage.blob.aio import ContainerClient
container_client = ContainerClient.from_connection_string("my_storageacount_connection_string", "mycontainer")
```
For other ways of creating a `ContainerClient`, go to [Blob Storage library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/storage/azure-storage-blob) for more details.
- [Consume events using a `BlobCheckpointStore`](#consume-events-using-a-blobcheckpointstore-to-do-checkpoint)

### Create an `EventHubConsumerClient`
The easiest way to create an `EventHubConsumerClient` is to use a connection string.
@@ -68,33 +60,35 @@ eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_nam
```
For other ways of creating an `EventHubConsumerClient`, refer to the [EventHubs library](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs) for more details.
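
The snippet referenced above is collapsed in this diff; a minimal sketch of that pattern, with a placeholder connection string that is assumed to include the event hub's `EntityPath`:

```python
from azure.eventhub.aio import EventHubConsumerClient

# Placeholder connection string, assumed to contain the EntityPath of the event hub.
eventhub_client = EventHubConsumerClient.from_connection_string(
    "my_eventhub_namespace_connection_string"
)
```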

### Consume events using a `BlobPartitionManager` to do checkpoint
### Consume events using a `BlobCheckpointStore` to do checkpoint
```python
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.storage.blob.aio import ContainerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME>>'

async def do_operation(events):
async def process_event(partition_context, event):
    # do some operations to the events.
    pass

async def process_events(partition_context, events):
    await do_operation(events)
    await partition_context.update_checkpoint(events[-1])
    await partition_context.update_checkpoint(event)

async def main():
    storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, storage_container_name)
    partition_manager = BlobPartitionManager(storage_container_client) # use the BlobPartitonManager to save
    client = EventHubConsumerClient.from_connection_string(eventhub_connection_str, partition_manager=partition_manager, receive_timeout=5, retry_total=3)
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_container_connection_str,
        storage_container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        eventhub_connection_str,
        checkpoint_store=checkpoint_store,
        retry_total=3
    )

    try:
        await client.receive(process_events, "$default")
        await client.receive(process_event, "$default")
    except KeyboardInterrupt:
        await client.close()

@@ -116,7 +110,7 @@ Refer to [Logging](#logging) to enable loggers for related libraries.

### Documentation

Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b5/azure.eventhub.aio.html#azure.eventhub.aio.PartitionManager)
Reference documentation is available [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/5.0.0b6/azure.eventhub.aio.html#azure.eventhub.aio.CheckpointStore)

### Logging

sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/__init__.py
@@ -6,7 +6,7 @@
from ._version import VERSION
__version__ = VERSION

from .blobstoragecsaio import BlobCheckpointStore
from ._blobstoragecsaio import BlobCheckpointStore

__all__ = [
"BlobCheckpointStore",
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_blobstoragecsaio.py
@@ -0,0 +1,209 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Iterable, Dict, Any, Optional
import logging
from collections import defaultdict
import asyncio
from azure.eventhub import OwnershipLostError # type: ignore #pylint:disable=no-name-in-module
from azure.eventhub.aio import CheckpointStore # type: ignore
from azure.core.exceptions import ResourceModifiedError, ResourceExistsError # type: ignore
from azure.storage.blob.aio import ContainerClient, BlobClient # type: ignore

logger = logging.getLogger(__name__)
UPLOAD_DATA = ""


class BlobCheckpointStore(CheckpointStore):
"""A CheckpointStore that uses Azure Blob Storage to store the partition ownership and checkpoint data.

This class implements methods list_ownership, claim_ownership, update_checkpoint and list_checkpoints that are
defined in class azure.eventhub.aio.CheckpointStore of package azure-eventhub.

:param str blob_account_url:
The URI to the storage account.
:param container_name:
The name of the container for the blobs.
:type container_name: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token. The value can be a SAS token string, an account
shared access key, or an instance of a TokenCredentials class from azure.identity.
If the URL already has a SAS token, specifying an explicit credential will take priority.
"""
def __init__(self, blob_account_url, container_name, *, credential=None, **kwargs):
# type(str, str, Optional[Any], Any) -> None
container_client = kwargs.pop('container_client', None)
self._container_client = container_client or ContainerClient(
blob_account_url, container_name, credential=credential, **kwargs
)
self._cached_blob_clients = defaultdict() # type: Dict[str, BlobClient]

@classmethod
def from_connection_string(cls, conn_str, container_name, *, credential=None, **kwargs):
# type: (str, str, Optional[Any], str) -> BlobCheckpointStore
"""Create BlobCheckpointStore from a storage connection string.

:param str conn_str:
A connection string to an Azure Storage account.
:param container_name:
The container name for the blobs.
:type container_name: str
:param credential:
The credentials with which to authenticate. This is optional if the
account URL already has a SAS token, or the connection string already has shared
access key values. The value can be a SAS token string, an account shared access
key, or an instance of a TokenCredentials class from azure.identity.
Credentials provided here will take precedence over those in the connection string.
"""
container_client = ContainerClient.from_connection_string(
conn_str,
container_name,
credential=credential,
**kwargs
)
return cls(None, None, container_client=container_client)

async def __aenter__(self):
await self._container_client.__aenter__()
return self

async def __aexit__(self, *args):
await self._container_client.__aexit__(*args)

def _get_blob_client(self, blob_name: str) -> BlobClient:
result = self._cached_blob_clients.get(blob_name)
if not result:
result = self._container_client.get_blob_client(blob_name)
self._cached_blob_clients[blob_name] = result
return result

async def _upload_ownership(self, ownership: Dict[str, Any], metadata: Dict[str, str]) -> None:
etag = ownership.get("etag")
if etag:
etag_match = {"if_match": etag}
else:
etag_match = {"if_none_match": '*'}
blob_name = "{}/{}/{}/ownership/{}".format(
ownership["fully_qualified_namespace"],
ownership["eventhub_name"],
ownership["consumer_group"],
ownership["partition_id"])
blob_name = blob_name.lower()
uploaded_blob_properties = await self._get_blob_client(blob_name).upload_blob(
data=UPLOAD_DATA,
overwrite=True,
metadata=metadata,
**etag_match
)
ownership["etag"] = uploaded_blob_properties["etag"]
ownership["last_modified_time"] = uploaded_blob_properties["last_modified"].timestamp()
ownership.update(metadata)

async def list_ownership(self, fully_qualified_namespace: str, eventhub_name: str, consumer_group: str) \
-> Iterable[Dict[str, Any]]:
try:
blob_prefix = "{}/{}/{}/ownership".format(
fully_qualified_namespace,
eventhub_name,
consumer_group)
blobs = self._container_client.list_blobs(
name_starts_with=blob_prefix.lower(),
include=['metadata'])
result = []
async for blob in blobs:
ownership = {
"fully_qualified_namespace": fully_qualified_namespace,
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": blob.name.split("/")[-1],
"owner_id": blob.metadata["ownerid"],
"etag": blob.etag,
"last_modified_time": blob.last_modified.timestamp() if blob.last_modified else None
}
result.append(ownership)
return result
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred during list_ownership for "
"namespace %r eventhub %r consumer group %r. "
"Exception is %r", fully_qualified_namespace, eventhub_name, consumer_group, error
)
raise

async def _claim_one_partition(self, ownership: Dict[str, Any]) -> Dict[str, Any]:
partition_id = ownership["partition_id"]
namespace = ownership["fully_qualified_namespace"]
eventhub_name = ownership["eventhub_name"]
consumer_group = ownership["consumer_group"]
owner_id = ownership["owner_id"]
metadata = {"ownerid": owner_id}
try:
await self._upload_ownership(ownership, metadata)
return ownership
except (ResourceModifiedError, ResourceExistsError):
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
owner_id, namespace, eventhub_name, consumer_group, partition_id
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
logger.warning(
"An exception occurred when EventProcessor instance %r claim_ownership for "
"namespace %r eventhub %r consumer group %r partition %r. "
"The ownership is now lost. Exception "
"is %r", owner_id, namespace, eventhub_name, consumer_group, partition_id, error
)
return ownership # Keep the ownership if an unexpected error happens

async def claim_ownership(self, ownership_list: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
results = await asyncio.gather(
*[self._claim_one_partition(x) for x in ownership_list],
return_exceptions=True
)
return [ownership for ownership in results if not isinstance(ownership, Exception)]

async def update_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
metadata = {
"offset": checkpoint['offset'],
"sequencenumber": str(checkpoint['sequence_number']),
}
blob_name = "{}/{}/{}/checkpoint/{}".format(
checkpoint['fully_qualified_namespace'],
checkpoint['eventhub_name'],
checkpoint['consumer_group'],
checkpoint['partition_id'])
blob_name = blob_name.lower()
await self._get_blob_client(blob_name).upload_blob(
data=UPLOAD_DATA,
overwrite=True,
metadata=metadata
)

async def list_checkpoints(self, fully_qualified_namespace, eventhub_name, consumer_group):
blob_prefix = "{}/{}/{}/checkpoint".format(
fully_qualified_namespace,
eventhub_name,
consumer_group)
blobs = self._container_client.list_blobs(
name_starts_with=blob_prefix.lower(),
include=['metadata'])
result = []
async for blob in blobs:
metadata = blob.metadata
checkpoint = {
"fully_qualified_namespace": fully_qualified_namespace,
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": blob.name.split("/")[-1],
"offset": metadata["offset"],
"sequence_number": metadata["sequencenumber"]
}
result.append(checkpoint)
return result

async def close(self) -> None:
"""Close an open HTTP session and connection."""
return await self.__aexit__()
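
For illustration, a minimal sketch of using the store above directly to inspect checkpoints; the namespace, event hub, and consumer group values are placeholders, and in normal use `EventHubConsumerClient` calls these methods itself:

```python
import asyncio

from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def dump_checkpoints():
    # Placeholder connection string and container name.
    store = BlobCheckpointStore.from_connection_string("<storage conn str>", "<container>")
    async with store:  # closes the underlying container client session on exit
        checkpoints = await store.list_checkpoints(
            "<namespace>.servicebus.windows.net", "<eventhub>", "$default"
        )
        for checkpoint in checkpoints:
            print(checkpoint["partition_id"], checkpoint["offset"], checkpoint["sequence_number"])

asyncio.get_event_loop().run_until_complete(dump_checkpoints())
```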
sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/azure/eventhub/extensions/checkpointstoreblobaio/_version.py
@@ -3,4 +3,4 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "1.0.0b5"
VERSION = "1.0.0b6"