[fs] support hfs.ls on a bucket #14176

Merged · 14 commits · Feb 6, 2024
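
This capture includes no PR description; judging from the title and the diff below, the change lets `hfs.ls` list a bucket root, while file-level operations on a bare bucket URL raise the new `IsABucketError`. A hedged sketch of the resulting behavior (`gs://my-bucket` is a placeholder, and the `.path` attribute on list entries is an assumption):

```python
# Sketch only: assumes cloud credentials are configured and that
# 'gs://my-bucket' (a placeholder) exists.
import hailtop.fs as hfs

# Listing a bucket root is now supported:
for entry in hfs.ls('gs://my-bucket'):
    print(entry.path)  # assumes list entries expose a .path attribute

# File-level operations on a bare bucket URL raise IsABucketError,
# which subclasses FileNotFoundError (see exceptions.py below):
try:
    hfs.open('gs://my-bucket')
except FileNotFoundError as e:
    print('not a file:', e)
```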
7 changes: 5 additions & 2 deletions hail/python/hail/backend/local_backend.py
@@ -1,4 +1,5 @@
from typing import Optional, Union, Tuple, List
from contextlib import ExitStack
import os
import sys

@@ -31,6 +32,7 @@ def __init__(
gcs_requester_pays_project: Optional[str] = None,
gcs_requester_pays_buckets: Optional[str] = None,
):
self._exit_stack = ExitStack()
assert gcs_requester_pays_project is not None or gcs_requester_pays_buckets is None

spark_home = find_spark_home()
@@ -59,6 +61,7 @@ def __init__(
die_on_exit=True,
)
self._gateway = JavaGateway(gateway_parameters=GatewayParameters(port=port, auto_convert=True))
self._exit_stack.callback(self._gateway.shutdown)

hail_package = getattr(self._gateway.jvm, 'is').hail

@@ -75,7 +78,7 @@ def __init__(

super(LocalBackend, self).__init__(self._gateway.jvm, jbackend, jhc)

self._fs = RouterFS()
self._fs = self._exit_stack.enter_context(RouterFS())
self._logger = None

self._initialize_flags({})
@@ -108,7 +111,7 @@ def register_ir_function(

def stop(self):
super().stop()
self._gateway.shutdown()
self._exit_stack.close()
uninstall_exception_handler()

@property
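
The local-backend diff above funnels teardown through a single `contextlib.ExitStack`: the Py4J gateway shutdown is registered as a plain callback, `RouterFS` is entered as a context manager, and `stop()` just closes the stack. A minimal self-contained sketch of that pattern (illustrative names, not Hail's classes):

```python
import io
from contextlib import ExitStack

class Backend:
    def __init__(self):
        self._exit_stack = ExitStack()
        # Register a plain cleanup callable, as with self._gateway.shutdown:
        self._exit_stack.callback(lambda: print('gateway shut down'))
        # Enter a context manager and keep the resource, as with RouterFS():
        self._fs = self._exit_stack.enter_context(io.BytesIO(b'fake fs'))

    def stop(self):
        # Runs every registered cleanup, in reverse registration order.
        self._exit_stack.close()

Backend().stop()  # closes the BytesIO, then prints 'gateway shut down'
```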
10 changes: 7 additions & 3 deletions hail/python/hail/backend/service_backend.py
@@ -207,6 +207,7 @@ async def create(
gcs_requester_pays_configuration: Optional[GCSRequesterPaysConfiguration] = None,
gcs_bucket_allow_list: Optional[List[str]] = None,
):
async_exit_stack = AsyncExitStack()
billing_project = configuration_of(ConfigVariable.BATCH_BILLING_PROJECT, billing_project, None)
if billing_project is None:
raise ValueError(
@@ -221,9 +222,11 @@ async def create(
gcs_kwargs={'gcs_requester_pays_configuration': gcs_requester_pays_configuration},
gcs_bucket_allow_list=gcs_bucket_allow_list,
)
async_exit_stack.push_async_callback(async_fs.close)
sync_fs = RouterFS(async_fs)
if batch_client is None:
batch_client = await BatchClient.create(billing_project, _token=credentials_token)
async_exit_stack.push_async_callback(batch_client.close)
batch_attributes: Dict[str, str] = dict()
remote_tmpdir = get_remote_tmpdir('ServiceBackend', remote_tmpdir=remote_tmpdir)

@@ -288,6 +291,7 @@ async def create(
worker_cores=worker_cores,
worker_memory=worker_memory,
regions=regions,
async_exit_stack=async_exit_stack,
)
sb._initialize_flags(flags)
return sb
@@ -308,6 +312,7 @@ def __init__(
worker_cores: Optional[Union[int, str]],
worker_memory: Optional[str],
regions: List[str],
async_exit_stack: AsyncExitStack,
):
super(ServiceBackend, self).__init__()
self.billing_project = billing_project
@@ -329,6 +334,7 @@ def __init__(
self.regions = regions

self._batch: Batch = self._create_batch()
self._async_exit_stack = async_exit_stack

def _create_batch(self) -> Batch:
return self._batch_client.create_batch(attributes=self.batch_attributes)
@@ -362,9 +368,7 @@ def stop(self):
hail_event_loop().run_until_complete(self._stop())

async def _stop(self):
async with AsyncExitStack() as stack:
stack.push_async_callback(self._async_fs.close)
stack.push_async_callback(self._batch_client.close)
await self._async_exit_stack.aclose()
self.functions = []
self._registered_ir_function_names = set()

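
The service-backend change is the async analogue: `create()` builds an `AsyncExitStack`, registers `async_fs.close` and `batch_client.close` as each resource is created, and hands the stack to the constructor so `_stop()` reduces to a single `aclose()`. A minimal sketch of the pattern (illustrative names):

```python
import asyncio
from contextlib import AsyncExitStack

class FakeClient:
    def __init__(self, name: str):
        self.name = name

    async def close(self):
        print(f'{self.name} closed')

async def create() -> AsyncExitStack:
    stack = AsyncExitStack()
    # Register cleanups as resources come into existence, mirroring
    # push_async_callback(async_fs.close) and push_async_callback(batch_client.close):
    stack.push_async_callback(FakeClient('fs').close)
    stack.push_async_callback(FakeClient('batch_client').close)
    return stack

async def main():
    stack = await create()
    # One aclose() awaits every registered callback, newest first:
    await stack.aclose()  # prints 'batch_client closed' then 'fs closed'

asyncio.run(main())
```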
24 changes: 22 additions & 2 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
@@ -35,6 +35,7 @@
AsyncFSURL,
MultiPartCreate,
FileAndDirectoryError,
IsABucketError,
)
from hailtop.aiotools.fs.exceptions import UnexpectedEOFError
from hailtop.aiotools.fs.stream import (
@@ -325,6 +326,9 @@ def __init__(self, bucket: str, path: str):
self._bucket = bucket
self._path = path

def __repr__(self):
return f'S3AsyncFSURL({self._bucket}, {self._path})'

@property
def bucket_parts(self) -> List[str]:
return [self._bucket]
@@ -344,6 +348,9 @@ def scheme(self) -> str:
def with_path(self, path) -> 'S3AsyncFSURL':
return S3AsyncFSURL(self._bucket, path)

def with_root_path(self) -> 'S3AsyncFSURL':
return self.with_path('')

def __str__(self) -> str:
return f's3://{self._bucket}/{self._path}'

@@ -424,6 +431,8 @@ def get_bucket_and_name(url: str) -> Tuple[str, str]:

async def open(self, url: str) -> ReadableStream:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
try:
resp = await blocking_to_async(self._thread_pool, self._s3.get_object, Bucket=bucket, Key=name)
return blocking_readable_stream_to_async(self._thread_pool, cast(BinaryIO, resp['Body']))
@@ -490,6 +499,8 @@ async def create(self, url: str, *, retry_writes: bool = True) -> S3CreateManager
# complete before the write can begin (unlike the current
# code, that copies 128MB parts in 256KB chunks).
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
Contributor:

nit: I think I'd rather have this handled with a decorator (or at least a separate method). Is it bad DX to have these methods accept an S3AsyncFSURL and have a decorator on top that converts str to the URL object, optionally erroring if there's no path?

Contributor (author):

So, my main gripe with decorators is that they tend to mangle the types. I tried writing a decorator with precise types in this commit. I'm not sure why this fails:

pyright fs.py
/Users/dking/projects/hail/hail/python/hailtop/aiocloud/aioaws/fs.py
  /Users/dking/projects/hail/hail/python/hailtop/aiocloud/aioaws/fs.py:460:15 - error: "open" overrides method of same name in class "AsyncFS" with incompatible type "StrOrURLMethod[(), Coroutine[Any, Any, ReadableStream]]" (reportIncompatibleMethodOverride)
1 error, 0 warnings, 0 informations

It works fine, though. I can read stuff from S3:

In [1]: from hailtop.aiocloud.aioaws import S3AsyncFS
   ...: fs = S3AsyncFS()
   ...: (await (await fs.open('s3://gnomad-public-us-east-1/release/4.0/constraint/README.txt')).read())[:100]
Out[1]: b'\xef\xbb\xbfConstraint field descriptions\r\n\r\nDescriptions of columns in the gnomAD v4 constraint metrics tsv.'

Maybe I'm overcomplicating what you're asking, but there seem to be two issues:

  1. If the URL is in the open method's signature, either we make FS generic (ugh) or we assert that the URL is the right URL everywhere.
  2. Preserving precise type information requires a fair bit of code (even if we can resolve (1)).

I was able to write a decorator that had no types and pyright didn't complain, but I also lost all type hinting on fs.open. IMO, between types and not repeating myself, I'd go with types.
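
A self-contained sketch of the kind of typed decorator under discussion (hypothetical names throughout, not the code from the linked commit): the decorator accepts a `str` from callers, hands a parsed URL to the implementation, and rejects bucket-root URLs up front. The pyright complaint above is an override-variance issue: the decorated `open` exposes a wrapper type rather than a plain method signature, so it no longer matches the base class's declared `open(self, url: str)` exactly.

```python
# Editorial sketch, Python 3.10+; all names here are hypothetical.
import asyncio
from typing import Awaitable, Callable, Concatenate, ParamSpec, TypeVar

P = ParamSpec('P')
R = TypeVar('R')

class IsABucketError(FileNotFoundError):
    pass

class URL:
    def __init__(self, bucket: str, path: str):
        self.bucket, self.path = bucket, path

def parse(url: str) -> URL:
    bucket, _, path = url.removeprefix('s3://').partition('/')
    return URL(bucket, path)

def str_to_url(
    method: Callable[Concatenate['FS', URL, P], Awaitable[R]],
) -> Callable[Concatenate['FS', str, P], Awaitable[R]]:
    # Convert the caller's str to a URL and error if there is no path.
    async def wrapper(self: 'FS', url: str, *args: P.args, **kwargs: P.kwargs) -> R:
        parsed = parse(url)
        if parsed.path == '':
            raise IsABucketError(url)
        return await method(self, parsed, *args, **kwargs)
    return wrapper

class FS:
    @str_to_url
    async def open(self, url: URL) -> bytes:
        return f'contents of {url.bucket}/{url.path}'.encode()

print(asyncio.run(FS().open('s3://bucket/key')))  # b'contents of bucket/key'
```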

Contributor (author):

Let me try something using inheritance instead.

Contributor (author):

Something along these lines may work and won't be too bad, but we'll need to play the same trick where the RouterAsyncFSURL has a reference to its file system. It would be a rather large change, and I (1) would rather not tie it to this PR and (2) have committed to substantially limiting my programming this last month. What do you think of leaving this as a follow-up cleanup?

d39557b

daniel-goldstein (Contributor), Feb 2, 2024:

Hrmph, ya, while I don't hate the last option in principle, I'd rather not get that involved here. Can you just wrap the error throwing in a method so that most of these call sites become one-liners, and we can punt on the refactor?
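
A sketch of what that suggested helper might look like (hypothetical names, not necessarily the merged spelling): the repeated three-line guard collapses into one call at each site.

```python
from typing import Tuple

class IsABucketError(FileNotFoundError):
    pass

class S3AsyncFSSketch:
    @staticmethod
    def get_bucket_and_name(url: str) -> Tuple[str, str]:
        bucket, _, name = url.removeprefix('s3://').partition('/')
        return bucket, name

    def get_bucket_and_name_not_bucket(self, url: str) -> Tuple[str, str]:
        # One-liner guard for call sites: bucket-root URLs are rejected here.
        bucket, name = self.get_bucket_and_name(url)
        if name == '':
            raise IsABucketError(url)
        return bucket, name

# e.g. open/statfile/remove would each begin with:
#     bucket, name = self.get_bucket_and_name_not_bucket(url)
```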

Contributor (author):

@daniel-goldstein OK, this was quite a few lines of code change, so it's worth a careful re-review.

return S3CreateManager(self, bucket, name)

async def multi_part_create(self, sema: asyncio.Semaphore, url: str, num_parts: int) -> MultiPartCreate:
@@ -504,6 +515,8 @@ async def makedirs(self, url: str, exist_ok: bool = False) -> None:

async def statfile(self, url: str) -> FileStatus:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
try:
resp = await blocking_to_async(self._thread_pool, self._s3.head_object, Bucket=bucket, Key=name)
return S3HeadObjectFileStatus(resp, url)
@@ -579,8 +592,10 @@ async def staturl(self, url: str) -> str:
return await self._staturl_parallel_isfile_isdir(url)

async def isfile(self, url: str) -> bool:
bucket, name = self.get_bucket_and_name(url)
if name == '':
return False
try:
bucket, name = self.get_bucket_and_name(url)
await blocking_to_async(self._thread_pool, self._s3.head_object, Bucket=bucket, Key=name)
return True
except botocore.exceptions.ClientError as e:
@@ -589,6 +604,9 @@
raise e

async def isdir(self, url: str) -> bool:
_, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
try:
async for _ in await self.listfiles(url, recursive=True):
return True
@@ -597,8 +615,10 @@
return False

async def remove(self, url: str) -> None:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
try:
bucket, name = self.get_bucket_and_name(url)
await blocking_to_async(self._thread_pool, self._s3.delete_object, Bucket=bucket, Key=name)
except self._s3.exceptions.NoSuchKey as e:
raise FileNotFoundError(url) from e
28 changes: 24 additions & 4 deletions hail/python/hailtop/aiocloud/aioazure/fs.py
@@ -30,6 +30,7 @@
FileStatus,
FileAndDirectoryError,
UnexpectedEOFError,
IsABucketError,
)

from .credentials import AzureCredentials
@@ -298,6 +299,9 @@ def __init__(self, account: str, container: str, path: str, query: Optional[str]
self._path = path
self._query = query

def __repr__(self):
return f'AzureAsyncFSURL({self._account}, {self._container}, {self._path}, {self._query})'

@property
def bucket_parts(self) -> List[str]:
return [self._account, self._container]
@@ -326,6 +330,9 @@ def base(self) -> str:
def with_path(self, path) -> 'AzureAsyncFSURL':
return self.__class__(self._account, self._container, path, self._query)

def with_root_path(self) -> 'AzureAsyncFSURL':
return self.with_path('')

def __str__(self) -> str:
return self.base if not self._query else f'{self.base}?{self._query}'

@@ -513,9 +520,12 @@ def get_container_client(self, url: AzureAsyncFSURL) -> ContainerClient:

@handle_public_access_error
async def open(self, url: str) -> ReadableStream:
parsed_url = self.parse_url(url)
if parsed_url.path == '':
raise IsABucketError(url)
if not await self.exists(url):
raise FileNotFoundError
client = self.get_blob_client(self.parse_url(url))
client = self.get_blob_client(parsed_url)
return AzureReadableStream(client, url)

@handle_public_access_error
@@ -527,7 +537,10 @@ async def _open_from(self, url: str, start: int, *, length: Optional[int] = None
return AzureReadableStream(client, url, offset=start, length=length)

async def create(self, url: str, *, retry_writes: bool = True) -> AsyncContextManager[WritableStream]: # pylint: disable=unused-argument
return AzureCreateManager(self.get_blob_client(self.parse_url(url)))
parsed_url = self.parse_url(url)
if parsed_url.path == '':
raise IsABucketError(url)
return AzureCreateManager(self.get_blob_client(parsed_url))

async def multi_part_create(self, sema: asyncio.Semaphore, url: str, num_parts: int) -> MultiPartCreate:
client = self.get_blob_client(self.parse_url(url))
@@ -546,6 +559,8 @@ async def isfile(self, url: str) -> bool:
@handle_public_access_error
async def isdir(self, url: str) -> bool:
fs_url = self.parse_url(url)
if fs_url.path == '':
raise IsABucketError(url)
assert not fs_url.path or fs_url.path.endswith('/'), fs_url.path
client = self.get_container_client(fs_url)
async for _ in client.walk_blobs(name_starts_with=fs_url.path, include=['metadata'], delimiter='/'):
@@ -560,8 +575,10 @@ async def makedirs(self, url: str, exist_ok: bool = False) -> None:

@handle_public_access_error
async def statfile(self, url: str) -> FileStatus:
parsed_url = self.parse_url(url)
if parsed_url.path == '':
raise IsABucketError(url)
try:
parsed_url = self.parse_url(url)
blob_props = await self.get_blob_client(parsed_url).get_blob_properties()
return AzureFileStatus(blob_props, parsed_url)
except azure.core.exceptions.ResourceNotFoundError as e:
@@ -639,7 +656,10 @@ async def staturl(self, url: str) -> str:

async def remove(self, url: str) -> None:
try:
await self.get_blob_client(self.parse_url(url)).delete_blob()
parsed_url = self.parse_url(url)
if parsed_url.path == '':
raise IsABucketError(url)
await self.get_blob_client(parsed_url).delete_blob()
except azure.core.exceptions.ResourceNotFoundError as e:
raise FileNotFoundError(url) from e

17 changes: 17 additions & 0 deletions hail/python/hailtop/aiocloud/aiogoogle/client/storage_client.py
@@ -21,6 +21,7 @@
FileAndDirectoryError,
MultiPartCreate,
UnexpectedEOFError,
IsABucketError,
)
from hailtop.aiotools import FeedableAsyncIterable, WriteBuffer

@@ -578,6 +579,9 @@ def __init__(self, bucket: str, path: str):
self._bucket = bucket
self._path = path

def __repr__(self):
return f'GoogleStorageAsyncFSURL({self._bucket}, {self._path})'

@property
def bucket_parts(self) -> List[str]:
return [self._bucket]
@@ -597,6 +601,9 @@ def scheme(self) -> str:
def with_path(self, path) -> 'GoogleStorageAsyncFSURL':
return GoogleStorageAsyncFSURL(self._bucket, path)

def with_root_path(self) -> 'GoogleStorageAsyncFSURL':
return self.with_path('')

def __str__(self) -> str:
return f'gs://{self._bucket}/{self._path}'

@@ -674,6 +681,8 @@ def get_bucket_and_name(url: str) -> Tuple[str, str]:

async def open(self, url: str) -> GetObjectStream:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
return await self._storage_client.get_object(bucket, name)

async def _open_from(self, url: str, start: int, *, length: Optional[int] = None) -> GetObjectStream:
@@ -686,6 +695,8 @@ async def _open_from(self, url: str, start: int, *, length: Optional[int] = None

async def create(self, url: str, *, retry_writes: bool = True) -> WritableStream:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
params = {'uploadType': 'resumable' if retry_writes else 'media'}
return await self._storage_client.insert_object(bucket, name, params=params)

@@ -706,6 +717,8 @@ async def makedirs(self, url: str, exist_ok: bool = False) -> None:
async def statfile(self, url: str) -> GetObjectFileStatus:
try:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
return GetObjectFileStatus(await self._storage_client.get_object_metadata(bucket, name), url)
except aiohttp.ClientResponseError as e:
if e.status == 404:
@@ -798,6 +811,8 @@ async def isfile(self, url: str) -> bool:

async def isdir(self, url: str) -> bool:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
assert not name or name.endswith('/'), name
params = {'prefix': name, 'delimiter': '/', 'includeTrailingDelimiter': 'true', 'maxResults': 1}
async for page in await self._storage_client.list_objects(bucket, params=params):
@@ -808,6 +823,8 @@

async def remove(self, url: str) -> None:
bucket, name = self.get_bucket_and_name(url)
if name == '':
raise IsABucketError(url)
try:
await self._storage_client.delete_object(bucket, name)
except aiohttp.ClientResponseError as e:
2 changes: 2 additions & 0 deletions hail/python/hailtop/aiotools/__init__.py
@@ -6,6 +6,7 @@
MultiPartCreate,
FileAndDirectoryError,
UnexpectedEOFError,
IsABucketError,
Copier,
ReadableStream,
WritableStream,
@@ -33,6 +34,7 @@
'FileAndDirectoryError',
'MultiPartCreate',
'UnexpectedEOFError',
'IsABucketError',
'WeightedSemaphore',
'WriteBuffer',
'Copier',
3 changes: 2 additions & 1 deletion hail/python/hailtop/aiotools/fs/__init__.py
@@ -1,6 +1,6 @@
from .fs import AsyncFS, AsyncFSURL, AsyncFSFactory, MultiPartCreate, FileListEntry, FileStatus
from .copier import Copier, CopyReport, SourceCopier, SourceReport, Transfer, TransferReport
from .exceptions import UnexpectedEOFError, FileAndDirectoryError
from .exceptions import UnexpectedEOFError, FileAndDirectoryError, IsABucketError
from .stream import (
ReadableStream,
EmptyReadableStream,
@@ -29,4 +29,5 @@
'FileStatus',
'FileAndDirectoryError',
'UnexpectedEOFError',
'IsABucketError',
]
4 changes: 4 additions & 0 deletions hail/python/hailtop/aiotools/fs/exceptions.py
@@ -4,3 +4,7 @@ class UnexpectedEOFError(Exception):

class FileAndDirectoryError(Exception):
pass


class IsABucketError(FileNotFoundError):
pass
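
Making `IsABucketError` a subclass of `FileNotFoundError` rather than a bare `Exception` means existing callers that already catch `FileNotFoundError` around file operations keep working when handed a bare bucket URL; a quick illustration:

```python
class IsABucketError(FileNotFoundError):
    pass

try:
    raise IsABucketError('gs://my-bucket')  # placeholder URL
except FileNotFoundError as e:
    # Pre-existing FileNotFoundError handlers also catch the new,
    # more specific error.
    print(f'cannot treat {e} as a file')
```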