[Event Hubs] Port azure storage blob code into checkpointstore #9950

Merged on Mar 6, 2020 (36 commits; diff shown from 35 of the 36 commits).

Commits:
- `cb13bd8` Increment version (Feb 14, 2020)
- `38e9f65` Update Development Status (Feb 14, 2020)
- `aa30bc0` Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 15, 2020)
- `c4710ea` Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 20, 2020)
- `1e2d6a6` Vendor storage blob code into checkpointstore (Feb 21, 2020)
- `c0db2d4` Disable eventhub checkpointstore mypy check because storage code isn'… (Feb 21, 2020)
- `053f073` Merge branch 'master' of github.com:Azure/azure-sdk-for-python (Feb 24, 2020)
- `fb61190` Add storage aio stuffs to keep storage the same as the sync version (Feb 25, 2020)
- `7a7e36f` change import azure.storage to relative path (Feb 25, 2020)
- `09b381f` Comment setup dependency for vendored storage blob (Feb 25, 2020)
- `6f34562` Remove storage and added msrest to checkpointstoreblob in shared requ… (Feb 25, 2020)
- `ebad9a2` Add extra require to vendored storage (Feb 25, 2020)
- `0260763` override azure-core for checkpointstoreblob (Feb 25, 2020)
- `e60e3cd` Merge branch 'master' into eventhubs_vendor_strg (Feb 25, 2020)
- `d6bcb6e` override azure-core for checkpointstoreblob (Feb 25, 2020)
- `b72463d` Update change log (Feb 27, 2020)
- `856e829` Update README (Feb 27, 2020)
- `2043ea8` Update README for different version of storage service api (Mar 2, 2020)
- `cfb9939` Update sample for different version of storage service api (Mar 2, 2020)
- `c776c5f` Update docstring (Mar 3, 2020)
- `83ce82c` Restore to storage original version (Mar 4, 2020)
- `8eb3fe1` Remove api_version in docstring (Mar 4, 2020)
- `7f59a05` Add x-ms-version in sample code (Mar 4, 2020)
- `8a1b7d8` Update Change Log (Mar 4, 2020)
- `7e69ba8` use x-ms-version instead of api_version (Mar 4, 2020)
- `da4a1b2` Update README back to use api_version (Mar 4, 2020)
- `2f21f2a` Add api_version param (Mar 4, 2020)
- `b456078` Update api_version to change log (Mar 4, 2020)
- `c9f9f86` Remove storage version (Mar 4, 2020)
- `08cccd5` Add storage api version sample (Mar 4, 2020)
- `f15bb72` small grammar fix (Mar 4, 2020)
- `fbe82e6` Fixes from code review (Mar 5, 2020)
- `a50a002` Fixes from code review (Mar 5, 2020)
- `76e26c3` Correct BlobPartitionManager to BlobCheckpointStore (Mar 5, 2020)
- `ab1d4ab` put headers into kwargs (Mar 5, 2020)
- `defe600` Remove "Azure Stack" from readme (Mar 5, 2020)
2 changes: 0 additions & 2 deletions eng/tox/mypy_hard_failure_packages.py
```diff
@@ -8,7 +8,5 @@
 MYPY_HARD_FAILURE_OPTED = [
     "azure-core",
     "azure-eventhub",
-    "azure-eventhub-checkpointstoreblob",
-    "azure-eventhub-checkpointstoreblob-aio",
     "azure-ai-textanalytics"
 ]
```
CHANGELOG.md:

```diff
@@ -2,6 +2,9 @@

 ## 1.0.1 (Unreleased)

+**New features**
+- Param `api_version` of `BlobCheckpointStore` now supports older versions of Azure Storage Service API.
+
 ## 1.0.0 (2020-01-13)
 Stable release. No new features or API changes.
```
16 changes: 15 additions & 1 deletion sdk/eventhub/azure-eventhub-checkpointstoreblob-aio/README.md
@@ -97,6 +97,20 @@ (added after the end of the `main()` example, `loop.run_until_complete(main())`, and its closing code fence)

#### Use `BlobCheckpointStore` with a different version of Azure Storage Service API
Some environments use different versions of the Azure Storage Service API; Azure Stack, for instance, may use 2017-11-09. By default, `BlobCheckpointStore` uses Storage Service API version 2019-07-07. To run against a different version, specify that version number when you create the `BlobCheckpointStore` object:

```python
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
checkpoint_store = BlobCheckpointStore.from_connection_string(
storage_connection_str,
container_name,
api_version="2017-11-09"
)
```
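Under the hood, the `api_version` value is sent to the service as the `x-ms-version` request header (see the `BlobCheckpointStore.__init__` diff later in this PR).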

## Troubleshooting

### General
```diff
@@ -116,7 +130,7 @@ Reference documentation is available [here](https://azuresdkdocs.blob.core.windo

 - Enable `azure.eventhub.extensions.checkpointstoreblobaio` logger to collect traces from the library.
 - Enable `azure.eventhub` logger to collect traces from the main azure-eventhub library.
-- Enable `azure.storage.blob` logger to collect traces from azure storage blob library.
+- Enable `azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage` logger to collect traces from azure storage blob library.
 - Enable `uamqp` logger to collect traces from the underlying uAMQP library.
 - Enable AMQP frame level trace by setting `logging_enable=True` when creating the client.
```
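As an illustration, a minimal sketch of wiring these loggers up; the DEBUG level, stdout handler, and format string are arbitrary choices, not prescribed by the library:

```python
import logging
import sys

# One stream handler shared by the library loggers named above.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))

# "azure.eventhub" is an ancestor of the checkpoint-store loggers (including
# "azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage"), so their
# records propagate to this handler; enable a narrower child logger instead if
# the vendored-storage traces are all you need.
for name in ("azure.eventhub", "uamqp"):
    lib_logger = logging.getLogger(name)
    lib_logger.setLevel(logging.DEBUG)
    lib_logger.addHandler(handler)
```

AMQP frame-level trace is separate and is opted into per client with `logging_enable=True` at construction time.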

Async `BlobCheckpointStore` module:

```diff
@@ -9,7 +9,9 @@
 from azure.eventhub.exceptions import OwnershipLostError  # type: ignore
 from azure.eventhub.aio import CheckpointStore  # type: ignore  # pylint: disable=no-name-in-module
 from azure.core.exceptions import ResourceModifiedError, ResourceExistsError  # type: ignore
-from azure.storage.blob.aio import ContainerClient, BlobClient  # type: ignore
+from ._vendor.storage.blob.aio import ContainerClient, BlobClient
+from ._vendor.storage.blob._shared.base_client import parse_connection_str
+

 logger = logging.getLogger(__name__)
 UPLOAD_DATA = ""

@@ -31,21 +33,33 @@ class BlobCheckpointStore(CheckpointStore):
         account URL already has a SAS token. The value can be a SAS token string, an account
         shared access key, or an instance of a TokenCredentials class from azure.identity.
         If the URL already has a SAS token, specifying an explicit credential will take priority.
+    :keyword str api_version:
+        The Storage API version to use for requests. Default value is '2019-07-07'.
+    :keyword str secondary_hostname:
+        The hostname of the secondary endpoint.
     """

     def __init__(self, blob_account_url, container_name, *, credential=None, **kwargs):
         # type: (str, str, Optional[Any], Any) -> None
-        container_client = kwargs.pop("container_client", None)
-        self._container_client = container_client or ContainerClient(
-            blob_account_url, container_name, credential=credential, **kwargs
-        )
+        self._container_client = kwargs.pop("container_client", None)
+        if not self._container_client:
+            api_version = kwargs.pop("api_version", None)
+            if api_version:
+                headers = kwargs.get("headers")
+                if headers:
+                    headers["x-ms-version"] = api_version
+                else:
+                    kwargs["headers"] = {"x-ms-version": api_version}
```
Review thread on the lines above:

**Member:** You could simplify:

```python
headers = kwargs.get("headers") or {}
headers["x-ms-version"] = api_version
```

**Contributor (author):** Changed to this.
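A standalone sketch of that simplification (the helper name `_apply_api_version` is hypothetical); note that when the caller passed no `headers` dict, the freshly created one still has to be written back into `kwargs`:

```python
def _apply_api_version(kwargs):
    """Fold an optional api_version kwarg into the x-ms-version request header."""
    api_version = kwargs.pop("api_version", None)
    if api_version:
        # Reuse the caller's headers dict if present, otherwise start a new one.
        headers = kwargs.get("headers") or {}
        headers["x-ms-version"] = api_version
        kwargs["headers"] = headers  # no-op when the dict already came from kwargs
    return kwargs


print(_apply_api_version({"api_version": "2017-11-09"}))
# -> {'headers': {'x-ms-version': '2017-11-09'}}
```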

```diff
+            self._container_client = ContainerClient(
+                blob_account_url, container_name, credential=credential, **kwargs
+            )
         self._cached_blob_clients = defaultdict()  # type: Dict[str, BlobClient]

     @classmethod
     def from_connection_string(
         cls, conn_str, container_name, *, credential=None, **kwargs
     ):
-        # type: (str, str, Optional[Any], str) -> BlobCheckpointStore
+        # type: (str, str, Any, Optional[Any], Any) -> BlobCheckpointStore
         """Create BlobCheckpointStore from a storage connection string.

         :param str conn_str:

@@ -59,11 +73,18 @@ def from_connection_string(
         access key values. The value can be a SAS token string, an account shared access
         key, or an instance of a TokenCredentials class from azure.identity.
         Credentials provided here will take precedence over those in the connection string.
+        :keyword str api_version:
+            The Storage API version to use for requests. Default value is '2019-07-07'.
+        :keyword str secondary_hostname:
+            The hostname of the secondary endpoint.
         :returns: A blob checkpoint store.
         :rtype: ~azure.eventhub.extensions.checkpointstoreblobaio.BlobCheckpointStore
         """
-        container_client = ContainerClient.from_connection_string(
-            conn_str, container_name, credential=credential, **kwargs
-        )
-        return cls(None, None, container_client=container_client)
+        account_url, secondary, credential = parse_connection_str(conn_str, credential, 'blob')
+        if 'secondary_hostname' not in kwargs:
+            kwargs['secondary_hostname'] = secondary
+
+        return cls(account_url, container_name, credential=credential, **kwargs)

     async def __aenter__(self):
         await self._container_client.__aenter__()
```
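Design note: routing `from_connection_string` through the vendored `parse_connection_str` and the regular constructor, instead of `ContainerClient.from_connection_string`, means keyword arguments such as `api_version` and `secondary_hostname` now take effect on both construction paths.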
New file (package `__init__.py` with license header):

```python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------
```

New file (namespace-package `__init__.py`):

```python
__path__ = __import__('pkgutil').extend_path(__path__, __name__)  # type: str
```
New file: the vendored storage blob package `__init__.py` (210 lines):

```python
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import os

from typing import Union, Iterable, AnyStr, IO, Any, Dict  # pylint: disable=unused-import
from ._version import VERSION
from ._blob_client import BlobClient
from ._container_client import ContainerClient
from ._blob_service_client import BlobServiceClient
from ._lease import BlobLeaseClient
from ._download import StorageStreamDownloader
from ._shared_access_signature import generate_account_sas, generate_container_sas, generate_blob_sas
from ._shared.policies import ExponentialRetry, LinearRetry
from ._shared.response_handlers import PartialBatchErrorException
from ._shared.models import (
    LocationMode,
    ResourceTypes,
    AccountSasPermissions,
    StorageErrorCode,
    UserDelegationKey
)
from ._generated.models import (
    RehydratePriority
)
from ._models import (
    BlobType,
    BlockState,
    StandardBlobTier,
    PremiumPageBlobTier,
    SequenceNumberAction,
    PublicAccess,
    BlobAnalyticsLogging,
    Metrics,
    RetentionPolicy,
    StaticWebsite,
    CorsRule,
    ContainerProperties,
    BlobProperties,
    LeaseProperties,
    ContentSettings,
    CopyProperties,
    BlobBlock,
    PageRange,
    AccessPolicy,
    ContainerSasPermissions,
    BlobSasPermissions,
    CustomerProvidedEncryptionKey,
    ContainerEncryptionScope
)

__version__ = VERSION


def upload_blob_to_url(
        blob_url,  # type: str
        data,  # type: Union[Iterable[AnyStr], IO[AnyStr]]
        credential=None,  # type: Any
        **kwargs):
    # type: (...) -> Dict[str, Any]
    """Upload data to a given URL.

    The data will be uploaded as a block blob.

    :param str blob_url:
        The full URI to the blob. This can also include a SAS token.
    :param data:
        The data to upload. This can be bytes, text, an iterable or a file-like object.
    :type data: bytes or str or Iterable
    :param credential:
        The credentials with which to authenticate. This is optional if the
        blob URL already has a SAS token. The value can be a SAS token string, an account
        shared access key, or an instance of a TokenCredentials class from azure.identity.
        If the URL already has a SAS token, specifying an explicit credential will take priority.
    :keyword bool overwrite:
        Whether the blob to be uploaded should overwrite the current data.
        If True, upload_blob_to_url will overwrite any existing data. If set to False, the
        operation will fail with a ResourceExistsError.
    :keyword int max_concurrency:
        The number of parallel connections with which to upload.
    :keyword int length:
        Number of bytes to read from the stream. This is optional, but
        should be supplied for optimal performance.
    :keyword dict(str,str) metadata:
        Name-value pairs associated with the blob as metadata.
    :keyword bool validate_content:
        If true, calculates an MD5 hash for each chunk of the blob. The storage
        service checks the hash of the content that has arrived with the hash
        that was sent. This is primarily valuable for detecting bitflips on
        the wire if using http instead of https as https (the default) will
        already validate. Note that this MD5 hash is not stored with the
        blob. Also note that if enabled, the memory-efficient upload algorithm
        will not be used, because computing the MD5 hash requires buffering
        entire blocks, and doing so defeats the purpose of the memory-efficient algorithm.
    :keyword str encoding:
        Encoding to use if text is supplied as input. Defaults to UTF-8.
    :returns: Blob-updated property dict (Etag and last modified)
    :rtype: dict(str, Any)
    """
    with BlobClient.from_blob_url(blob_url, credential=credential) as client:
        return client.upload_blob(data=data, blob_type=BlobType.BlockBlob, **kwargs)


def _download_to_stream(client, handle, **kwargs):
    """Download data to specified open file-handle."""
    stream = client.download_blob(**kwargs)
    stream.readinto(handle)


def download_blob_from_url(
        blob_url,  # type: str
        output,  # type: str
        credential=None,  # type: Any
        **kwargs):
    # type: (...) -> None
    """Download the contents of a blob to a local file or stream.

    :param str blob_url:
        The full URI to the blob. This can also include a SAS token.
    :param output:
        Where the data should be downloaded to. This could be either a file path to write to,
        or an open IO handle to write to.
    :type output: str or writable stream
    :param credential:
        The credentials with which to authenticate. This is optional if the
        blob URL already has a SAS token or the blob is public. The value can be a SAS token string,
        an account shared access key, or an instance of a TokenCredentials class from azure.identity.
        If the URL already has a SAS token, specifying an explicit credential will take priority.
    :keyword bool overwrite:
        Whether the local file should be overwritten if it already exists. The default value is
        `False`, in which case a ValueError will be raised if the file already exists. If set to
        `True`, an attempt will be made to write to the existing file. If a stream handle is passed
        in, this value is ignored.
    :keyword int max_concurrency:
        The number of parallel connections with which to download.
    :keyword int offset:
        Start of byte range to use for downloading a section of the blob.
        Must be set if length is provided.
    :keyword int length:
        Number of bytes to read from the stream. This is optional, but
        should be supplied for optimal performance.
    :keyword bool validate_content:
        If true, calculates an MD5 hash for each chunk of the blob. The storage
        service checks the hash of the content that has arrived with the hash
        that was sent. This is primarily valuable for detecting bitflips on
        the wire if using http instead of https as https (the default) will
        already validate. Note that this MD5 hash is not stored with the
        blob. Also note that if enabled, the memory-efficient upload algorithm
        will not be used, because computing the MD5 hash requires buffering
        entire blocks, and doing so defeats the purpose of the memory-efficient algorithm.
    :rtype: None
    """
    overwrite = kwargs.pop('overwrite', False)
    with BlobClient.from_blob_url(blob_url, credential=credential) as client:
        if hasattr(output, 'write'):
            _download_to_stream(client, output, **kwargs)
        else:
            if not overwrite and os.path.isfile(output):
                raise ValueError("The file '{}' already exists.".format(output))
            with open(output, 'wb') as file_handle:
                _download_to_stream(client, file_handle, **kwargs)


__all__ = [
    'upload_blob_to_url',
    'download_blob_from_url',
    'BlobServiceClient',
    'ContainerClient',
    'BlobClient',
    'BlobType',
    'BlobLeaseClient',
    'StorageErrorCode',
    'UserDelegationKey',
    'ExponentialRetry',
    'LinearRetry',
    'LocationMode',
    'BlockState',
    'StandardBlobTier',
    'PremiumPageBlobTier',
    'SequenceNumberAction',
    'PublicAccess',
    'BlobAnalyticsLogging',
    'Metrics',
    'RetentionPolicy',
    'StaticWebsite',
    'CorsRule',
    'ContainerProperties',
    'BlobProperties',
    'LeaseProperties',
    'ContentSettings',
    'CopyProperties',
    'BlobBlock',
    'PageRange',
    'AccessPolicy',
    'ContainerSasPermissions',
    'BlobSasPermissions',
    'ResourceTypes',
    'AccountSasPermissions',
    'StorageStreamDownloader',
    'CustomerProvidedEncryptionKey',
    'RehydratePriority',
    'generate_account_sas',
    'generate_container_sas',
    'generate_blob_sas',
    'PartialBatchErrorException',
    'ContainerEncryptionScope'
]
```
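A minimal usage sketch for the two module-level helpers above; the SAS URL and file name are placeholders, and in practice this vendored copy is internal to the checkpoint store (the public azure-storage-blob package exposes the same helpers):

```python
from azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage.blob import (
    download_blob_from_url,
    upload_blob_to_url,
)

# Placeholder SAS URL of the form
# https://<account>.blob.core.windows.net/<container>/<blob>?<sas-token>
blob_sas_url = "https://myaccount.blob.core.windows.net/mycontainer/myblob?sv=..."

# Upload bytes as a block blob, replacing any existing blob at that URL.
upload_blob_to_url(blob_sas_url, b"hello blob", overwrite=True)

# Download it back to a local file; raises ValueError if the file already exists.
download_blob_from_url(blob_sas_url, "myblob_copy.bin")
```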