Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][ADLS] API updates #10170

Merged
merged 4 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/storage/azure-storage-file-datalake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ file = DataLakeFileClient.from_connection_string("my_connection_string",
file_system_name="myfilesystem", file_path="myfile")

with open("./BlockDestination.txt", "wb") as my_file:
file_data = file.read_file(stream=my_file)
download = file.download_file()
download.readinto(my_file)
```

### Enumerating paths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download import StorageStreamDownloader
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._file_system_client import FileSystemClient
Expand Down Expand Up @@ -66,4 +67,5 @@
'generate_directory_sas',
'generate_file_sas',
'VERSION',
'StorageStreamDownloader'
]
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ._shared.uploads import IterStreamer
from ._upload_helper import upload_datalake_file
from ._generated.models import StorageErrorException
from ._download import StorageStreamDownloader
from ._path_client import PathClient
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers
from ._deserialize import process_storage_error
Expand Down Expand Up @@ -502,22 +503,18 @@ def flush_data(self, offset, # type: int
except StorageErrorException as error:
process_storage_error(error)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also update any samples @xiafu-msft added recently?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiafu-msft I think the upload/download sample I updated is the only one?

def read_file(self, offset=None, # type: Optional[int]
length=None, # type: Optional[int]
stream=None, # type: Optional[IO]
**kwargs):
# type: (...) -> Union[int, byte]
"""Download a file from the service. Return the downloaded data in bytes or
write the downloaded data into user provided stream and return the written size.
def download_file(self, offset=None, length=None, **kwargs):
# type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
"""Downloads a file to the StorageStreamDownloader. The readall() method must
be used to read all the content, or readinto() must be used to download the file into
a stream.

:param int offset:
Start of byte range to use for downloading a section of the file.
Must be set if length is provided.
:param int length:
Number of bytes to read from the stream. This is optional, but
should be supplied for optimal performance.
:param int stream:
User provided stream to write the downloaded data into.
:keyword lease:
If specified, download only succeeds if the file's lease is active
and matches this ID. Required if the file has an active lease.
Expand Down Expand Up @@ -545,8 +542,8 @@ def read_file(self, offset=None, # type: Optional[int]
The timeout parameter is expressed in seconds. This method may make
multiple calls to the Azure service and the timeout will apply to
each call individually.
:returns: downloaded data or the size of data written into the provided stream
:rtype: bytes or int
:returns: A streaming object (StorageStreamDownloader)
rakshith91 marked this conversation as resolved.
Show resolved Hide resolved
:rtype: ~azure.storage.filedatalake.StorageStreamDownloader

.. admonition:: Example:

Expand All @@ -558,9 +555,7 @@ def read_file(self, offset=None, # type: Optional[int]
:caption: Return the downloaded data.
"""
downloader = self._blob_client.download_blob(offset=offset, length=length, **kwargs)
if stream:
return downloader.readinto(stream)
return downloader.readall()
return StorageStreamDownloader(downloader)

def rename_file(self, new_name, # type: str
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._models import FileProperties


class StorageStreamDownloader(object):
annatisch marked this conversation as resolved.
Show resolved Hide resolved
"""A streaming object to download from Azure Storage.

:ivar str name:
The name of the file being downloaded.
:ivar ~azure.storage.filedatalake.FileProperties properties:
The properties of the file being downloaded. If only a range of the data is being
downloaded, this will be reflected in the properties.
:ivar int size:
The size of the total data in the stream. This will be the byte range if speficied,
otherwise the total size of the file.
"""

def __init__(self, downloader):
self._downloader = downloader
self.name = self._downloader.name
self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.size = self._downloader.size
annatisch marked this conversation as resolved.
Show resolved Hide resolved

def __len__(self):
return self.size

def chunks(self):
return self._downloader.chunks()

def readall(self):
"""Download the contents of this file.

This operation is blocking until all data is downloaded.
:rtype: bytes or str
"""
return self._downloader.readall()

def readinto(self, stream):
"""Download the contents of this file to a stream.

:param stream:
The stream to download to. This can be an open file-handle,
or any writable stream. The stream must be seekable if the download
uses more than one parallel connection.
:returns: The number of bytes read.
:rtype: int
"""
return self._downloader.readinto(stream)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download_async import StorageStreamDownloader
from .._shared.policies_async import ExponentialRetry, LinearRetry
from ._data_lake_file_client_async import DataLakeFileClient
from ._data_lake_directory_client_async import DataLakeDirectoryClient
Expand All @@ -19,4 +20,5 @@
'DataLakeLeaseClient',
'ExponentialRetry',
'LinearRetry',
'StorageStreamDownloader'
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------

from ._download_async import StorageStreamDownloader
from ._path_client_async import PathClient
from .._data_lake_file_client import DataLakeFileClient as DataLakeFileClientBase
from .._deserialize import process_storage_error
Expand Down Expand Up @@ -370,22 +371,18 @@ async def flush_data(self, offset, # type: int
except StorageErrorException as error:
process_storage_error(error)

async def read_file(self, offset=None, # type: Optional[int]
length=None, # type: Optional[int]
stream=None, # type: Optional[IO]
**kwargs):
# type: (...) -> Union[int, byte]
"""Download a file from the service. Return the downloaded data in bytes or
write the downloaded data into user provided stream and return the written size.
async def download_file(self, offset=None, length=None, **kwargs):
# type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
"""Downloads a file to the StorageStreamDownloader. The readall() method must
be used to read all the content, or readinto() must be used to download the file into
a stream.

:param int offset:
Start of byte range to use for downloading a section of the file.
Must be set if length is provided.
:param int length:
Number of bytes to read from the stream. This is optional, but
should be supplied for optimal performance.
:param int stream:
User provided stream to write the downloaded data into.
:keyword lease:
If specified, download only succeeds if the file's lease is active
and matches this ID. Required if the file has an active lease.
Expand Down Expand Up @@ -413,8 +410,8 @@ async def read_file(self, offset=None, # type: Optional[int]
The timeout parameter is expressed in seconds. This method may make
multiple calls to the Azure service and the timeout will apply to
each call individually.
:returns: downloaded data or the size of data written into the provided stream
:rtype: bytes or int
:returns: A streaming object (StorageStreamDownloader)
:rtype: ~azure.storage.filedatalake.aio.StorageStreamDownloader

.. admonition:: Example:

Expand All @@ -426,9 +423,7 @@ async def read_file(self, offset=None, # type: Optional[int]
:caption: Return the downloaded data.
"""
downloader = await self._blob_client.download_blob(offset=offset, length=length, **kwargs)
if stream:
return await downloader.readinto(stream)
return await downloader.readall()
return StorageStreamDownloader(downloader)

async def rename_file(self, new_name, # type: str
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from .._models import FileProperties


class StorageStreamDownloader(object):
"""A streaming object to download from Azure Storage.

:ivar str name:
The name of the file being downloaded.
:ivar ~azure.storage.filedatalake.FileProperties properties:
The properties of the file being downloaded. If only a range of the data is being
downloaded, this will be reflected in the properties.
:ivar int size:
The size of the total data in the stream. This will be the byte range if speficied,
otherwise the total size of the file.
"""

def __init__(self, downloader):
self._downloader = downloader
self.name = self._downloader.name
self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.size = self._downloader.size

def __len__(self):
return self.size

def chunks(self):
return self._downloader.chunks()

async def readall(self):
"""Download the contents of this file.

This operation is blocking until all data is downloaded.
:rtype: bytes or str
"""
return await self._downloader.readall()

async def readinto(self, stream):
"""Download the contents of this file to a stream.

:param stream:
The stream to download to. This can be an open file-handle,
or any writable stream. The stream must be seekable if the download
uses more than one parallel connection.
:returns: The number of bytes read.
:rtype: int
"""
return await self._downloader.readinto(stream)
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def upload_download_sample(filesystem_client):
# read the data back
print("Downloading data from '{}'.".format(file_name))
# [START read_file]
downloaded_bytes = file_client.read_file()
download = file_client.download_file()
downloaded_bytes = download.readall()
# [END read_file]

# verify the downloaded content
Expand All @@ -81,7 +82,8 @@ def upload_download_sample(filesystem_client):

# download the renamed file in to local file
with open(SOURCE_FILE, 'wb') as stream:
new_client.read_file(stream=stream)
download = new_client.download_file()
download.readinto(stream)

# [START delete_file]
new_client.delete_file()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ async def upload_download_sample(filesystem_client):
# read the data back
print("Downloading data from '{}'.".format(file_name))
# [START read_file]
downloaded_bytes = await file_client.read_file()
download = await file_client.download_file()
downloaded_bytes = await download.readall()
# [END read_file]

# verify the downloaded content
Expand All @@ -81,7 +82,8 @@ async def upload_download_sample(filesystem_client):

# download the renamed file in to local file
with open(SOURCE_FILE, 'wb') as stream:
await new_client.read_file(stream=stream)
download = await new_client.download_file()
await download.readinto(stream)

# [START delete_file]
await new_client.delete_file()
Expand Down
Loading