From 27ac90fe1b6f323547fad9fbc465cfa9b42fd207 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 18 Nov 2021 00:19:20 +0000 Subject: [PATCH 1/5] s3 functionality --- amlb/datasets/fileutils.py | 62 ++++++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 12 deletions(-) diff --git a/amlb/datasets/fileutils.py b/amlb/datasets/fileutils.py index bce8cbc4f..0a9d4a1cd 100644 --- a/amlb/datasets/fileutils.py +++ b/amlb/datasets/fileutils.py @@ -11,7 +11,22 @@ log = logging.getLogger(__name__) -SUPPORTED_SCHEMES = ("http", "https") +SUPPORTED_SCHEMES = ("http", "https", "s3") + + +def s3_path_to_bucket_prefix(s3_path): + s3_path_cleaned = s3_path.split('://', 1)[1] + bucket, prefix = s3_path_cleaned.split('/', 1) + + return bucket, prefix + + +def is_s3_url(path): + if type(path) != str: + return False + if (path[:2] == 's3') and ('://' in path[:6]): + return True + return False def is_valid_url(url): @@ -21,21 +36,45 @@ def is_valid_url(url): def url_exists(url): if not is_valid_url(url): return False - head_req = Request(url, method='HEAD') - try: - with urlopen(head_req) as test: - return test.status == 200 - except URLError as e: - log.error(f"Cannot access url %s: %s", url, e) - return False + if not is_s3_url(url): + head_req = Request(url, method='HEAD') + try: + with urlopen(head_req) as test: + return test.status == 200 + except URLError as e: + log.error(f"Cannot access url %s: %s", url, e) + return False + else: + import boto3 + from botocore.errorfactory import ClientError + s3 = boto3.client('s3') + bucket, key = s3_path_to_bucket_prefix(url) + try: + s3.head_object(Bucket=bucket, Key=key) + return True + except ClientError as e: + log.error(f"Cannot access url %s: %s", url, e) + return False def download_file(url, dest_path): touch(dest_path) # urlretrieve(url, filename=dest_path) - with urlopen(url) as resp, open(dest_path, 'wb') as dest: - shutil.copyfileobj(resp, dest) - + if not is_s3_url(url): + with urlopen(url) as resp, open(dest_path, 'wb') as dest: + shutil.copyfileobj(resp, dest) + else: + import boto3 + from botocore.errorfactory import ClientError + s3 = boto3.resource('s3') + bucket, key = s3_path_to_bucket_prefix(url) + try: + s3.Bucket(bucket).download_file(key, dest_path) + except ClientError as e: + if e.response['Error']['Code'] == "404": + log.error("The object does not exist.") + else: + raise def is_archive(path): return zipfile.is_zipfile(path) or tarfile.is_tarfile(path) @@ -52,4 +91,3 @@ def unarchive_file(path, dest_folder=None): with tarfile.open(path) as tf: tf.extractall(path=dest_folder) return dest - From af4f6aea02e3147d82dbd5002d9ffd74272df89f Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 18 Nov 2021 10:27:26 -0800 Subject: [PATCH 2/5] Update amlb/datasets/fileutils.py Co-authored-by: Pieter Gijsbers --- amlb/datasets/fileutils.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/amlb/datasets/fileutils.py b/amlb/datasets/fileutils.py index 0a9d4a1cd..11a74c70e 100644 --- a/amlb/datasets/fileutils.py +++ b/amlb/datasets/fileutils.py @@ -22,11 +22,7 @@ def s3_path_to_bucket_prefix(s3_path): def is_s3_url(path): - if type(path) != str: - return False - if (path[:2] == 's3') and ('://' in path[:6]): - return True - return False + return isinstance(path, str) and path.lower().startswith("s3://") def is_valid_url(url): From b3b517ca8ffaead838ddf8029f03451bb720d973 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 18 Nov 2021 22:01:34 +0000 Subject: [PATCH 3/5] OOD --- amlb/datasets/file.py | 7 ++-- amlb/datasets/fileutils.py | 76 ++++++++++++++++++++++---------------- 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/amlb/datasets/file.py b/amlb/datasets/file.py index bcb941ab2..8e696f135 100644 --- a/amlb/datasets/file.py +++ b/amlb/datasets/file.py @@ -15,7 +15,7 @@ from ..resources import config as rconfig from ..utils import Namespace as ns, as_list, lazy_property, list_all_files, memoize, path_from_split, profile, split_path -from .fileutils import download_file, is_archive, is_valid_url, unarchive_file, url_exists +from .fileutils import is_archive, is_valid_url, unarchive_file, get_file_handler log = logging.getLogger(__name__) @@ -118,8 +118,9 @@ def _extract_train_test_paths(self, dataset, fold=None): elif is_valid_url(dataset): cached_file = os.path.join(self._cache_dir, os.path.basename(dataset)) if not os.path.exists(cached_file): # don't download if previously done - assert url_exists(dataset), f"Invalid path/url: {dataset}" - download_file(dataset, cached_file) + handler = get_file_handler(dataset) + assert handler.exists(dataset), f"Invalid path/url: {dataset}" + handler.download(dataset, dest_path=cached_file) return self._extract_train_test_paths(cached_file) else: raise ValueError(f"Invalid dataset description: {dataset}") diff --git a/amlb/datasets/fileutils.py b/amlb/datasets/fileutils.py index 11a74c70e..5f314d9d9 100644 --- a/amlb/datasets/fileutils.py +++ b/amlb/datasets/fileutils.py @@ -4,35 +4,21 @@ import tarfile from urllib.error import URLError from urllib.parse import urlparse -from urllib.request import Request, urlopen, urlretrieve +from urllib.request import Request, urlopen import zipfile from ..utils import touch log = logging.getLogger(__name__) -SUPPORTED_SCHEMES = ("http", "https", "s3") +class FileHandler: + def exists(self, url): pass + def download(self, url, dest_path): pass -def s3_path_to_bucket_prefix(s3_path): - s3_path_cleaned = s3_path.split('://', 1)[1] - bucket, prefix = s3_path_cleaned.split('/', 1) - return bucket, prefix - - -def is_s3_url(path): - return isinstance(path, str) and path.lower().startswith("s3://") - - -def is_valid_url(url): - return urlparse(url).scheme in SUPPORTED_SCHEMES - - -def url_exists(url): - if not is_valid_url(url): - return False - if not is_s3_url(url): +class HttpHandler(FileHandler): + def exists(self, url): head_req = Request(url, method='HEAD') try: with urlopen(head_req) as test: @@ -40,30 +26,32 @@ def url_exists(url): except URLError as e: log.error(f"Cannot access url %s: %s", url, e) return False - else: + + def download(self, url, dest_path): + touch(dest_path) + with urlopen(url) as resp, open(dest_path, 'wb') as dest: + shutil.copyfileobj(resp, dest) + + +class S3Handler(FileHandler): + def exists(self, url): import boto3 from botocore.errorfactory import ClientError s3 = boto3.client('s3') - bucket, key = s3_path_to_bucket_prefix(url) + bucket, key = self._s3_path_to_bucket_prefix(url) try: s3.head_object(Bucket=bucket, Key=key) return True except ClientError as e: log.error(f"Cannot access url %s: %s", url, e) return False - - -def download_file(url, dest_path): - touch(dest_path) - # urlretrieve(url, filename=dest_path) - if not is_s3_url(url): - with urlopen(url) as resp, open(dest_path, 'wb') as dest: - shutil.copyfileobj(resp, dest) - else: + + def download(self, url, dest_path): import boto3 from botocore.errorfactory import ClientError + touch(dest_path) s3 = boto3.resource('s3') - bucket, key = s3_path_to_bucket_prefix(url) + bucket, key = self._s3_path_to_bucket_prefix(url) try: s3.Bucket(bucket).download_file(key, dest_path) except ClientError as e: @@ -71,6 +59,30 @@ def download_file(url, dest_path): log.error("The object does not exist.") else: raise + + def _s3_path_to_bucket_prefix(self, s3_path): + s3_path_cleaned = s3_path.split('://', 1)[1] + bucket, prefix = s3_path_cleaned.split('/', 1) + return bucket, prefix + + +scheme_handlers = dict( + http=HttpHandler(), + https=HttpHandler(), + s3=S3Handler(), + s3a=S3Handler +) + +SUPPORTED_SCHEMES = list(scheme_handlers.keys()) + + +def get_file_handler(url): + return scheme_handlers[urlparse(url).scheme] + + +def is_valid_url(url): + return urlparse(url).scheme in SUPPORTED_SCHEMES + def is_archive(path): return zipfile.is_zipfile(path) or tarfile.is_tarfile(path) From 50918baba1849b1c18682cd36a4dce3eb0651839 Mon Sep 17 00:00:00 2001 From: Weisu Date: Tue, 7 Dec 2021 10:45:20 -0800 Subject: [PATCH 4/5] add s3n --- amlb/datasets/fileutils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amlb/datasets/fileutils.py b/amlb/datasets/fileutils.py index 5f314d9d9..72f0b6c19 100644 --- a/amlb/datasets/fileutils.py +++ b/amlb/datasets/fileutils.py @@ -70,7 +70,8 @@ def _s3_path_to_bucket_prefix(self, s3_path): http=HttpHandler(), https=HttpHandler(), s3=S3Handler(), - s3a=S3Handler + s3a=S3Handler(), + s3n=S3Handler(), ) SUPPORTED_SCHEMES = list(scheme_handlers.keys()) From 8cee27c37eec97453afa77c506855c8b5023e83c Mon Sep 17 00:00:00 2001 From: Weisu Date: Tue, 7 Dec 2021 10:47:50 -0800 Subject: [PATCH 5/5] move boto3 import --- amlb/datasets/fileutils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/amlb/datasets/fileutils.py b/amlb/datasets/fileutils.py index 72f0b6c19..9be9a84e3 100644 --- a/amlb/datasets/fileutils.py +++ b/amlb/datasets/fileutils.py @@ -2,6 +2,8 @@ import os import shutil import tarfile +import boto3 +from botocore.errorfactory import ClientError from urllib.error import URLError from urllib.parse import urlparse from urllib.request import Request, urlopen @@ -35,8 +37,6 @@ def download(self, url, dest_path): class S3Handler(FileHandler): def exists(self, url): - import boto3 - from botocore.errorfactory import ClientError s3 = boto3.client('s3') bucket, key = self._s3_path_to_bucket_prefix(url) try: @@ -47,8 +47,6 @@ def exists(self, url): return False def download(self, url, dest_path): - import boto3 - from botocore.errorfactory import ClientError touch(dest_path) s3 = boto3.resource('s3') bucket, key = self._s3_path_to_bucket_prefix(url)