Skip to content

Commit

Permalink
[aiotools] add basename to FileStatus; rename name in FileListEntry (#…
Browse files Browse the repository at this point in the history
…14139)

The term `name` has long been ambiguous and, indeed, our Azure
implementation differed from S3 and Google. Now both FileStatus and
FileListEntry use the term `basename`. Azure has been changed to have
the same semantics as S3 and Google.

---------

Co-authored-by: jigold <jigold@users.noreply.github.com>
  • Loading branch information
danking and jigold authored Jan 12, 2024
1 parent 7ee4141 commit f49789f
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 39 deletions.
26 changes: 20 additions & 6 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ async def __anext__(self):


class S3HeadObjectFileStatus(FileStatus):
def __init__(self, head_object_resp):
def __init__(self, head_object_resp, url: str):
self.head_object_resp = head_object_resp
self._url = url

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))

def url(self) -> str:
return self._url

async def size(self) -> int:
return self.head_object_resp['ContentLength']
Expand All @@ -95,8 +102,15 @@ async def __getitem__(self, key: str) -> Any:


class S3ListFilesFileStatus(FileStatus):
def __init__(self, item: Dict[str, Any]):
def __init__(self, item: Dict[str, Any], url: str):
self._item = item
self._url = url

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))

def url(self) -> str:
return self._url

async def size(self) -> int:
return self._item['Size']
Expand Down Expand Up @@ -166,8 +180,8 @@ def __init__(self, bucket: str, key: str, item: Optional[Dict[str, Any]]):
self._item = item
self._status: Optional[S3ListFilesFileStatus] = None

def name(self) -> str:
return os.path.basename(self._key)
def basename(self) -> str:
return os.path.basename(self._key.rstrip('/'))

async def url(self) -> str:
return f's3://{self._bucket}/{self._key}'
Expand All @@ -182,7 +196,7 @@ async def status(self) -> FileStatus:
if self._status is None:
if self._item is None:
raise IsADirectoryError(f's3://{self._bucket}/{self._key}')
self._status = S3ListFilesFileStatus(self._item)
self._status = S3ListFilesFileStatus(self._item, await self.url())
return self._status


Expand Down Expand Up @@ -458,7 +472,7 @@ async def statfile(self, url: str) -> FileStatus:
bucket, name = self.get_bucket_and_name(url)
try:
resp = await blocking_to_async(self._thread_pool, self._s3.head_object, Bucket=bucket, Key=name)
return S3HeadObjectFileStatus(resp)
return S3HeadObjectFileStatus(resp, url)
except botocore.exceptions.ClientError as e:
if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
raise FileNotFoundError(url) from e
Expand Down
21 changes: 15 additions & 6 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import abc
import re
import os
import asyncio
from functools import wraps
import secrets
Expand Down Expand Up @@ -236,8 +237,8 @@ def __init__(self, url: 'AzureAsyncFSURL', blob_props: Optional[BlobProperties])
self._blob_props = blob_props
self._status: Optional[AzureFileStatus] = None

def name(self) -> str:
return self._url.path
def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))

async def url(self) -> str:
return self._url.base
Expand All @@ -255,13 +256,20 @@ async def status(self) -> FileStatus:
if self._status is None:
if self._blob_props is None:
raise IsADirectoryError(await self.url())
self._status = AzureFileStatus(self._blob_props)
self._status = AzureFileStatus(self._blob_props, self._url)
return self._status


class AzureFileStatus(FileStatus):
def __init__(self, blob_props: BlobProperties):
def __init__(self, blob_props: BlobProperties, url: 'AzureAsyncFSURL'):
self.blob_props = blob_props
self._url = url

def basename(self) -> str:
return os.path.basename(self._url.base.rstrip('/'))

def url(self) -> str:
return str(self._url)

async def size(self) -> int:
size = self.blob_props.size
Expand Down Expand Up @@ -536,8 +544,9 @@ async def makedirs(self, url: str, exist_ok: bool = False) -> None:
@handle_public_access_error
async def statfile(self, url: str) -> FileStatus:
try:
blob_props = await self.get_blob_client(self.parse_url(url)).get_blob_properties()
return AzureFileStatus(blob_props)
parsed_url = self.parse_url(url)
blob_props = await self.get_blob_client(parsed_url).get_blob_properties()
return AzureFileStatus(blob_props, parsed_url)
except azure.core.exceptions.ResourceNotFoundError as e:
raise FileNotFoundError(url) from e

Expand Down
17 changes: 12 additions & 5 deletions hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,15 @@ def _update_params_with_user_project(self, request_kwargs, bucket):


class GetObjectFileStatus(FileStatus):
def __init__(self, items: Dict[str, str]):
def __init__(self, items: Dict[str, str], url: str):
self._items = items
self._url = url

def basename(self) -> str:
return os.path.basename(self._url.rstrip('/'))

def url(self) -> str:
return self._url

async def size(self) -> int:
return int(self._items['size'])
Expand All @@ -464,8 +471,8 @@ def __init__(self, bucket: str, name: str, items: Optional[Dict[str, Any]]):
self._items = items
self._status: Optional[GetObjectFileStatus] = None

def name(self) -> str:
return os.path.basename(self._name)
def basename(self) -> str:
return os.path.basename(self._name.rstrip('/'))

async def url(self) -> str:
return f'gs://{self._bucket}/{self._name}'
Expand All @@ -480,7 +487,7 @@ async def status(self) -> FileStatus:
if self._status is None:
if self._items is None:
raise IsADirectoryError(await self.url())
self._status = GetObjectFileStatus(self._items)
self._status = GetObjectFileStatus(self._items, await self.url())
return self._status


Expand Down Expand Up @@ -699,7 +706,7 @@ async def makedirs(self, url: str, exist_ok: bool = False) -> None:
async def statfile(self, url: str) -> GetObjectFileStatus:
try:
bucket, name = self.get_bucket_and_name(url)
return GetObjectFileStatus(await self._storage_client.get_object_metadata(bucket, name))
return GetObjectFileStatus(await self._storage_client.get_object_metadata(bucket, name), url)
except aiohttp.ClientResponseError as e:
if e.status == 404:
raise FileNotFoundError(url) from e
Expand Down
117 changes: 100 additions & 17 deletions hail/python/hailtop/aiotools/fs/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,44 @@ async def with_exception(


class FileStatus(abc.ABC):
@abc.abstractmethod
def basename(self) -> str:
"""The basename of the object.
Examples
--------
The basename of all of these objects is "file":
- s3://bucket/folder/file
- gs://bucket/folder/file
- https://account.blob.core.windows.net/container/folder/file
- https://account.blob.core.windows.net/container/folder/file?sv=2023-01-01&sr=bv&sig=abc123&sp=rcw
- /folder/file
"""

@abc.abstractmethod
def url(self) -> str:
"""The URL of the object without any query parameters.
Examples
--------
- s3://bucket/folder/file
- gs://bucket/folder/file
- https://account.blob.core.windows.net/container/folder/file
- /folder/file
Note that the following URL
https://account.blob.core.windows.net/container/folder/file?sv=2023-01-01&sr=bv&sig=abc123&sp=rcw
becomes
https://account.blob.core.windows.net/container/folder/file
"""

@abc.abstractmethod
async def size(self) -> int:
pass
Expand Down Expand Up @@ -66,17 +104,62 @@ async def __getitem__(self, key: str) -> Any:

class FileListEntry(abc.ABC):
@abc.abstractmethod
def name(self) -> str:
pass
def basename(self) -> str:
"""The basename of the object.
Examples
--------
The basename of all of these objects is "file":
- s3://bucket/folder/file
- gs://bucket/folder/file
- https://account.blob.core.windows.net/container/folder/file
- https://account.blob.core.windows.net/container/folder/file?sv=2023-01-01&sr=bv&sig=abc123&sp=rcw
- /folder/file
"""

@abc.abstractmethod
async def url(self) -> str:
pass
"""The URL of the object without any query parameters.
Examples
--------
- s3://bucket/folder/file
- gs://bucket/folder/file
- https://account.blob.core.windows.net/container/folder/file
- /folder/file
Note that the following URL
https://account.blob.core.windows.net/container/folder/file?sv=2023-01-01&sr=bv&sig=abc123&sp=rcw
becomes
https://account.blob.core.windows.net/container/folder/file
"""

async def url_maybe_trailing_slash(self) -> str:
return await self.url()

async def url_full(self) -> str:
"""The URL of the object with any query parameters.
Examples
--------
The only interesting case is for signed URLs in Azure. These are called shared signature tokens or SAS tokens.
For example, the following URL
https://account.blob.core.windows.net/container/folder/file?sv=2023-01-01&sr=bv&sig=abc123&sp=rcw
is a signed version of this URL
https://account.blob.core.windows.net/container/folder/file
"""
return await self.url()

@abc.abstractmethod
Expand All @@ -100,7 +183,7 @@ async def create_part(
pass

@abc.abstractmethod
async def __aenter__(self) -> 'MultiPartCreate':
async def __aenter__(self) -> "MultiPartCreate":
pass

@abc.abstractmethod
Expand Down Expand Up @@ -132,12 +215,12 @@ def scheme(self) -> str:
pass

@abc.abstractmethod
def with_path(self, path) -> 'AsyncFSURL':
def with_path(self, path) -> "AsyncFSURL":
pass

def with_new_path_component(self, new_path_component) -> 'AsyncFSURL':
prefix = self.path if self.path.endswith('/') else self.path + '/'
suffix = new_path_component[1:] if new_path_component.startswith('/') else new_path_component
def with_new_path_component(self, new_path_component) -> "AsyncFSURL":
prefix = self.path if self.path.endswith("/") else self.path + "/"
suffix = new_path_component[1:] if new_path_component.startswith("/") else new_path_component
return self.with_path(prefix + suffix)

@abc.abstractmethod
Expand All @@ -146,8 +229,8 @@ def __str__(self) -> str:


class AsyncFS(abc.ABC):
FILE = 'file'
DIR = 'dir'
FILE = "file"
DIR = "dir"

@staticmethod
@abc.abstractmethod
Expand Down Expand Up @@ -177,12 +260,12 @@ async def open(self, url: str) -> ReadableStream:
async def open_from(self, url: str, start: int, *, length: Optional[int] = None) -> ReadableStream:
if length == 0:
fs_url = self.parse_url(url)
if fs_url.path.endswith('/'):
file_url = str(fs_url.with_path(fs_url.path.rstrip('/')))
if fs_url.path.endswith("/"):
file_url = str(fs_url.with_path(fs_url.path.rstrip("/")))
dir_url = str(fs_url)
else:
file_url = str(fs_url)
dir_url = str(fs_url.with_path(fs_url.path + '/'))
dir_url = str(fs_url.with_path(fs_url.path + "/"))
isfile, isdir = await asyncio.gather(self.isfile(file_url), self.isdir(dir_url))
if isfile:
if isdir:
Expand Down Expand Up @@ -228,10 +311,10 @@ async def staturl(self, url: str) -> str:
pass

async def _staturl_parallel_isfile_isdir(self, url: str) -> str:
assert not url.endswith('/')
assert not url.endswith("/")

[(is_file, isfile_exc), (is_dir, isdir_exc)] = await asyncio.gather(
with_exception(self.isfile, url), with_exception(self.isdir, url + '/')
with_exception(self.isfile, url), with_exception(self.isdir, url + "/")
)
# raise exception deterministically
if isfile_exc:
Expand Down Expand Up @@ -325,7 +408,7 @@ async def exists(self, url: str) -> bool:
async def close(self) -> None:
pass

async def __aenter__(self) -> 'AsyncFS':
async def __aenter__(self) -> "AsyncFS":
return self

async def __aexit__(
Expand All @@ -334,7 +417,7 @@ async def __aexit__(
await self.close()


T = TypeVar('T', bound=AsyncFS)
T = TypeVar("T", bound=AsyncFS)


class AsyncFSFactory(abc.ABC, Generic[T]):
Expand Down
Loading

0 comments on commit f49789f

Please sign in to comment.