Skip to content

Commit

Permalink
use datalake set_expiry operation
Browse files Browse the repository at this point in the history
  • Loading branch information
xiafu-msft committed Sep 11, 2020
1 parent 9bc70d4 commit c6a8d7d
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ def _download_blob_options(self, offset=None, length=None, **kwargs):
'lease_access_conditions': access_conditions,
'modified_access_conditions': mod_conditions,
'cpk_info': cpk_info,
'cls': deserialize_blob_stream,
'cls': kwargs.pop('cls', None) or deserialize_blob_stream,
'max_concurrency':kwargs.pop('max_concurrency', 1),
'encoding': kwargs.pop('encoding', None),
'timeout': kwargs.pop('timeout', None),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
DirectoryProperties,
FileProperties,
PathProperties,
PathPropertiesPaged,
LeaseProperties,
ContentSettings,
AccountSasPermissions,
Expand All @@ -32,6 +31,7 @@
DelimitedJsonDialect,
DataLakeFileQueryError
)
from ._list_paths_helper import PathPropertiesPaged
from ._shared_access_signature import generate_account_sas, generate_file_system_sas, generate_directory_sas, \
generate_file_sas

Expand Down Expand Up @@ -60,7 +60,6 @@
'DirectoryProperties',
'FileProperties',
'PathProperties',
'PathPropertiesPaged',
'LeaseProperties',
'ContentSettings',
'AccountSasPermissions',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from ._deserialize import deserialize_dir_properties
from ._shared.base_client import parse_connection_str
from ._data_lake_file_client import DataLakeFileClient
from ._models import DirectoryProperties
Expand Down Expand Up @@ -230,7 +231,7 @@ def get_directory_properties(self, **kwargs):
:dedent: 4
:caption: Getting the properties for a file/directory.
"""
return self._get_path_properties(cls=DirectoryProperties._deserialize_dir_properties, **kwargs) # pylint: disable=protected-access
return self._get_path_properties(cls=deserialize_dir_properties, **kwargs) # pylint: disable=protected-access

def rename_directory(self, new_name, # type: str
**kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._download import StorageStreamDownloader
from ._path_client import PathClient
from ._serialize import get_mod_conditions, get_path_http_headers, get_access_conditions, add_metadata_headers
from ._deserialize import process_storage_error
from ._deserialize import process_storage_error, deserialize_file_properties
from ._models import FileProperties, DataLakeFileQueryError


Expand Down Expand Up @@ -240,7 +240,7 @@ def get_file_properties(self, **kwargs):
:dedent: 4
:caption: Getting the properties for a file.
"""
return self._get_path_properties(cls=FileProperties._deserialize_file_properties, **kwargs) # pylint: disable=protected-access
return self._get_path_properties(cls=deserialize_file_properties, **kwargs) # pylint: disable=protected-access

def set_file_expiry(self, expiry_options, # type: str
expires_on=None, # type: Optional[Union[datetime, int]]
Expand All @@ -258,7 +258,8 @@ def set_file_expiry(self, expiry_options, # type: str
The timeout parameter is expressed in seconds.
:rtype: None
"""
return self._blob_client._client.blob.set_expiry(expiry_options, expires_on=expires_on, **kwargs) # pylint: disable=protected-access
return self._datalake_client_for_blob_operation.path\
.set_expiry(expiry_options, expires_on=expires_on, **kwargs) # pylint: disable=protected-access

def _upload_options( # pylint:disable=too-many-statements
self, data, # type: Union[Iterable[AnyStr], IO[AnyStr]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import HttpResponseError, DecodeError, ResourceModifiedError, ClientAuthenticationError, \
ResourceNotFoundError, ResourceExistsError
from ._models import FileProperties, DirectoryProperties, LeaseProperties
from ._shared.models import StorageErrorCode

if TYPE_CHECKING:
Expand All @@ -20,6 +21,45 @@
_LOGGER = logging.getLogger(__name__)


def deserialize_dir_properties(response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
dir_properties = DirectoryProperties(
metadata=metadata,
**headers
)
return dir_properties


def deserialize_file_properties(response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
file_properties = FileProperties(
metadata=metadata,
**headers
)
if 'Content-Range' in headers:
if 'x-ms-blob-content-md5' in headers:
file_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5']
else:
file_properties.content_settings.content_md5 = None
return file_properties


def from_blob_properties(blob_properties):
file_props = FileProperties()
file_props.name = blob_properties.name
file_props.etag = blob_properties.etag
file_props.deleted = blob_properties.deleted
file_props.metadata = blob_properties.metadata
file_props.lease = blob_properties.lease
file_props.lease.__class__ = LeaseProperties
file_props.last_modified = blob_properties.last_modified
file_props.creation_time = blob_properties.creation_time
file_props.size = blob_properties.size
file_props.deleted_time = blob_properties.deleted_time
file_props.remaining_retention_days = blob_properties.remaining_retention_days
file_props.content_settings = blob_properties.content_settings
return file_props

def normalize_headers(headers):
normalized = {}
for key, value in headers.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from ._models import FileProperties
from ._deserialize import from_blob_properties


class StorageStreamDownloader(object):
Expand All @@ -23,7 +22,7 @@ class StorageStreamDownloader(object):
def __init__(self, downloader):
self._downloader = downloader
self.name = self._downloader.name
self.properties = FileProperties._from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.properties = from_blob_properties(self._downloader.properties) # pylint: disable=protected-access
self.size = self._downloader.size

def __len__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from azure.storage.blob import ContainerClient
from ._shared.base_client import StorageAccountHostsMixin, parse_query, parse_connection_str
from ._serialize import convert_dfs_url_to_blob_url
from ._models import LocationMode, FileSystemProperties, PathPropertiesPaged, PublicAccess
from ._models import LocationMode, FileSystemProperties, PublicAccess
from ._list_paths_helper import PathPropertiesPaged
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._data_lake_lease import DataLakeLeaseClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from azure.core.paging import PageIterator
from ._generated.models import StorageErrorException
from ._models import PathProperties
from ._deserialize import return_headers_and_deserialized_path_list
from ._generated.models import Path
from ._shared.response_handlers import process_storage_error


class PathPropertiesPaged(PageIterator):
"""An Iterable of Path properties.
:ivar str path: Filters the results to return only paths under the specified path.
:ivar int results_per_page: The maximum number of results retrieved per API call.
:ivar str continuation_token: The continuation token to retrieve the next page of results.
:ivar list(~azure.storage.filedatalake.PathProperties) current_page: The current page of listed results.
:param callable command: Function to retrieve the next page of items.
:param str path: Filters the results to return only paths under the specified path.
:param int max_results: The maximum number of psths to retrieve per
call.
:param str continuation_token: An opaque continuation token.
"""
def __init__(
self, command,
recursive,
path=None,
max_results=None,
continuation_token=None,
upn=None):
super(PathPropertiesPaged, self).__init__(
get_next=self._get_next_cb,
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
self._command = command
self.recursive = recursive
self.results_per_page = max_results
self.path = path
self.upn = upn
self.current_page = None
self.path_list = None

def _get_next_cb(self, continuation_token):
try:
return self._command(
self.recursive,
continuation=continuation_token or None,
path=self.path,
max_results=self.results_per_page,
upn=self.upn,
cls=return_headers_and_deserialized_path_list)
except StorageErrorException as error:
process_storage_error(error)

def _extract_data_cb(self, get_next_return):
self.path_list, self._response = get_next_return
self.current_page = [self._build_item(item) for item in self.path_list]

return self._response['continuation'] or None, self.current_page

@staticmethod
def _build_item(item):
if isinstance(item, PathProperties):
return item
if isinstance(item, Path):
path = PathProperties._from_generated(item) # pylint: disable=protected-access
return path
return item
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# pylint: disable=super-init-not-called, too-many-lines
from enum import Enum

from azure.core.paging import PageIterator
from azure.storage.blob import LeaseProperties as BlobLeaseProperties
from azure.storage.blob import AccountSasPermissions as BlobAccountSasPermissions
from azure.storage.blob import ResourceTypes as BlobResourceTypes
Expand All @@ -17,12 +16,8 @@
from azure.storage.blob import AccessPolicy as BlobAccessPolicy
from azure.storage.blob import DelimitedTextDialect as BlobDelimitedTextDialect
from azure.storage.blob import DelimitedJsonDialect as BlobDelimitedJSON
from azure.storage.blob._generated.models import StorageErrorException
from azure.storage.blob._models import ContainerPropertiesPaged
from ._deserialize import return_headers_and_deserialized_path_list, deserialize_metadata
from ._generated.models import Path
from ._shared.models import DictMixin
from ._shared.response_handlers import process_storage_error


class FileSystemProperties(object):
Expand Down Expand Up @@ -141,15 +136,6 @@ def __init__(self, **kwargs):
self.deleted_time = None
self.remaining_retention_days = None

@classmethod
def _deserialize_dir_properties(cls, response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
dir_properties = cls(
metadata=metadata,
**headers
)
return dir_properties


class FileProperties(DictMixin):
"""
Expand Down Expand Up @@ -183,20 +169,6 @@ def __init__(self, **kwargs):
self.remaining_retention_days = None
self.content_settings = ContentSettings(**kwargs)

@classmethod
def _deserialize_file_properties(cls, response, obj, headers):
metadata = deserialize_metadata(response, obj, headers)
file_properties = cls(
metadata=metadata,
**headers
)
if 'Content-Range' in headers:
if 'x-ms-blob-content-md5' in headers:
file_properties.content_settings.content_md5 = headers['x-ms-blob-content-md5']
else:
file_properties.content_settings.content_md5 = None
return file_properties


class PathProperties(object):
"""Path properties listed by get_paths api.
Expand Down Expand Up @@ -242,68 +214,6 @@ def _from_generated(cls, generated):
return path_prop


class PathPropertiesPaged(PageIterator):
"""An Iterable of Path properties.
:ivar str path: Filters the results to return only paths under the specified path.
:ivar int results_per_page: The maximum number of results retrieved per API call.
:ivar str continuation_token: The continuation token to retrieve the next page of results.
:ivar list(~azure.storage.filedatalake.PathProperties) current_page: The current page of listed results.
:param callable command: Function to retrieve the next page of items.
:param str path: Filters the results to return only paths under the specified path.
:param int max_results: The maximum number of psths to retrieve per
call.
:param str continuation_token: An opaque continuation token.
"""
def __init__(
self, command,
recursive,
path=None,
max_results=None,
continuation_token=None,
upn=None):
super(PathPropertiesPaged, self).__init__(
get_next=self._get_next_cb,
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
self._command = command
self.recursive = recursive
self.results_per_page = max_results
self.path = path
self.upn = upn
self.current_page = None
self.path_list = None

def _get_next_cb(self, continuation_token):
try:
return self._command(
self.recursive,
continuation=continuation_token or None,
path=self.path,
max_results=self.results_per_page,
upn=self.upn,
cls=return_headers_and_deserialized_path_list)
except StorageErrorException as error:
process_storage_error(error)

def _extract_data_cb(self, get_next_return):
self.path_list, self._response = get_next_return
self.current_page = [self._build_item(item) for item in self.path_list]

return self._response['continuation'] or None, self.current_page

@staticmethod
def _build_item(item):
if isinstance(item, PathProperties):
return item
if isinstance(item, Path):
path = PathProperties._from_generated(item) # pylint: disable=protected-access
return path
return item


class LeaseProperties(BlobLeaseProperties):
"""DataLake Lease Properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def __init__(
# ADLS doesn't support secondary endpoint, make sure it's empty
self._hosts[LocationMode.SECONDARY] = ""
self._client = DataLakeStorageClient(self.url, file_system_name, path_name, pipeline=self._pipeline)
self._datalake_client_for_blob_operation = DataLakeStorageClient(self._blob_client.url,
file_system_name, path_name,
pipeline=self._pipeline)

def __exit__(self, *args):
self._blob_client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ._data_lake_file_client_async import DataLakeFileClient
from .._data_lake_directory_client import DataLakeDirectoryClient as DataLakeDirectoryClientBase
from .._models import DirectoryProperties
from .._deserialize import deserialize_dir_properties
from ._path_client_async import PathClient


Expand Down Expand Up @@ -200,7 +201,7 @@ async def get_directory_properties(self, **kwargs):
:dedent: 4
:caption: Getting the properties for a file/directory.
"""
return await self._get_path_properties(cls=DirectoryProperties._deserialize_dir_properties, **kwargs) # pylint: disable=protected-access
return await self._get_path_properties(cls=deserialize_dir_properties, **kwargs) # pylint: disable=protected-access

async def rename_directory(self, new_name, # type: str
**kwargs):
Expand Down
Loading

0 comments on commit c6a8d7d

Please sign in to comment.