From d68f662ec6a997b74ea4d241708728da03a62c95 Mon Sep 17 00:00:00 2001
From: kyleknap
Date: Wed, 13 Jan 2016 09:36:56 -0800
Subject: [PATCH 1/8] Add the setup files

---
 setup.cfg |  6 ++++++
 setup.py  | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+)
 create mode 100644 setup.cfg
 create mode 100644 setup.py

diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 00000000..ff997167
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,6 @@
+[wheel]
+universal = 1
+
+[metadata]
+requires-dist =
+    futures>=2.2.0,<4.0.0; python_version=="2.6" or python_version=="2.7"
diff --git a/setup.py b/setup.py
new file mode 100644
index 00000000..35f44a21
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+import os
+import re
+import sys
+
+from setuptools import setup, find_packages
+
+
+ROOT = os.path.dirname(__file__)
+VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''')
+
+
+requires = []
+
+
+if sys.version_info[0] == 2:
+    # concurrent.futures is only in python3, so for
+    # python2 we need to install the backport.
+    requires.append('futures>=2.2.0,<4.0.0')
+
+
+def get_version():
+    init = open(os.path.join(ROOT, 's3transfer', '__init__.py')).read()
+    return VERSION_RE.search(init).group(1)
+
+
+setup(
+    name='s3transfer',
+    version=get_version(),
+    description='An Amazon S3 Transfer Manager',
+    long_description=open('README.rst').read(),
+    author='Amazon Web Services',
+    url='https://github.com/boto/s3transfer',
+    packages=find_packages(exclude=['tests*']),
+    install_requires=requires,
+    extras_require={
+        ':python_version=="2.6" or python_version=="2.7"': [
+            'futures>=2.2.0,<4.0.0']
+    },
+    license="Apache License 2.0",
+    classifiers=(
+        'Development Status :: 1 - Planning',
+        'Intended Audience :: Developers',
+        'Natural Language :: English',
+        'License :: OSI Approved :: Apache Software License',
+        'Programming Language :: Python',
+        'Programming Language :: Python :: 2.6',
+        'Programming Language :: Python :: 2.7',
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.3',
+        'Programming Language :: Python :: 3.4',
+        'Programming Language :: Python :: 3.5',
+    ),
+)

From c44e98136a59aefdd0d99ffed8f7c110d773fab5 Mon Sep 17 00:00:00 2001
From: kyleknap
Date: Wed, 13 Jan 2016 09:46:27 -0800
Subject: [PATCH 2/8] Port boto3 code to legacy module

---
 s3transfer/__init__.py |  14 +
 s3transfer/legacy.py   | 728 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 742 insertions(+)
 create mode 100644 s3transfer/__init__.py
 create mode 100644 s3transfer/legacy.py

diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py
new file mode 100644
index 00000000..0ac1cb38
--- /dev/null
+++ b/s3transfer/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
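+# Note: setup.py reads the __version__ string below with a regular
+# expression (VERSION_RE), so it needs to stay a plain quoted literal.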
+__author__ = 'Amazon Web Services' +__version__ = '0.0.0' diff --git a/s3transfer/legacy.py b/s3transfer/legacy.py new file mode 100644 index 00000000..2fd26e13 --- /dev/null +++ b/s3transfer/legacy.py @@ -0,0 +1,728 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Abstractions over S3's upload/download operations. + +This module provides high level abstractions for efficient +uploads/downloads. It handles several things for the user: + +* Automatically switching to multipart transfers when + a file is over a specific size threshold +* Uploading/downloading a file in parallel +* Throttling based on max bandwidth +* Progress callbacks to monitor transfers +* Retries. While botocore handles retries for streaming uploads, + it is not possible for it to handle retries for streaming + downloads. This module handles retries for both cases so + you don't need to implement any retry logic yourself. + +This module has a reasonable set of defaults. It also allows you +to configure many aspects of the transfer process including: + +* Multipart threshold size +* Max parallel downloads +* Max bandwidth +* Socket timeouts +* Retry amounts + +There is no support for s3->s3 multipart copies at this +time. + + +.. _ref_s3transfer_usage: + +Usage +===== + +The simplest way to use this module is: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + transfer = S3Transfer(client) + # Upload /tmp/myfile to s3://bucket/key + transfer.upload_file('/tmp/myfile', 'bucket', 'key') + + # Download s3://bucket/key to /tmp/myfile + transfer.download_file('bucket', 'key', '/tmp/myfile') + +The ``upload_file`` and ``download_file`` methods also accept +``**kwargs``, which will be forwarded through to the corresponding +client operation. Here are a few examples using ``upload_file``:: + + # Making the object public + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'ACL': 'public-read'}) + + # Setting metadata + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) + + # Setting content type + transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', + extra_args={'ContentType': "application/json"}) + + +The ``S3Transfer`` clas also supports progress callbacks so you can +provide transfer progress to users. Both the ``upload_file`` and +``download_file`` methods take an optional ``callback`` parameter. +Here's an example of how to print a simple progress percentage +to the user: + +.. code-block:: python + + class ProgressPercentage(object): + def __init__(self, filename): + self._filename = filename + self._size = float(os.path.getsize(filename)) + self._seen_so_far = 0 + self._lock = threading.Lock() + + def __call__(self, bytes_amount): + # To simplify we'll assume this is hooked up + # to a single filename. 
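+            # (This callback may be invoked from several worker threads at
+            # once, which is why the lock below guards the running total.)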
+ with self._lock: + self._seen_so_far += bytes_amount + percentage = (self._seen_so_far / self._size) * 100 + sys.stdout.write( + "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, + self._size, percentage)) + sys.stdout.flush() + + + transfer = S3Transfer(boto3.client('s3', 'us-west-2')) + # Upload /tmp/myfile to s3://bucket/key and print upload progress. + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + callback=ProgressPercentage('/tmp/myfile')) + + + +You can also provide a TransferConfig object to the S3Transfer +object that gives you more fine grained control over the +transfer. For example: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + config = TransferConfig( + multipart_threshold=8 * 1024 * 1024, + max_concurrency=10, + num_download_attempts=10, + ) + transfer = S3Transfer(client, config) + transfer.upload_file('/tmp/foo', 'bucket', 'key') + + +""" +import os +import math +import functools +import logging +import socket +import threading +import random +import string +import boto3 +from concurrent import futures + +from botocore.compat import six +from botocore.vendored.requests.packages.urllib3.exceptions import \ + ReadTimeoutError +from botocore.exceptions import IncompleteReadError + +import boto3.compat +from boto3.exceptions import RetriesExceededError, S3UploadFailedError + + +logger = logging.getLogger(__name__) +queue = six.moves.queue + +MB = 1024 * 1024 +SHUTDOWN_SENTINEL = object() + + +def random_file_extension(num_digits=8): + return ''.join(random.choice(string.hexdigits) for _ in range(num_digits)) + + +def disable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and \ + hasattr(request.body, 'disable_callback'): + request.body.disable_callback() + + +def enable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and \ + hasattr(request.body, 'enable_callback'): + request.body.enable_callback() + + +class QueueShutdownError(Exception): + pass + + +class ReadFileChunk(object): + def __init__(self, fileobj, start_byte, chunk_size, full_file_size, + callback=None, enable_callback=True): + """ + + Given a file object shown below: + + |___________________________________________________| + 0 | | full_file_size + |----chunk_size---| + start_byte + + :type fileobj: file + :param fileobj: File like object + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. + + """ + self._fileobj = fileobj + self._start_byte = start_byte + self._size = self._calculate_file_size( + self._fileobj, requested_size=chunk_size, + start_byte=start_byte, actual_file_size=full_file_size) + self._fileobj.seek(self._start_byte) + self._amount_read = 0 + self._callback = callback + self._callback_enabled = enable_callback + + @classmethod + def from_filename(cls, filename, start_byte, chunk_size, callback=None, + enable_callback=True): + """Convenience factory function to create from a filename. + + :type start_byte: int + :param start_byte: The first byte from which to start reading. 
+ + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. + + :type enable_callback: bool + :param enable_callback: Indicate whether to invoke callback + during read() calls. + + :rtype: ``ReadFileChunk`` + :return: A new instance of ``ReadFileChunk`` + + """ + f = open(filename, 'rb') + file_size = os.fstat(f.fileno()).st_size + return cls(f, start_byte, chunk_size, file_size, callback, + enable_callback) + + def _calculate_file_size(self, fileobj, requested_size, start_byte, + actual_file_size): + max_chunk_size = actual_file_size - start_byte + return min(max_chunk_size, requested_size) + + def read(self, amount=None): + if amount is None: + amount_to_read = self._size - self._amount_read + else: + amount_to_read = min(self._size - self._amount_read, amount) + data = self._fileobj.read(amount_to_read) + self._amount_read += len(data) + if self._callback is not None and self._callback_enabled: + self._callback(len(data)) + return data + + def enable_callback(self): + self._callback_enabled = True + + def disable_callback(self): + self._callback_enabled = False + + def seek(self, where): + self._fileobj.seek(self._start_byte + where) + if self._callback is not None and self._callback_enabled: + # To also rewind the callback() for an accurate progress report + self._callback(where - self._amount_read) + self._amount_read = where + + def close(self): + self._fileobj.close() + + def tell(self): + return self._amount_read + + def __len__(self): + # __len__ is defined because requests will try to determine the length + # of the stream to set a content length. In the normal case + # of the file it will just stat the file, but we need to change that + # behavior. By providing a __len__, requests will use that instead + # of stat'ing the file. + return self._size + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def __iter__(self): + # This is a workaround for http://bugs.python.org/issue17575 + # Basically httplib will try to iterate over the contents, even + # if its a file like object. This wasn't noticed because we've + # already exhausted the stream so iterating over the file immediately + # stops, which is what we're simulating here. + return iter([]) + + +class StreamReaderProgress(object): + """Wrapper for a read only stream that adds progress callbacks.""" + def __init__(self, stream, callback=None): + self._stream = stream + self._callback = callback + + def read(self, *args, **kwargs): + value = self._stream.read(*args, **kwargs) + if self._callback is not None: + self._callback(len(value)) + return value + + +class OSUtils(object): + def get_file_size(self, filename): + return os.path.getsize(filename) + + def open_file_chunk_reader(self, filename, start_byte, size, callback): + return ReadFileChunk.from_filename(filename, start_byte, + size, callback, + enable_callback=False) + + def open(self, filename, mode): + return open(filename, mode) + + def remove_file(self, filename): + """Remove a file, noop if file does not exist.""" + # Unlike os.remove, if the file does not exist, + # then this method does nothing. 
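+        # A missing file is expected when cleaning up a temporary download
+        # that was never created, so the OSError below is deliberately
+        # swallowed.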
+ try: + os.remove(filename) + except OSError: + pass + + def rename_file(self, current_filename, new_filename): + boto3.compat.rename_file(current_filename, new_filename) + + +class MultipartUploader(object): + # These are the extra_args that need to be forwarded onto + # subsequent upload_parts. + UPLOAD_PART_ARGS = [ + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + def __init__(self, client, config, osutil, + executor_cls=futures.ThreadPoolExecutor): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + + def _extra_upload_part_args(self, extra_args): + # Only the args in UPLOAD_PART_ARGS actually need to be passed + # onto the upload_part calls. + upload_parts_args = {} + for key, value in extra_args.items(): + if key in self.UPLOAD_PART_ARGS: + upload_parts_args[key] = value + return upload_parts_args + + def upload_file(self, filename, bucket, key, callback, extra_args): + response = self._client.create_multipart_upload(Bucket=bucket, + Key=key, **extra_args) + upload_id = response['UploadId'] + try: + parts = self._upload_parts(upload_id, filename, bucket, key, + callback, extra_args) + except Exception as e: + logger.debug("Exception raised while uploading parts, " + "aborting multipart upload.", exc_info=True) + self._client.abort_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id) + raise S3UploadFailedError( + "Failed to upload %s to %s: %s" % ( + filename, '/'.join([bucket, key]), e)) + self._client.complete_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id, + MultipartUpload={'Parts': parts}) + + def _upload_parts(self, upload_id, filename, bucket, key, callback, + extra_args): + upload_parts_extra_args = self._extra_upload_part_args(extra_args) + parts = [] + part_size = self._config.multipart_chunksize + num_parts = int( + math.ceil(self._os.get_file_size(filename) / float(part_size))) + max_workers = self._config.max_concurrency + with self._executor_cls(max_workers=max_workers) as executor: + upload_partial = functools.partial( + self._upload_one_part, filename, bucket, key, upload_id, + part_size, upload_parts_extra_args, callback) + for part in executor.map(upload_partial, range(1, num_parts + 1)): + parts.append(part) + return parts + + def _upload_one_part(self, filename, bucket, key, + upload_id, part_size, extra_args, + callback, part_number): + open_chunk_reader = self._os.open_file_chunk_reader + with open_chunk_reader(filename, part_size * (part_number - 1), + part_size, callback) as body: + response = self._client.upload_part( + Bucket=bucket, Key=key, + UploadId=upload_id, PartNumber=part_number, Body=body, + **extra_args) + etag = response['ETag'] + return {'ETag': etag, 'PartNumber': part_number} + + +class ShutdownQueue(queue.Queue): + """A queue implementation that can be shutdown. + + Shutting down a queue means that this class adds a + trigger_shutdown method that will trigger all subsequent + calls to put() to fail with a ``QueueShutdownError``. + + It purposefully deviates from queue.Queue, and is *not* meant + to be a drop in replacement for ``queue.Queue``. + + """ + def _init(self, maxsize): + self._shutdown = False + self._shutdown_lock = threading.Lock() + # queue.Queue is an old style class so we don't use super(). 
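+        # Calling the unbound Queue._init directly works on both Python 2
+        # (where Queue is old-style) and Python 3.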
+ return queue.Queue._init(self, maxsize) + + def trigger_shutdown(self): + with self._shutdown_lock: + self._shutdown = True + logger.debug("The IO queue is now shutdown.") + + def put(self, item): + # Note: this is not sufficient, it's still possible to deadlock! + # Need to hook into the condition vars used by this class. + with self._shutdown_lock: + if self._shutdown: + raise QueueShutdownError("Cannot put item to queue when " + "queue has been shutdown.") + return queue.Queue.put(self, item) + + +class MultipartDownloader(object): + def __init__(self, client, config, osutil, + executor_cls=futures.ThreadPoolExecutor): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + self._ioqueue = ShutdownQueue(self._config.max_io_queue) + + def download_file(self, bucket, key, filename, object_size, + extra_args, callback=None): + with self._executor_cls(max_workers=2) as controller: + # 1 thread for the future that manages the uploading of files + # 1 thread for the future that manages IO writes. + download_parts_handler = functools.partial( + self._download_file_as_future, + bucket, key, filename, object_size, callback) + parts_future = controller.submit(download_parts_handler) + + io_writes_handler = functools.partial( + self._perform_io_writes, filename) + io_future = controller.submit(io_writes_handler) + results = futures.wait([parts_future, io_future], + return_when=futures.FIRST_EXCEPTION) + self._process_future_results(results) + + def _process_future_results(self, futures): + finished, unfinished = futures + for future in finished: + future.result() + + def _download_file_as_future(self, bucket, key, filename, object_size, + callback): + part_size = self._config.multipart_chunksize + num_parts = int(math.ceil(object_size / float(part_size))) + max_workers = self._config.max_concurrency + download_partial = functools.partial( + self._download_range, bucket, key, filename, + part_size, num_parts, callback) + try: + with self._executor_cls(max_workers=max_workers) as executor: + list(executor.map(download_partial, range(num_parts))) + finally: + self._ioqueue.put(SHUTDOWN_SENTINEL) + + def _calculate_range_param(self, part_size, part_index, num_parts): + start_range = part_index * part_size + if part_index == num_parts - 1: + end_range = '' + else: + end_range = start_range + part_size - 1 + range_param = 'bytes=%s-%s' % (start_range, end_range) + return range_param + + def _download_range(self, bucket, key, filename, + part_size, num_parts, callback, part_index): + try: + range_param = self._calculate_range_param( + part_size, part_index, num_parts) + + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + logger.debug("Making get_object call.") + response = self._client.get_object( + Bucket=bucket, Key=key, Range=range_param) + streaming_body = StreamReaderProgress( + response['Body'], callback) + buffer_size = 1024 * 16 + current_index = part_size * part_index + for chunk in iter(lambda: streaming_body.read(buffer_size), + b''): + self._ioqueue.put((current_index, chunk)) + current_index += len(chunk) + return + except (socket.timeout, socket.error, + ReadTimeoutError, IncompleteReadError) as e: + logger.debug("Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", e, i, + max_attempts, exc_info=True) + last_exception = e + continue + raise RetriesExceededError(last_exception) + finally: + logger.debug("EXITING _download_range for part: %s", part_index) 
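+    # Example of the ranges produced above for a 9-byte object downloaded
+    # with part_size=4 (num_parts=3); the ranges are inclusive and the last
+    # part is open-ended so it always reads to the end of the object:
+    #   part_index=0 -> 'bytes=0-3'
+    #   part_index=1 -> 'bytes=4-7'
+    #   part_index=2 -> 'bytes=8-'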
+ + def _perform_io_writes(self, filename): + with self._os.open(filename, 'wb') as f: + while True: + task = self._ioqueue.get() + if task is SHUTDOWN_SENTINEL: + logger.debug("Shutdown sentinel received in IO handler, " + "shutting down IO handler.") + return + else: + try: + offset, data = task + f.seek(offset) + f.write(data) + except Exception as e: + logger.debug("Caught exception in IO thread: %s", + e, exc_info=True) + self._ioqueue.trigger_shutdown() + raise + + +class TransferConfig(object): + def __init__(self, + multipart_threshold=8 * MB, + max_concurrency=10, + multipart_chunksize=8 * MB, + num_download_attempts=5, + max_io_queue=100): + self.multipart_threshold = multipart_threshold + self.max_concurrency = max_concurrency + self.multipart_chunksize = multipart_chunksize + self.num_download_attempts = num_download_attempts + self.max_io_queue = max_io_queue + + +class S3Transfer(object): + + ALLOWED_DOWNLOAD_ARGS = [ + 'VersionId', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + ALLOWED_UPLOAD_ARGS = [ + 'ACL', + 'CacheControl', + 'ContentDisposition', + 'ContentEncoding', + 'ContentLanguage', + 'ContentType', + 'Expires', + 'GrantFullControl', + 'GrantRead', + 'GrantReadACP', + 'GrantWriteACL', + 'Metadata', + 'RequestPayer', + 'ServerSideEncryption', + 'StorageClass', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'SSEKMSKeyId', + ] + + def __init__(self, client, config=None, osutil=None): + self._client = client + if config is None: + config = TransferConfig() + self._config = config + if osutil is None: + osutil = OSUtils() + self._osutil = osutil + + def upload_file(self, filename, bucket, key, + callback=None, extra_args=None): + """Upload a file to an S3 object. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.upload_file() directly. + """ + if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) + events = self._client.meta.events + events.register_first('request-created.s3', + disable_upload_callbacks, + unique_id='s3upload-callback-disable') + events.register_last('request-created.s3', + enable_upload_callbacks, + unique_id='s3upload-callback-enable') + if self._osutil.get_file_size(filename) >= \ + self._config.multipart_threshold: + self._multipart_upload(filename, bucket, key, callback, extra_args) + else: + self._put_object(filename, bucket, key, callback, extra_args) + + def _put_object(self, filename, bucket, key, callback, extra_args): + # We're using open_file_chunk_reader so we can take advantage of the + # progress callback functionality. + open_chunk_reader = self._osutil.open_file_chunk_reader + with open_chunk_reader(filename, 0, + self._osutil.get_file_size(filename), + callback=callback) as body: + self._client.put_object(Bucket=bucket, Key=key, Body=body, + **extra_args) + + def download_file(self, bucket, key, filename, extra_args=None, + callback=None): + """Download an S3 object to a file. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.download_file() directly. + """ + # This method will issue a ``head_object`` request to determine + # the size of the S3 object. This is used to determine if the + # object is downloaded in parallel. 
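+        # The object is first written to '<filename>.<random suffix>' and
+        # only renamed over the destination once the transfer succeeds, so a
+        # failed download never leaves a truncated file at the target path.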
+ if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) + object_size = self._object_size(bucket, key, extra_args) + temp_filename = filename + os.extsep + random_file_extension() + try: + self._download_file(bucket, key, temp_filename, object_size, + extra_args, callback) + except Exception: + logger.debug("Exception caught in download_file, removing partial " + "file: %s", temp_filename, exc_info=True) + self._osutil.remove_file(temp_filename) + raise + else: + self._osutil.rename_file(temp_filename, filename) + + def _download_file(self, bucket, key, filename, object_size, + extra_args, callback): + if object_size >= self._config.multipart_threshold: + self._ranged_download(bucket, key, filename, object_size, + extra_args, callback) + else: + self._get_object(bucket, key, filename, extra_args, callback) + + def _validate_all_known_args(self, actual, allowed): + for kwarg in actual: + if kwarg not in allowed: + raise ValueError( + "Invalid extra_args key '%s', " + "must be one of: %s" % ( + kwarg, ', '.join(allowed))) + + def _ranged_download(self, bucket, key, filename, object_size, + extra_args, callback): + downloader = MultipartDownloader(self._client, self._config, + self._osutil) + downloader.download_file(bucket, key, filename, object_size, + extra_args, callback) + + def _get_object(self, bucket, key, filename, extra_args, callback): + # precondition: num_download_attempts > 0 + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + return self._do_get_object(bucket, key, filename, + extra_args, callback) + except (socket.timeout, socket.error, + ReadTimeoutError, IncompleteReadError) as e: + # TODO: we need a way to reset the callback if the + # download failed. 
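+                # Each retry restarts the GetObject download from the start
+                # of the object; progress already reported for the failed
+                # attempt is not rewound (hence the TODO above).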
+ logger.debug("Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", e, i, + max_attempts, exc_info=True) + last_exception = e + continue + raise RetriesExceededError(last_exception) + + def _do_get_object(self, bucket, key, filename, extra_args, callback): + response = self._client.get_object(Bucket=bucket, Key=key, + **extra_args) + streaming_body = StreamReaderProgress( + response['Body'], callback) + with self._osutil.open(filename, 'wb') as f: + for chunk in iter(lambda: streaming_body.read(8192), b''): + f.write(chunk) + + def _object_size(self, bucket, key, extra_args): + return self._client.head_object( + Bucket=bucket, Key=key, **extra_args)['ContentLength'] + + def _multipart_upload(self, filename, bucket, key, callback, extra_args): + uploader = MultipartUploader(self._client, self._config, self._osutil) + uploader.upload_file(filename, bucket, key, callback, extra_args) From db4116980e50584c9d2e3d3f086b11f8822d8d40 Mon Sep 17 00:00:00 2001 From: kyleknap Date: Wed, 13 Jan 2016 10:14:23 -0800 Subject: [PATCH 3/8] Add unit test and remove boto3 references --- requirements-text.txt | 7 + s3transfer/compat.py | 31 ++ s3transfer/exceptions.py | 20 ++ s3transfer/legacy.py | 7 +- setup.cfg | 1 + setup.py | 4 +- tests/__init__.py | 16 + tests/unit/__init__.py | 12 + tests/unit/test_legacy.py | 731 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 824 insertions(+), 5 deletions(-) create mode 100644 requirements-text.txt create mode 100644 s3transfer/compat.py create mode 100644 s3transfer/exceptions.py create mode 100644 tests/__init__.py create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/test_legacy.py diff --git a/requirements-text.txt b/requirements-text.txt new file mode 100644 index 00000000..61988542 --- /dev/null +++ b/requirements-text.txt @@ -0,0 +1,7 @@ +-e git://github.com/boto/botocore.git@develop#egg=botocore +nose==1.3.3 +mock==1.3.0 +wheel==0.24.0 +# Note you need at least pip --version of 6.0 or +# higher to be able to pick on these version specifiers. +unittest2==0.5.1; python_version == '2.6' diff --git a/s3transfer/compat.py b/s3transfer/compat.py new file mode 100644 index 00000000..afe3400f --- /dev/null +++ b/s3transfer/compat.py @@ -0,0 +1,31 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import sys +import os +import errno + + +if sys.platform.startswith('win'): + def rename_file(current_filename, new_filename): + try: + os.remove(new_filename) + except OSError as e: + if not e.errno == errno.ENOENT: + # We only want to a ignore trying to remove + # a file that does not exist. If it fails + # for any other reason we should be propagating + # that exception. + raise + os.rename(current_filename, new_filename) +else: + rename_file = os.rename diff --git a/s3transfer/exceptions.py b/s3transfer/exceptions.py new file mode 100644 index 00000000..a04be027 --- /dev/null +++ b/s3transfer/exceptions.py @@ -0,0 +1,20 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +class RetriesExceededError(Exception): + def __init__(self, last_exception, msg='Max Retries Exceeded'): + super(RetriesExceededError, self).__init__(msg) + self.last_exception = last_exception + + +class S3UploadFailedError(Exception): + pass diff --git a/s3transfer/legacy.py b/s3transfer/legacy.py index 2fd26e13..4d5e7172 100644 --- a/s3transfer/legacy.py +++ b/s3transfer/legacy.py @@ -131,7 +131,6 @@ def __call__(self, bytes_amount): import threading import random import string -import boto3 from concurrent import futures from botocore.compat import six @@ -139,8 +138,8 @@ def __call__(self, bytes_amount): ReadTimeoutError from botocore.exceptions import IncompleteReadError -import boto3.compat -from boto3.exceptions import RetriesExceededError, S3UploadFailedError +import s3transfer.compat +from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError logger = logging.getLogger(__name__) @@ -337,7 +336,7 @@ def remove_file(self, filename): pass def rename_file(self, current_filename, new_filename): - boto3.compat.rename_file(current_filename, new_filename) + s3transfer.compat.rename_file(current_filename, new_filename) class MultipartUploader(object): diff --git a/setup.cfg b/setup.cfg index ff997167..aa2f8b0f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,4 +3,5 @@ universal = 1 [metadata] requires-dist = + botocore>=1.3.0,<2.0.0 futures>=2.2.0,<4.0.0; python_version=="2.6" or python_version=="2.7" diff --git a/setup.py b/setup.py index 35f44a21..89467a50 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,9 @@ VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''') -requires = [] +requires = [ + 'botocore>=1.3.0,<2.0.0', +] if sys.version_info[0] == 2: diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..42933a3f --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +try: + import unittest2 as unittest +except ImportError: + import unittest diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..79ef91c6 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1,12 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. 
This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. diff --git a/tests/unit/test_legacy.py b/tests/unit/test_legacy.py new file mode 100644 index 00000000..3dd4481f --- /dev/null +++ b/tests/unit/test_legacy.py @@ -0,0 +1,731 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import os +import tempfile +import shutil +import socket +from tests import unittest +from contextlib import closing + +import mock +from botocore.vendored import six +from concurrent import futures + +from s3transfer.exceptions import RetriesExceededError +from s3transfer.exceptions import S3UploadFailedError +from s3transfer.legacy import ReadFileChunk, StreamReaderProgress +from s3transfer.legacy import S3Transfer +from s3transfer.legacy import OSUtils, TransferConfig +from s3transfer.legacy import MultipartDownloader, MultipartUploader +from s3transfer.legacy import ShutdownQueue +from s3transfer.legacy import QueueShutdownError +from s3transfer.legacy import random_file_extension +from s3transfer.legacy import disable_upload_callbacks, enable_upload_callbacks + + +class InMemoryOSLayer(OSUtils): + def __init__(self, filemap): + self.filemap = filemap + + def get_file_size(self, filename): + return len(self.filemap[filename]) + + def open_file_chunk_reader(self, filename, start_byte, size, callback): + return closing(six.BytesIO(self.filemap[filename])) + + def open(self, filename, mode): + if 'wb' in mode: + fileobj = six.BytesIO() + self.filemap[filename] = fileobj + return closing(fileobj) + else: + return closing(self.filemap[filename]) + + def remove_file(self, filename): + if filename in self.filemap: + del self.filemap[filename] + + def rename_file(self, current_filename, new_filename): + if current_filename in self.filemap: + self.filemap[new_filename] = self.filemap.pop( + current_filename) + + +class SequentialExecutor(object): + def __init__(self, max_workers): + pass + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + pass + + # The real map() interface actually takes *args, but we specifically do + # _not_ use this interface. 
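+    # Running the work inline on the calling thread keeps the multipart
+    # tests deterministic: parts are processed in order with no real
+    # thread pool involved.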
+ def map(self, function, args): + results = [] + for arg in args: + results.append(function(arg)) + return results + + def submit(self, function): + future = futures.Future() + future.set_result(function()) + return future + + +class TestOSUtils(unittest.TestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_get_file_size(self): + with mock.patch('os.path.getsize') as m: + OSUtils().get_file_size('myfile') + m.assert_called_with('myfile') + + def test_open_file_chunk_reader(self): + with mock.patch('s3transfer.legacy.ReadFileChunk') as m: + OSUtils().open_file_chunk_reader('myfile', 0, 100, None) + m.from_filename.assert_called_with('myfile', 0, 100, + None, enable_callback=False) + + def test_open_file(self): + fileobj = OSUtils().open(os.path.join(self.tempdir, 'foo'), 'w') + self.assertTrue(hasattr(fileobj, 'write')) + + def test_remove_file_ignores_errors(self): + with mock.patch('os.remove') as remove: + remove.side_effect = OSError('fake error') + OSUtils().remove_file('foo') + remove.assert_called_with('foo') + + def test_remove_file_proxies_remove_file(self): + with mock.patch('os.remove') as remove: + OSUtils().remove_file('foo') + remove.assert_called_with('foo') + + def test_rename_file(self): + with mock.patch('s3transfer.compat.rename_file') as rename_file: + OSUtils().rename_file('foo', 'newfoo') + rename_file.assert_called_with('foo', 'newfoo') + + +class TestReadFileChunk(unittest.TestCase): + def setUp(self): + self.tempdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tempdir) + + def test_read_entire_chunk(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'onetwothreefourfivesixseveneightnineten') + chunk = ReadFileChunk.from_filename( + filename, start_byte=0, chunk_size=3) + self.assertEqual(chunk.read(), b'one') + self.assertEqual(chunk.read(), b'') + + def test_read_with_amount_size(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'onetwothreefourfivesixseveneightnineten') + chunk = ReadFileChunk.from_filename( + filename, start_byte=11, chunk_size=4) + self.assertEqual(chunk.read(1), b'f') + self.assertEqual(chunk.read(1), b'o') + self.assertEqual(chunk.read(1), b'u') + self.assertEqual(chunk.read(1), b'r') + self.assertEqual(chunk.read(1), b'') + + def test_reset_stream_emulation(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'onetwothreefourfivesixseveneightnineten') + chunk = ReadFileChunk.from_filename( + filename, start_byte=11, chunk_size=4) + self.assertEqual(chunk.read(), b'four') + chunk.seek(0) + self.assertEqual(chunk.read(), b'four') + + def test_read_past_end_of_file(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'onetwothreefourfivesixseveneightnineten') + chunk = ReadFileChunk.from_filename( + filename, start_byte=36, chunk_size=100000) + self.assertEqual(chunk.read(), b'ten') + self.assertEqual(chunk.read(), b'') + self.assertEqual(len(chunk), 3) + + def test_tell_and_seek(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'onetwothreefourfivesixseveneightnineten') + chunk = ReadFileChunk.from_filename( + filename, start_byte=36, chunk_size=100000) + self.assertEqual(chunk.tell(), 0) + self.assertEqual(chunk.read(), b'ten') + self.assertEqual(chunk.tell(), 3) + chunk.seek(0) + self.assertEqual(chunk.tell(), 
0) + + def test_file_chunk_supports_context_manager(self): + filename = os.path.join(self.tempdir, 'foo') + with open(filename, 'wb') as f: + f.write(b'abc') + with ReadFileChunk.from_filename(filename, + start_byte=0, + chunk_size=2) as chunk: + val = chunk.read() + self.assertEqual(val, b'ab') + + def test_iter_is_always_empty(self): + # This tests the workaround for the httplib bug (see + # the source for more info). + filename = os.path.join(self.tempdir, 'foo') + open(filename, 'wb').close() + chunk = ReadFileChunk.from_filename( + filename, start_byte=0, chunk_size=10) + self.assertEqual(list(chunk), []) + + +class TestReadFileChunkWithCallback(TestReadFileChunk): + def setUp(self): + super(TestReadFileChunkWithCallback, self).setUp() + self.filename = os.path.join(self.tempdir, 'foo') + with open(self.filename, 'wb') as f: + f.write(b'abc') + self.amounts_seen = [] + + def callback(self, amount): + self.amounts_seen.append(amount) + + def test_callback_is_invoked_on_read(self): + chunk = ReadFileChunk.from_filename( + self.filename, start_byte=0, chunk_size=3, callback=self.callback) + chunk.read(1) + chunk.read(1) + chunk.read(1) + self.assertEqual(self.amounts_seen, [1, 1, 1]) + + def test_callback_can_be_disabled(self): + chunk = ReadFileChunk.from_filename( + self.filename, start_byte=0, chunk_size=3, callback=self.callback) + chunk.disable_callback() + # Now reading from the ReadFileChunk should not invoke + # the callback. + chunk.read() + self.assertEqual(self.amounts_seen, []) + + def test_callback_will_also_be_triggered_by_seek(self): + chunk = ReadFileChunk.from_filename( + self.filename, start_byte=0, chunk_size=3, callback=self.callback) + chunk.read(2) + chunk.seek(0) + chunk.read(2) + chunk.seek(1) + chunk.read(2) + self.assertEqual(self.amounts_seen, [2, -2, 2, -1, 2]) + + +class TestStreamReaderProgress(unittest.TestCase): + + def test_proxies_to_wrapped_stream(self): + original_stream = six.StringIO('foobarbaz') + wrapped = StreamReaderProgress(original_stream) + self.assertEqual(wrapped.read(), 'foobarbaz') + + def test_callback_invoked(self): + amounts_seen = [] + + def callback(amount): + amounts_seen.append(amount) + + original_stream = six.StringIO('foobarbaz') + wrapped = StreamReaderProgress(original_stream, callback) + self.assertEqual(wrapped.read(), 'foobarbaz') + self.assertEqual(amounts_seen, [9]) + + +class TestMultipartUploader(unittest.TestCase): + def test_multipart_upload_uses_correct_client_calls(self): + client = mock.Mock() + uploader = MultipartUploader( + client, TransferConfig(), + InMemoryOSLayer({'filename': b'foobar'}), SequentialExecutor) + client.create_multipart_upload.return_value = {'UploadId': 'upload_id'} + client.upload_part.return_value = {'ETag': 'first'} + + uploader.upload_file('filename', 'bucket', 'key', None, {}) + + # We need to check both the sequence of calls (create/upload/complete) + # as well as the params passed between the calls, including + # 1. The upload_id was plumbed through + # 2. The collected etags were added to the complete call. + client.create_multipart_upload.assert_called_with( + Bucket='bucket', Key='key') + # Should be two parts. 
+ client.upload_part.assert_called_with( + Body=mock.ANY, Bucket='bucket', + UploadId='upload_id', Key='key', PartNumber=1) + client.complete_multipart_upload.assert_called_with( + MultipartUpload={'Parts': [{'PartNumber': 1, 'ETag': 'first'}]}, + Bucket='bucket', + UploadId='upload_id', + Key='key') + + def test_multipart_upload_injects_proper_kwargs(self): + client = mock.Mock() + uploader = MultipartUploader( + client, TransferConfig(), + InMemoryOSLayer({'filename': b'foobar'}), SequentialExecutor) + client.create_multipart_upload.return_value = {'UploadId': 'upload_id'} + client.upload_part.return_value = {'ETag': 'first'} + + extra_args = { + 'SSECustomerKey': 'fakekey', + 'SSECustomerAlgorithm': 'AES256', + 'StorageClass': 'REDUCED_REDUNDANCY' + } + uploader.upload_file('filename', 'bucket', 'key', None, extra_args) + + client.create_multipart_upload.assert_called_with( + Bucket='bucket', Key='key', + # The initial call should inject all the storage class params. + SSECustomerKey='fakekey', + SSECustomerAlgorithm='AES256', + StorageClass='REDUCED_REDUNDANCY') + client.upload_part.assert_called_with( + Body=mock.ANY, Bucket='bucket', + UploadId='upload_id', Key='key', PartNumber=1, + # We only have to forward certain **extra_args in subsequent + # UploadPart calls. + SSECustomerKey='fakekey', + SSECustomerAlgorithm='AES256', + ) + client.complete_multipart_upload.assert_called_with( + MultipartUpload={'Parts': [{'PartNumber': 1, 'ETag': 'first'}]}, + Bucket='bucket', + UploadId='upload_id', + Key='key') + + def test_multipart_upload_is_aborted_on_error(self): + # If the create_multipart_upload succeeds and any upload_part + # fails, then abort_multipart_upload will be called. + client = mock.Mock() + uploader = MultipartUploader( + client, TransferConfig(), + InMemoryOSLayer({'filename': b'foobar'}), SequentialExecutor) + client.create_multipart_upload.return_value = {'UploadId': 'upload_id'} + client.upload_part.side_effect = Exception( + "Some kind of error occurred.") + + with self.assertRaises(S3UploadFailedError): + uploader.upload_file('filename', 'bucket', 'key', None, {}) + + client.abort_multipart_upload.assert_called_with( + Bucket='bucket', Key='key', UploadId='upload_id') + + +class TestMultipartDownloader(unittest.TestCase): + + maxDiff = None + + def test_multipart_download_uses_correct_client_calls(self): + client = mock.Mock() + response_body = b'foobarbaz' + client.get_object.return_value = {'Body': six.BytesIO(response_body)} + + downloader = MultipartDownloader(client, TransferConfig(), + InMemoryOSLayer({}), + SequentialExecutor) + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + client.get_object.assert_called_with( + Range='bytes=0-', + Bucket='bucket', + Key='key' + ) + + def test_multipart_download_with_multiple_parts(self): + client = mock.Mock() + response_body = b'foobarbaz' + client.get_object.return_value = {'Body': six.BytesIO(response_body)} + # For testing purposes, we're testing with a multipart threshold + # of 4 bytes and a chunksize of 4 bytes. Given b'foobarbaz', + # this should result in 3 calls. In python slices this would be: + # r[0:4], r[4:8], r[8:9]. But the Range param will be slightly + # different because they use inclusive ranges. 
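+        # i.e. r[0:4] -> 'bytes=0-3', r[4:8] -> 'bytes=4-7', and
+        # r[8:9] -> 'bytes=8-' (the last part is open-ended).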
+ config = TransferConfig(multipart_threshold=4, + multipart_chunksize=4) + + downloader = MultipartDownloader(client, config, + InMemoryOSLayer({}), + SequentialExecutor) + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + # We're storing these in **extra because the assertEqual + # below is really about verifying we have the correct value + # for the Range param. + extra = {'Bucket': 'bucket', 'Key': 'key'} + self.assertEqual(client.get_object.call_args_list, + # Note these are inclusive ranges. + [mock.call(Range='bytes=0-3', **extra), + mock.call(Range='bytes=4-7', **extra), + mock.call(Range='bytes=8-', **extra)]) + + def test_retry_on_failures_from_stream_reads(self): + # If we get an exception during a call to the response body's .read() + # method, we should retry the request. + client = mock.Mock() + response_body = b'foobarbaz' + stream_with_errors = mock.Mock() + stream_with_errors.read.side_effect = [ + socket.error("fake error"), + response_body + ] + client.get_object.return_value = {'Body': stream_with_errors} + config = TransferConfig(multipart_threshold=4, + multipart_chunksize=4) + + downloader = MultipartDownloader(client, config, + InMemoryOSLayer({}), + SequentialExecutor) + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + # We're storing these in **extra because the assertEqual + # below is really about verifying we have the correct value + # for the Range param. + extra = {'Bucket': 'bucket', 'Key': 'key'} + self.assertEqual(client.get_object.call_args_list, + # The first call to range=0-3 fails because of the + # side_effect above where we make the .read() raise a + # socket.error. + # The second call to range=0-3 then succeeds. + [mock.call(Range='bytes=0-3', **extra), + mock.call(Range='bytes=0-3', **extra), + mock.call(Range='bytes=4-7', **extra), + mock.call(Range='bytes=8-', **extra)]) + + def test_exception_raised_on_exceeded_retries(self): + client = mock.Mock() + response_body = b'foobarbaz' + stream_with_errors = mock.Mock() + stream_with_errors.read.side_effect = socket.error("fake error") + client.get_object.return_value = {'Body': stream_with_errors} + config = TransferConfig(multipart_threshold=4, + multipart_chunksize=4) + + downloader = MultipartDownloader(client, config, + InMemoryOSLayer({}), + SequentialExecutor) + with self.assertRaises(RetriesExceededError): + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + def test_io_thread_failure_triggers_shutdown(self): + client = mock.Mock() + response_body = b'foobarbaz' + client.get_object.return_value = {'Body': six.BytesIO(response_body)} + os_layer = mock.Mock() + mock_fileobj = mock.MagicMock() + mock_fileobj.__enter__.return_value = mock_fileobj + mock_fileobj.write.side_effect = Exception("fake IO error") + os_layer.open.return_value = mock_fileobj + + downloader = MultipartDownloader(client, TransferConfig(), + os_layer, SequentialExecutor) + # We're verifying that the exception raised from the IO future + # propogates back up via download_file(). 
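+        # With the real threaded executor the write error is surfaced
+        # through the IO future (futures.wait + result() in download_file),
+        # and trigger_shutdown() makes any further ioqueue.put() calls raise
+        # so the download threads stop feeding a dead writer.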
+ with self.assertRaisesRegexp(Exception, "fake IO error"): + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + def test_download_futures_fail_triggers_shutdown(self): + class FailedDownloadParts(SequentialExecutor): + def __init__(self, max_workers): + self.is_first = True + + def submit(self, function): + future = super(FailedDownloadParts, self).submit(function) + if self.is_first: + # This is the download_parts_thread. + future.set_exception( + Exception("fake download parts error")) + self.is_first = False + return future + + client = mock.Mock() + response_body = b'foobarbaz' + client.get_object.return_value = {'Body': six.BytesIO(response_body)} + + downloader = MultipartDownloader(client, TransferConfig(), + InMemoryOSLayer({}), + FailedDownloadParts) + with self.assertRaisesRegexp(Exception, "fake download parts error"): + downloader.download_file('bucket', 'key', 'filename', + len(response_body), {}) + + +class TestS3Transfer(unittest.TestCase): + def setUp(self): + self.client = mock.Mock() + self.random_file_patch = mock.patch( + 's3transfer.legacy.random_file_extension') + self.random_file = self.random_file_patch.start() + self.random_file.return_value = 'RANDOM' + + def tearDown(self): + self.random_file_patch.stop() + + def test_callback_handlers_register_on_put_item(self): + osutil = InMemoryOSLayer({'smallfile': b'foobar'}) + transfer = S3Transfer(self.client, osutil=osutil) + transfer.upload_file('smallfile', 'bucket', 'key') + events = self.client.meta.events + events.register_first.assert_called_with( + 'request-created.s3', + disable_upload_callbacks, + unique_id='s3upload-callback-disable', + ) + events.register_last.assert_called_with( + 'request-created.s3', + enable_upload_callbacks, + unique_id='s3upload-callback-enable', + ) + + def test_upload_below_multipart_threshold_uses_put_object(self): + fake_files = { + 'smallfile': b'foobar', + } + osutil = InMemoryOSLayer(fake_files) + transfer = S3Transfer(self.client, osutil=osutil) + transfer.upload_file('smallfile', 'bucket', 'key') + self.client.put_object.assert_called_with( + Bucket='bucket', Key='key', Body=mock.ANY + ) + + def test_extra_args_on_uploaded_passed_to_api_call(self): + extra_args = {'ACL': 'public-read'} + fake_files = { + 'smallfile': b'hello world' + } + osutil = InMemoryOSLayer(fake_files) + transfer = S3Transfer(self.client, osutil=osutil) + transfer.upload_file('smallfile', 'bucket', 'key', + extra_args=extra_args) + self.client.put_object.assert_called_with( + Bucket='bucket', Key='key', Body=mock.ANY, + ACL='public-read' + ) + + def test_uses_multipart_upload_when_over_threshold(self): + with mock.patch('s3transfer.legacy.MultipartUploader') as uploader: + fake_files = { + 'smallfile': b'foobar', + } + osutil = InMemoryOSLayer(fake_files) + config = TransferConfig(multipart_threshold=2, + multipart_chunksize=2) + transfer = S3Transfer(self.client, osutil=osutil, config=config) + transfer.upload_file('smallfile', 'bucket', 'key') + + uploader.return_value.upload_file.assert_called_with( + 'smallfile', 'bucket', 'key', None, {}) + + def test_uses_multipart_download_when_over_threshold(self): + with mock.patch('s3transfer.legacy.MultipartDownloader') as downloader: + osutil = InMemoryOSLayer({}) + over_multipart_threshold = 100 * 1024 * 1024 + transfer = S3Transfer(self.client, osutil=osutil) + callback = mock.sentinel.CALLBACK + self.client.head_object.return_value = { + 'ContentLength': over_multipart_threshold, + } + transfer.download_file('bucket', 'key', 
'filename', + callback=callback) + + downloader.return_value.download_file.assert_called_with( + # Note how we're downloading to a temorary random file. + 'bucket', 'key', 'filename.RANDOM', over_multipart_threshold, + {}, callback) + + def test_download_file_with_invalid_extra_args(self): + below_threshold = 20 + osutil = InMemoryOSLayer({}) + transfer = S3Transfer(self.client, osutil=osutil) + self.client.head_object.return_value = { + 'ContentLength': below_threshold} + with self.assertRaises(ValueError): + transfer.download_file('bucket', 'key', '/tmp/smallfile', + extra_args={'BadValue': 'foo'}) + + def test_upload_file_with_invalid_extra_args(self): + osutil = InMemoryOSLayer({}) + transfer = S3Transfer(self.client, osutil=osutil) + bad_args = {"WebsiteRedirectLocation": "/foo"} + with self.assertRaises(ValueError): + transfer.upload_file('bucket', 'key', '/tmp/smallfile', + extra_args=bad_args) + + def test_download_file_fowards_extra_args(self): + extra_args = { + 'SSECustomerKey': 'foo', + 'SSECustomerAlgorithm': 'AES256', + } + below_threshold = 20 + osutil = InMemoryOSLayer({'smallfile': b'hello world'}) + transfer = S3Transfer(self.client, osutil=osutil) + self.client.head_object.return_value = { + 'ContentLength': below_threshold} + self.client.get_object.return_value = { + 'Body': six.BytesIO(b'foobar') + } + transfer.download_file('bucket', 'key', '/tmp/smallfile', + extra_args=extra_args) + + # Note that we need to invoke the HeadObject call + # and the PutObject call with the extra_args. + # This is necessary. Trying to HeadObject an SSE object + # will return a 400 if you don't provide the required + # params. + self.client.get_object.assert_called_with( + Bucket='bucket', Key='key', SSECustomerAlgorithm='AES256', + SSECustomerKey='foo') + + def test_get_object_stream_is_retried_and_succeeds(self): + below_threshold = 20 + osutil = InMemoryOSLayer({'smallfile': b'hello world'}) + transfer = S3Transfer(self.client, osutil=osutil) + self.client.head_object.return_value = { + 'ContentLength': below_threshold} + self.client.get_object.side_effect = [ + # First request fails. + socket.error("fake error"), + # Second succeeds. + {'Body': six.BytesIO(b'foobar')} + ] + transfer.download_file('bucket', 'key', '/tmp/smallfile') + + self.assertEqual(self.client.get_object.call_count, 2) + + def test_get_object_stream_uses_all_retries_and_errors_out(self): + below_threshold = 20 + osutil = InMemoryOSLayer({}) + transfer = S3Transfer(self.client, osutil=osutil) + self.client.head_object.return_value = { + 'ContentLength': below_threshold} + # Here we're raising an exception every single time, which + # will exhaust our retry count and propogate a + # RetriesExceededError. + self.client.get_object.side_effect = socket.error("fake error") + with self.assertRaises(RetriesExceededError): + transfer.download_file('bucket', 'key', 'smallfile') + + self.assertEqual(self.client.get_object.call_count, 5) + # We should have also cleaned up the in progress file + # we were downloading to. 
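+        # (the cleanup path calls remove_file('smallfile.RANDOM'), the
+        # temporary name used while downloading, so nothing is left behind)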
+ self.assertEqual(osutil.filemap, {}) + + def test_download_below_multipart_threshold(self): + below_threshold = 20 + osutil = InMemoryOSLayer({'smallfile': b'hello world'}) + transfer = S3Transfer(self.client, osutil=osutil) + self.client.head_object.return_value = { + 'ContentLength': below_threshold} + self.client.get_object.return_value = { + 'Body': six.BytesIO(b'foobar') + } + transfer.download_file('bucket', 'key', 'smallfile') + + self.client.get_object.assert_called_with(Bucket='bucket', Key='key') + + def test_can_create_with_just_client(self): + transfer = S3Transfer(client=mock.Mock()) + self.assertIsInstance(transfer, S3Transfer) + + +class TestShutdownQueue(unittest.TestCase): + def test_handles_normal_put_get_requests(self): + q = ShutdownQueue() + q.put('foo') + self.assertEqual(q.get(), 'foo') + + def test_put_raises_error_on_shutdown(self): + q = ShutdownQueue() + q.trigger_shutdown() + with self.assertRaises(QueueShutdownError): + q.put('foo') + + +class TestRandomFileExtension(unittest.TestCase): + def test_has_proper_length(self): + self.assertEqual( + len(random_file_extension(num_digits=4)), 4) + + +class TestCallbackHandlers(unittest.TestCase): + def setUp(self): + self.request = mock.Mock() + + def test_disable_request_on_put_object(self): + disable_upload_callbacks(self.request, + 'PutObject') + self.request.body.disable_callback.assert_called_with() + + def test_disable_request_on_upload_part(self): + disable_upload_callbacks(self.request, + 'UploadPart') + self.request.body.disable_callback.assert_called_with() + + def test_enable_object_on_put_object(self): + enable_upload_callbacks(self.request, + 'PutObject') + self.request.body.enable_callback.assert_called_with() + + def test_enable_object_on_upload_part(self): + enable_upload_callbacks(self.request, + 'UploadPart') + self.request.body.enable_callback.assert_called_with() + + def test_dont_disable_if_missing_interface(self): + del self.request.body.disable_callback + disable_upload_callbacks(self.request, + 'PutObject') + self.assertEqual(self.request.body.method_calls, []) + + def test_dont_enable_if_missing_interface(self): + del self.request.body.enable_callback + enable_upload_callbacks(self.request, + 'PutObject') + self.assertEqual(self.request.body.method_calls, []) + + def test_dont_disable_if_wrong_operation(self): + disable_upload_callbacks(self.request, + 'OtherOperation') + self.assertFalse( + self.request.body.disable_callback.called) + + def test_dont_enable_if_wrong_operation(self): + enable_upload_callbacks(self.request, + 'OtherOperation') + self.assertFalse( + self.request.body.enable_callback.called) From 10d2e3e6605f6fd66e2961e9df1d173d3fa43450 Mon Sep 17 00:00:00 2001 From: kyleknap Date: Wed, 13 Jan 2016 10:53:49 -0800 Subject: [PATCH 4/8] Add integration tests --- tests/integration/__init__.py | 12 ++ tests/integration/test_legacy.py | 352 +++++++++++++++++++++++++++++++ 2 files changed, 364 insertions(+) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/test_legacy.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..79ef91c6 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,12 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the 'license' file accompanying this file. This file is +# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. diff --git a/tests/integration/test_legacy.py b/tests/integration/test_legacy.py new file mode 100644 index 00000000..379dd507 --- /dev/null +++ b/tests/integration/test_legacy.py @@ -0,0 +1,352 @@ +# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +import os +import threading +import math +import tempfile +import shutil +import hashlib +import string + +from tests import unittest +import botocore.session +from botocore.compat import six +from botocore.client import Config + +import s3transfer.legacy + + +urlopen = six.moves.urllib.request.urlopen + + +def assert_files_equal(first, second): + if os.path.getsize(first) != os.path.getsize(second): + raise AssertionError("Files are not equal: %s, %s" % (first, second)) + first_md5 = md5_checksum(first) + second_md5 = md5_checksum(second) + if first_md5 != second_md5: + raise AssertionError( + "Files are not equal: %s(md5=%s) != %s(md5=%s)" % ( + first, first_md5, second, second_md5)) + + +def md5_checksum(filename): + checksum = hashlib.md5() + with open(filename, 'rb') as f: + for chunk in iter(lambda: f.read(8192), b''): + checksum.update(chunk) + return checksum.hexdigest() + + +def random_bucket_name(prefix='boto3-transfer', num_chars=10): + base = string.ascii_lowercase + string.digits + random_bytes = bytearray(os.urandom(num_chars)) + return prefix + ''.join([base[b % len(base)] for b in random_bytes]) + + +class FileCreator(object): + def __init__(self): + self.rootdir = tempfile.mkdtemp() + + def remove_all(self): + shutil.rmtree(self.rootdir) + + def create_file(self, filename, contents, mode='w'): + """Creates a file in a tmpdir + ``filename`` should be a relative path, e.g. "foo/bar/baz.txt" + It will be translated into a full path in a tmp dir. + ``mode`` is the mode the file should be opened either as ``w`` or + `wb``. + Returns the full path to the file. + """ + full_path = os.path.join(self.rootdir, filename) + if not os.path.isdir(os.path.dirname(full_path)): + os.makedirs(os.path.dirname(full_path)) + with open(full_path, mode) as f: + f.write(contents) + return full_path + + def create_file_with_size(self, filename, filesize): + filename = self.create_file(filename, contents='') + chunksize = 8192 + with open(filename, 'wb') as f: + for i in range(int(math.ceil(filesize / float(chunksize)))): + f.write(b'a' * chunksize) + return filename + + def append_file(self, filename, contents): + """Append contents to a file + ``filename`` should be a relative path, e.g. "foo/bar/baz.txt" + It will be translated into a full path in a tmp dir. + Returns the full path to the file. 
+ """ + full_path = os.path.join(self.rootdir, filename) + if not os.path.isdir(os.path.dirname(full_path)): + os.makedirs(os.path.dirname(full_path)) + with open(full_path, 'a') as f: + f.write(contents) + return full_path + + def full_path(self, filename): + """Translate relative path to full path in temp dir. + f.full_path('foo/bar.txt') -> /tmp/asdfasd/foo/bar.txt + """ + return os.path.join(self.rootdir, filename) + + +class TestS3Transfers(unittest.TestCase): + """Tests for the high level s3transfer.legacy module.""" + + @classmethod + def setUpClass(cls): + cls.region = 'us-west-2' + cls.session = botocore.session.get_session() + cls.client = cls.session.create_client('s3', cls.region) + cls.bucket_name = random_bucket_name() + cls.client.create_bucket( + Bucket=cls.bucket_name, + CreateBucketConfiguration={'LocationConstraint': cls.region}) + + def setUp(self): + self.files = FileCreator() + + def tearDown(self): + self.files.remove_all() + + @classmethod + def tearDownClass(cls): + cls.client.delete_bucket(Bucket=cls.bucket_name) + + def delete_object(self, key): + self.client.delete_object( + Bucket=self.bucket_name, + Key=key) + + def object_exists(self, key): + self.client.head_object(Bucket=self.bucket_name, + Key=key) + return True + + def create_s3_transfer(self, config=None): + return s3transfer.legacy.S3Transfer(self.client, + config=config) + + def assert_has_public_read_acl(self, response): + grants = response['Grants'] + public_read = [g['Grantee'].get('URI', '') for g in grants + if g['Permission'] == 'READ'] + self.assertIn('groups/global/AllUsers', public_read[0]) + + def test_upload_below_threshold(self): + config = s3transfer.legacy.TransferConfig( + multipart_threshold=2 * 1024 * 1024) + transfer = self.create_s3_transfer(config) + filename = self.files.create_file_with_size( + 'foo.txt', filesize=1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + 'foo.txt') + self.addCleanup(self.delete_object, 'foo.txt') + + self.assertTrue(self.object_exists('foo.txt')) + + def test_upload_above_threshold(self): + config = s3transfer.legacy.TransferConfig( + multipart_threshold=2 * 1024 * 1024) + transfer = self.create_s3_transfer(config) + filename = self.files.create_file_with_size( + '20mb.txt', filesize=20 * 1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + '20mb.txt') + self.addCleanup(self.delete_object, '20mb.txt') + self.assertTrue(self.object_exists('20mb.txt')) + + def test_upload_file_above_threshold_with_acl(self): + config = s3transfer.legacy.TransferConfig( + multipart_threshold=5 * 1024 * 1024) + transfer = self.create_s3_transfer(config) + filename = self.files.create_file_with_size( + '6mb.txt', filesize=6 * 1024 * 1024) + extra_args = {'ACL': 'public-read'} + transfer.upload_file(filename, self.bucket_name, + '6mb.txt', extra_args=extra_args) + self.addCleanup(self.delete_object, '6mb.txt') + + self.assertTrue(self.object_exists('6mb.txt')) + response = self.client.get_object_acl( + Bucket=self.bucket_name, Key='6mb.txt') + self.assert_has_public_read_acl(response) + + def test_upload_file_above_threshold_with_ssec(self): + key_bytes = os.urandom(32) + extra_args = { + 'SSECustomerKey': key_bytes, + 'SSECustomerAlgorithm': 'AES256', + } + config = s3transfer.legacy.TransferConfig( + multipart_threshold=5 * 1024 * 1024) + transfer = self.create_s3_transfer(config) + filename = self.files.create_file_with_size( + '6mb.txt', filesize=6 * 1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + '6mb.txt', 
extra_args=extra_args) + self.addCleanup(self.delete_object, '6mb.txt') + # A head object will fail if it has a customer key + # associated with it and it's not provided in the HeadObject + # request so we can use this to verify our functionality. + response = self.client.head_object( + Bucket=self.bucket_name, + Key='6mb.txt', **extra_args) + self.assertEqual(response['SSECustomerAlgorithm'], 'AES256') + + def test_progress_callback_on_upload(self): + self.amount_seen = 0 + lock = threading.Lock() + + def progress_callback(amount): + with lock: + self.amount_seen += amount + + transfer = self.create_s3_transfer() + filename = self.files.create_file_with_size( + '20mb.txt', filesize=20 * 1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + '20mb.txt', callback=progress_callback) + self.addCleanup(self.delete_object, '20mb.txt') + + # The callback should have been called enough times such that + # the total amount of bytes we've seen (via the "amount" + # arg to the callback function) should be the size + # of the file we uploaded. + self.assertEqual(self.amount_seen, 20 * 1024 * 1024) + + def test_callback_called_once_with_sigv4(self): + # Verify #98, where the callback was being invoked + # twice when using signature version 4. + self.amount_seen = 0 + lock = threading.Lock() + + def progress_callback(amount): + with lock: + self.amount_seen += amount + + client = self.session.create_client( + 's3', self.region, + config=Config(signature_version='s3v4')) + transfer = s3transfer.legacy.S3Transfer(client) + filename = self.files.create_file_with_size( + '10mb.txt', filesize=10 * 1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + '10mb.txt', callback=progress_callback) + self.addCleanup(self.delete_object, '10mb.txt') + + self.assertEqual(self.amount_seen, 10 * 1024 * 1024) + + def test_can_send_extra_params_on_upload(self): + transfer = self.create_s3_transfer() + filename = self.files.create_file_with_size('foo.txt', filesize=1024) + transfer.upload_file(filename, self.bucket_name, + 'foo.txt', extra_args={'ACL': 'public-read'}) + self.addCleanup(self.delete_object, 'foo.txt') + + response = self.client.get_object_acl( + Bucket=self.bucket_name, Key='foo.txt') + self.assert_has_public_read_acl(response) + + def test_can_configure_threshold(self): + config = s3transfer.legacy.TransferConfig( + multipart_threshold=6 * 1024 * 1024 + ) + transfer = self.create_s3_transfer(config) + filename = self.files.create_file_with_size( + 'foo.txt', filesize=8 * 1024 * 1024) + transfer.upload_file(filename, self.bucket_name, + 'foo.txt') + self.addCleanup(self.delete_object, 'foo.txt') + + self.assertTrue(self.object_exists('foo.txt')) + + def test_can_send_extra_params_on_download(self): + # We're picking the customer provided sse feature + # of S3 to test the extra_args functionality of + # S3. 
+ key_bytes = os.urandom(32) + extra_args = { + 'SSECustomerKey': key_bytes, + 'SSECustomerAlgorithm': 'AES256', + } + self.client.put_object(Bucket=self.bucket_name, + Key='foo.txt', + Body=b'hello world', + **extra_args) + self.addCleanup(self.delete_object, 'foo.txt') + transfer = self.create_s3_transfer() + + download_path = os.path.join(self.files.rootdir, 'downloaded.txt') + transfer.download_file(self.bucket_name, 'foo.txt', + download_path, extra_args=extra_args) + with open(download_path, 'rb') as f: + self.assertEqual(f.read(), b'hello world') + + def test_progress_callback_on_download(self): + self.amount_seen = 0 + lock = threading.Lock() + + def progress_callback(amount): + with lock: + self.amount_seen += amount + + transfer = self.create_s3_transfer() + filename = self.files.create_file_with_size( + '20mb.txt', filesize=20 * 1024 * 1024) + with open(filename, 'rb') as f: + self.client.put_object(Bucket=self.bucket_name, + Key='20mb.txt', Body=f) + self.addCleanup(self.delete_object, '20mb.txt') + + download_path = os.path.join(self.files.rootdir, 'downloaded.txt') + transfer.download_file(self.bucket_name, '20mb.txt', + download_path, callback=progress_callback) + + self.assertEqual(self.amount_seen, 20 * 1024 * 1024) + + def test_download_below_threshold(self): + transfer = self.create_s3_transfer() + + filename = self.files.create_file_with_size( + 'foo.txt', filesize=1024 * 1024) + with open(filename, 'rb') as f: + self.client.put_object(Bucket=self.bucket_name, + Key='foo.txt', + Body=f) + self.addCleanup(self.delete_object, 'foo.txt') + + download_path = os.path.join(self.files.rootdir, 'downloaded.txt') + transfer.download_file(self.bucket_name, 'foo.txt', + download_path) + assert_files_equal(filename, download_path) + + def test_download_above_threshold(self): + transfer = self.create_s3_transfer() + + filename = self.files.create_file_with_size( + 'foo.txt', filesize=20 * 1024 * 1024) + with open(filename, 'rb') as f: + self.client.put_object(Bucket=self.bucket_name, + Key='foo.txt', + Body=f) + self.addCleanup(self.delete_object, 'foo.txt') + + download_path = os.path.join(self.files.rootdir, 'downloaded.txt') + transfer.download_file(self.bucket_name, 'foo.txt', + download_path) + assert_files_equal(filename, download_path) From c4f363c077df43742f47ae5fedc97d46a2ca1bd1 Mon Sep 17 00:00:00 2001 From: kyleknap Date: Wed, 13 Jan 2016 11:00:01 -0800 Subject: [PATCH 5/8] Add some testing automation --- .coveragerc | 4 ++++ .travis.yml | 11 +++++++++++ requirements-text.txt | 7 ------- scripts/ci/install | 29 +++++++++++++++++++++++++++++ scripts/ci/run-integ-tests | 20 ++++++++++++++++++++ scripts/ci/run-tests | 20 ++++++++++++++++++++ tox.ini | 11 +++++++++++ 7 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 .coveragerc create mode 100644 .travis.yml delete mode 100644 requirements-text.txt create mode 100755 scripts/ci/install create mode 100755 scripts/ci/run-integ-tests create mode 100755 scripts/ci/run-tests create mode 100644 tox.ini diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 00000000..80e5f02d --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +branch = True +include = + s3transfer/* diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..afd045d9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,11 @@ +language: python +python: + - "2.6" + - "2.7" + - "3.3" + - "3.4" + - "3.5" +sudo: false +install: + - python scripts/ci/install +script: python scripts/ci/run-tests diff --git a/requirements-text.txt 
b/requirements-text.txt deleted file mode 100644 index 61988542..00000000 --- a/requirements-text.txt +++ /dev/null @@ -1,7 +0,0 @@ --e git://github.com/boto/botocore.git@develop#egg=botocore -nose==1.3.3 -mock==1.3.0 -wheel==0.24.0 -# Note you need at least pip --version of 6.0 or -# higher to be able to pick on these version specifiers. -unittest2==0.5.1; python_version == '2.6' diff --git a/scripts/ci/install b/scripts/ci/install new file mode 100755 index 00000000..4515cc25 --- /dev/null +++ b/scripts/ci/install @@ -0,0 +1,29 @@ +#!/usr/bin/env python +import os +import sys +from subprocess import check_call +import shutil + +_dname = os.path.dirname + +REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__)))) +os.chdir(REPO_ROOT) + + +def run(command): + return check_call(command, shell=True) + + +try: + # Has the form "major.minor" + python_version = os.environ['PYTHON_VERSION'] +except KeyError: + python_version = '.'.join([str(i) for i in sys.version_info[:2]]) + +run('pip install -r requirements-test.txt') +run('pip install coverage') +if os.path.isdir('dist') and os.listdir('dist'): + shutil.rmtree('dist') +run('python setup.py bdist_wheel') +wheel_dist = os.listdir('dist')[0] +run('pip install %s' % (os.path.join('dist', wheel_dist))) diff --git a/scripts/ci/run-integ-tests b/scripts/ci/run-integ-tests new file mode 100755 index 00000000..3118bde9 --- /dev/null +++ b/scripts/ci/run-integ-tests @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# Don't run tests from the root repo dir. +# We want to ensure we're importing from the installed +# binary package not from the CWD. + +import os +from subprocess import check_call + +_dname = os.path.dirname + +REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__)))) +os.chdir(os.path.join(REPO_ROOT, 'tests')) + + +def run(command): + return check_call(command, shell=True) + + +run('nosetests --with-xunit --cover-erase --with-coverage ' + '--cover-package s3transfer --cover-xml -v integration') diff --git a/scripts/ci/run-tests b/scripts/ci/run-tests new file mode 100755 index 00000000..c9fcdc87 --- /dev/null +++ b/scripts/ci/run-tests @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# Don't run tests from the root repo dir. +# We want to ensure we're importing from the installed +# binary package not from the CWD. 
+ +import os +from subprocess import check_call + +_dname = os.path.dirname + +REPO_ROOT = _dname(_dname(_dname(os.path.abspath(__file__)))) +os.chdir(os.path.join(REPO_ROOT, 'tests')) + + +def run(command): + return check_call(command, shell=True) + + +run('nosetests --with-coverage --cover-erase --cover-package s3transfer ' + '--with-xunit --cover-xml -v unit/') diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..062af501 --- /dev/null +++ b/tox.ini @@ -0,0 +1,11 @@ +[tox] +envlist = py26,py27,py33,py34,py35 + +# Comment to build sdist and install into virtualenv +# This is helpful to test installation but takes extra time +skipsdist = True + +[testenv] +commands = + {toxinidir}/scripts/ci/install + {toxinidir}/scripts/ci/run-tests From 83e61923b66bde6d65ad4862f8fc24c23b65d08b Mon Sep 17 00:00:00 2001 From: kyleknap Date: Wed, 13 Jan 2016 12:21:41 -0800 Subject: [PATCH 6/8] Add a gitignore file --- .gitignore | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..7e61a9a5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,42 @@ +*.py[co] +*.DS_Store + +# Packages +*.egg +*.egg-info +dist +build +eggs +parts +var +sdist +develop-eggs +.installed.cfg + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +.cache + +#Translations +*.mo + +#Mr Developer +.mr.developer.cfg + +# Emacs backup files +*~ + +# Eclipse IDE +/.project +/.pydevproject + +# IDEA IDE +.idea* +src/ + +# Completions Index +completions.idx From 58adc903a13d8f737060bca7d1576652c59e5d36 Mon Sep 17 00:00:00 2001 From: kyleknap Date: Wed, 13 Jan 2016 12:22:28 -0800 Subject: [PATCH 7/8] Add a requirements file for testing --- requirements-test.txt | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 requirements-test.txt diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 00000000..61988542 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,7 @@ +-e git://github.com/boto/botocore.git@develop#egg=botocore +nose==1.3.3 +mock==1.3.0 +wheel==0.24.0 +# Note you need at least pip --version of 6.0 or +# higher to be able to pick on these version specifiers. +unittest2==0.5.1; python_version == '2.6' From 6e4b06fb0c2b05873b7b5eb3e86c97c52ee6e880 Mon Sep 17 00:00:00 2001 From: kyleknap Date: Fri, 15 Jan 2016 13:07:07 -0800 Subject: [PATCH 8/8] Move legacy module to __init__ --- s3transfer/__init__.py | 716 +++++++++++++++++ s3transfer/legacy.py | 727 ------------------ .../{test_legacy.py => test_s3transfer.py} | 18 +- .../{test_legacy.py => test_s3transfer.py} | 24 +- 4 files changed, 737 insertions(+), 748 deletions(-) delete mode 100644 s3transfer/legacy.py rename tests/integration/{test_legacy.py => test_s3transfer.py} (96%) rename tests/unit/{test_legacy.py => test_s3transfer.py} (97%) diff --git a/s3transfer/__init__.py b/s3transfer/__init__.py index 0ac1cb38..d4a362be 100644 --- a/s3transfer/__init__.py +++ b/s3transfer/__init__.py @@ -10,5 +10,721 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +"""Abstractions over S3's upload/download operations. + +This module provides high level abstractions for efficient +uploads/downloads. 
It handles several things for the user: + +* Automatically switching to multipart transfers when + a file is over a specific size threshold +* Uploading/downloading a file in parallel +* Throttling based on max bandwidth +* Progress callbacks to monitor transfers +* Retries. While botocore handles retries for streaming uploads, + it is not possible for it to handle retries for streaming + downloads. This module handles retries for both cases so + you don't need to implement any retry logic yourself. + +This module has a reasonable set of defaults. It also allows you +to configure many aspects of the transfer process including: + +* Multipart threshold size +* Max parallel downloads +* Max bandwidth +* Socket timeouts +* Retry amounts + +There is no support for s3->s3 multipart copies at this +time. + + +.. _ref_s3transfer_usage: + +Usage +===== + +The simplest way to use this module is: + +.. code-block:: python + + client = boto3.client('s3', 'us-west-2') + transfer = S3Transfer(client) + # Upload /tmp/myfile to s3://bucket/key + transfer.upload_file('/tmp/myfile', 'bucket', 'key') + + # Download s3://bucket/key to /tmp/myfile + transfer.download_file('bucket', 'key', '/tmp/myfile') + +The ``upload_file`` and ``download_file`` methods also accept +``**kwargs``, which will be forwarded through to the corresponding +client operation. Here are a few examples using ``upload_file``:: + + # Making the object public + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'ACL': 'public-read'}) + + # Setting metadata + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) + + # Setting content type + transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', + extra_args={'ContentType': "application/json"}) + + +The ``S3Transfer`` clas also supports progress callbacks so you can +provide transfer progress to users. Both the ``upload_file`` and +``download_file`` methods take an optional ``callback`` parameter. +Here's an example of how to print a simple progress percentage +to the user: + +.. code-block:: python + + class ProgressPercentage(object): + def __init__(self, filename): + self._filename = filename + self._size = float(os.path.getsize(filename)) + self._seen_so_far = 0 + self._lock = threading.Lock() + + def __call__(self, bytes_amount): + # To simplify we'll assume this is hooked up + # to a single filename. + with self._lock: + self._seen_so_far += bytes_amount + percentage = (self._seen_so_far / self._size) * 100 + sys.stdout.write( + "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, + self._size, percentage)) + sys.stdout.flush() + + + transfer = S3Transfer(boto3.client('s3', 'us-west-2')) + # Upload /tmp/myfile to s3://bucket/key and print upload progress. + transfer.upload_file('/tmp/myfile', 'bucket', 'key', + callback=ProgressPercentage('/tmp/myfile')) + + + +You can also provide a TransferConfig object to the S3Transfer +object that gives you more fine grained control over the +transfer. For example: + +.. 
code-block:: python + + client = boto3.client('s3', 'us-west-2') + config = TransferConfig( + multipart_threshold=8 * 1024 * 1024, + max_concurrency=10, + num_download_attempts=10, + ) + transfer = S3Transfer(client, config) + transfer.upload_file('/tmp/foo', 'bucket', 'key') + + +""" +import os +import math +import functools +import logging +import socket +import threading +import random +import string +from concurrent import futures + +from botocore.compat import six +from botocore.vendored.requests.packages.urllib3.exceptions import \ + ReadTimeoutError +from botocore.exceptions import IncompleteReadError + +import s3transfer.compat +from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError + + __author__ = 'Amazon Web Services' __version__ = '0.0.0' + +logger = logging.getLogger(__name__) +queue = six.moves.queue + +MB = 1024 * 1024 +SHUTDOWN_SENTINEL = object() + + +def random_file_extension(num_digits=8): + return ''.join(random.choice(string.hexdigits) for _ in range(num_digits)) + + +def disable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and \ + hasattr(request.body, 'disable_callback'): + request.body.disable_callback() + + +def enable_upload_callbacks(request, operation_name, **kwargs): + if operation_name in ['PutObject', 'UploadPart'] and \ + hasattr(request.body, 'enable_callback'): + request.body.enable_callback() + + +class QueueShutdownError(Exception): + pass + + +class ReadFileChunk(object): + def __init__(self, fileobj, start_byte, chunk_size, full_file_size, + callback=None, enable_callback=True): + """ + + Given a file object shown below: + + |___________________________________________________| + 0 | | full_file_size + |----chunk_size---| + start_byte + + :type fileobj: file + :param fileobj: File like object + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. + + """ + self._fileobj = fileobj + self._start_byte = start_byte + self._size = self._calculate_file_size( + self._fileobj, requested_size=chunk_size, + start_byte=start_byte, actual_file_size=full_file_size) + self._fileobj.seek(self._start_byte) + self._amount_read = 0 + self._callback = callback + self._callback_enabled = enable_callback + + @classmethod + def from_filename(cls, filename, start_byte, chunk_size, callback=None, + enable_callback=True): + """Convenience factory function to create from a filename. + + :type start_byte: int + :param start_byte: The first byte from which to start reading. + + :type chunk_size: int + :param chunk_size: The max chunk size to read. Trying to read + pass the end of the chunk size will behave like you've + reached the end of the file. + + :type full_file_size: int + :param full_file_size: The entire content length associated + with ``fileobj``. + + :type callback: function(amount_read) + :param callback: Called whenever data is read from this object. + + :type enable_callback: bool + :param enable_callback: Indicate whether to invoke callback + during read() calls. 
+ + :rtype: ``ReadFileChunk`` + :return: A new instance of ``ReadFileChunk`` + + """ + f = open(filename, 'rb') + file_size = os.fstat(f.fileno()).st_size + return cls(f, start_byte, chunk_size, file_size, callback, + enable_callback) + + def _calculate_file_size(self, fileobj, requested_size, start_byte, + actual_file_size): + max_chunk_size = actual_file_size - start_byte + return min(max_chunk_size, requested_size) + + def read(self, amount=None): + if amount is None: + amount_to_read = self._size - self._amount_read + else: + amount_to_read = min(self._size - self._amount_read, amount) + data = self._fileobj.read(amount_to_read) + self._amount_read += len(data) + if self._callback is not None and self._callback_enabled: + self._callback(len(data)) + return data + + def enable_callback(self): + self._callback_enabled = True + + def disable_callback(self): + self._callback_enabled = False + + def seek(self, where): + self._fileobj.seek(self._start_byte + where) + if self._callback is not None and self._callback_enabled: + # To also rewind the callback() for an accurate progress report + self._callback(where - self._amount_read) + self._amount_read = where + + def close(self): + self._fileobj.close() + + def tell(self): + return self._amount_read + + def __len__(self): + # __len__ is defined because requests will try to determine the length + # of the stream to set a content length. In the normal case + # of the file it will just stat the file, but we need to change that + # behavior. By providing a __len__, requests will use that instead + # of stat'ing the file. + return self._size + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def __iter__(self): + # This is a workaround for http://bugs.python.org/issue17575 + # Basically httplib will try to iterate over the contents, even + # if its a file like object. This wasn't noticed because we've + # already exhausted the stream so iterating over the file immediately + # stops, which is what we're simulating here. + return iter([]) + + +class StreamReaderProgress(object): + """Wrapper for a read only stream that adds progress callbacks.""" + def __init__(self, stream, callback=None): + self._stream = stream + self._callback = callback + + def read(self, *args, **kwargs): + value = self._stream.read(*args, **kwargs) + if self._callback is not None: + self._callback(len(value)) + return value + + +class OSUtils(object): + def get_file_size(self, filename): + return os.path.getsize(filename) + + def open_file_chunk_reader(self, filename, start_byte, size, callback): + return ReadFileChunk.from_filename(filename, start_byte, + size, callback, + enable_callback=False) + + def open(self, filename, mode): + return open(filename, mode) + + def remove_file(self, filename): + """Remove a file, noop if file does not exist.""" + # Unlike os.remove, if the file does not exist, + # then this method does nothing. + try: + os.remove(filename) + except OSError: + pass + + def rename_file(self, current_filename, new_filename): + s3transfer.compat.rename_file(current_filename, new_filename) + + +class MultipartUploader(object): + # These are the extra_args that need to be forwarded onto + # subsequent upload_parts. 
+ UPLOAD_PART_ARGS = [ + 'SSECustomerKey', + 'SSECustomerAlgorithm', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + def __init__(self, client, config, osutil, + executor_cls=futures.ThreadPoolExecutor): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + + def _extra_upload_part_args(self, extra_args): + # Only the args in UPLOAD_PART_ARGS actually need to be passed + # onto the upload_part calls. + upload_parts_args = {} + for key, value in extra_args.items(): + if key in self.UPLOAD_PART_ARGS: + upload_parts_args[key] = value + return upload_parts_args + + def upload_file(self, filename, bucket, key, callback, extra_args): + response = self._client.create_multipart_upload(Bucket=bucket, + Key=key, **extra_args) + upload_id = response['UploadId'] + try: + parts = self._upload_parts(upload_id, filename, bucket, key, + callback, extra_args) + except Exception as e: + logger.debug("Exception raised while uploading parts, " + "aborting multipart upload.", exc_info=True) + self._client.abort_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id) + raise S3UploadFailedError( + "Failed to upload %s to %s: %s" % ( + filename, '/'.join([bucket, key]), e)) + self._client.complete_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id, + MultipartUpload={'Parts': parts}) + + def _upload_parts(self, upload_id, filename, bucket, key, callback, + extra_args): + upload_parts_extra_args = self._extra_upload_part_args(extra_args) + parts = [] + part_size = self._config.multipart_chunksize + num_parts = int( + math.ceil(self._os.get_file_size(filename) / float(part_size))) + max_workers = self._config.max_concurrency + with self._executor_cls(max_workers=max_workers) as executor: + upload_partial = functools.partial( + self._upload_one_part, filename, bucket, key, upload_id, + part_size, upload_parts_extra_args, callback) + for part in executor.map(upload_partial, range(1, num_parts + 1)): + parts.append(part) + return parts + + def _upload_one_part(self, filename, bucket, key, + upload_id, part_size, extra_args, + callback, part_number): + open_chunk_reader = self._os.open_file_chunk_reader + with open_chunk_reader(filename, part_size * (part_number - 1), + part_size, callback) as body: + response = self._client.upload_part( + Bucket=bucket, Key=key, + UploadId=upload_id, PartNumber=part_number, Body=body, + **extra_args) + etag = response['ETag'] + return {'ETag': etag, 'PartNumber': part_number} + + +class ShutdownQueue(queue.Queue): + """A queue implementation that can be shutdown. + + Shutting down a queue means that this class adds a + trigger_shutdown method that will trigger all subsequent + calls to put() to fail with a ``QueueShutdownError``. + + It purposefully deviates from queue.Queue, and is *not* meant + to be a drop in replacement for ``queue.Queue``. + + """ + def _init(self, maxsize): + self._shutdown = False + self._shutdown_lock = threading.Lock() + # queue.Queue is an old style class so we don't use super(). + return queue.Queue._init(self, maxsize) + + def trigger_shutdown(self): + with self._shutdown_lock: + self._shutdown = True + logger.debug("The IO queue is now shutdown.") + + def put(self, item): + # Note: this is not sufficient, it's still possible to deadlock! + # Need to hook into the condition vars used by this class. 
+ with self._shutdown_lock: + if self._shutdown: + raise QueueShutdownError("Cannot put item to queue when " + "queue has been shutdown.") + return queue.Queue.put(self, item) + + +class MultipartDownloader(object): + def __init__(self, client, config, osutil, + executor_cls=futures.ThreadPoolExecutor): + self._client = client + self._config = config + self._os = osutil + self._executor_cls = executor_cls + self._ioqueue = ShutdownQueue(self._config.max_io_queue) + + def download_file(self, bucket, key, filename, object_size, + extra_args, callback=None): + with self._executor_cls(max_workers=2) as controller: + # 1 thread for the future that manages the uploading of files + # 1 thread for the future that manages IO writes. + download_parts_handler = functools.partial( + self._download_file_as_future, + bucket, key, filename, object_size, callback) + parts_future = controller.submit(download_parts_handler) + + io_writes_handler = functools.partial( + self._perform_io_writes, filename) + io_future = controller.submit(io_writes_handler) + results = futures.wait([parts_future, io_future], + return_when=futures.FIRST_EXCEPTION) + self._process_future_results(results) + + def _process_future_results(self, futures): + finished, unfinished = futures + for future in finished: + future.result() + + def _download_file_as_future(self, bucket, key, filename, object_size, + callback): + part_size = self._config.multipart_chunksize + num_parts = int(math.ceil(object_size / float(part_size))) + max_workers = self._config.max_concurrency + download_partial = functools.partial( + self._download_range, bucket, key, filename, + part_size, num_parts, callback) + try: + with self._executor_cls(max_workers=max_workers) as executor: + list(executor.map(download_partial, range(num_parts))) + finally: + self._ioqueue.put(SHUTDOWN_SENTINEL) + + def _calculate_range_param(self, part_size, part_index, num_parts): + start_range = part_index * part_size + if part_index == num_parts - 1: + end_range = '' + else: + end_range = start_range + part_size - 1 + range_param = 'bytes=%s-%s' % (start_range, end_range) + return range_param + + def _download_range(self, bucket, key, filename, + part_size, num_parts, callback, part_index): + try: + range_param = self._calculate_range_param( + part_size, part_index, num_parts) + + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + logger.debug("Making get_object call.") + response = self._client.get_object( + Bucket=bucket, Key=key, Range=range_param) + streaming_body = StreamReaderProgress( + response['Body'], callback) + buffer_size = 1024 * 16 + current_index = part_size * part_index + for chunk in iter(lambda: streaming_body.read(buffer_size), + b''): + self._ioqueue.put((current_index, chunk)) + current_index += len(chunk) + return + except (socket.timeout, socket.error, + ReadTimeoutError, IncompleteReadError) as e: + logger.debug("Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", e, i, + max_attempts, exc_info=True) + last_exception = e + continue + raise RetriesExceededError(last_exception) + finally: + logger.debug("EXITING _download_range for part: %s", part_index) + + def _perform_io_writes(self, filename): + with self._os.open(filename, 'wb') as f: + while True: + task = self._ioqueue.get() + if task is SHUTDOWN_SENTINEL: + logger.debug("Shutdown sentinel received in IO handler, " + "shutting down IO handler.") + return + else: + try: + offset, data = task + f.seek(offset) + 
f.write(data) + except Exception as e: + logger.debug("Caught exception in IO thread: %s", + e, exc_info=True) + self._ioqueue.trigger_shutdown() + raise + + +class TransferConfig(object): + def __init__(self, + multipart_threshold=8 * MB, + max_concurrency=10, + multipart_chunksize=8 * MB, + num_download_attempts=5, + max_io_queue=100): + self.multipart_threshold = multipart_threshold + self.max_concurrency = max_concurrency + self.multipart_chunksize = multipart_chunksize + self.num_download_attempts = num_download_attempts + self.max_io_queue = max_io_queue + + +class S3Transfer(object): + + ALLOWED_DOWNLOAD_ARGS = [ + 'VersionId', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'RequestPayer', + ] + + ALLOWED_UPLOAD_ARGS = [ + 'ACL', + 'CacheControl', + 'ContentDisposition', + 'ContentEncoding', + 'ContentLanguage', + 'ContentType', + 'Expires', + 'GrantFullControl', + 'GrantRead', + 'GrantReadACP', + 'GrantWriteACL', + 'Metadata', + 'RequestPayer', + 'ServerSideEncryption', + 'StorageClass', + 'SSECustomerAlgorithm', + 'SSECustomerKey', + 'SSECustomerKeyMD5', + 'SSEKMSKeyId', + ] + + def __init__(self, client, config=None, osutil=None): + self._client = client + if config is None: + config = TransferConfig() + self._config = config + if osutil is None: + osutil = OSUtils() + self._osutil = osutil + + def upload_file(self, filename, bucket, key, + callback=None, extra_args=None): + """Upload a file to an S3 object. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.upload_file() directly. + """ + if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) + events = self._client.meta.events + events.register_first('request-created.s3', + disable_upload_callbacks, + unique_id='s3upload-callback-disable') + events.register_last('request-created.s3', + enable_upload_callbacks, + unique_id='s3upload-callback-enable') + if self._osutil.get_file_size(filename) >= \ + self._config.multipart_threshold: + self._multipart_upload(filename, bucket, key, callback, extra_args) + else: + self._put_object(filename, bucket, key, callback, extra_args) + + def _put_object(self, filename, bucket, key, callback, extra_args): + # We're using open_file_chunk_reader so we can take advantage of the + # progress callback functionality. + open_chunk_reader = self._osutil.open_file_chunk_reader + with open_chunk_reader(filename, 0, + self._osutil.get_file_size(filename), + callback=callback) as body: + self._client.put_object(Bucket=bucket, Key=key, Body=body, + **extra_args) + + def download_file(self, bucket, key, filename, extra_args=None, + callback=None): + """Download an S3 object to a file. + + Variants have also been injected into S3 client, Bucket and Object. + You don't have to use S3Transfer.download_file() directly. + """ + # This method will issue a ``head_object`` request to determine + # the size of the S3 object. This is used to determine if the + # object is downloaded in parallel. 
+ if extra_args is None: + extra_args = {} + self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) + object_size = self._object_size(bucket, key, extra_args) + temp_filename = filename + os.extsep + random_file_extension() + try: + self._download_file(bucket, key, temp_filename, object_size, + extra_args, callback) + except Exception: + logger.debug("Exception caught in download_file, removing partial " + "file: %s", temp_filename, exc_info=True) + self._osutil.remove_file(temp_filename) + raise + else: + self._osutil.rename_file(temp_filename, filename) + + def _download_file(self, bucket, key, filename, object_size, + extra_args, callback): + if object_size >= self._config.multipart_threshold: + self._ranged_download(bucket, key, filename, object_size, + extra_args, callback) + else: + self._get_object(bucket, key, filename, extra_args, callback) + + def _validate_all_known_args(self, actual, allowed): + for kwarg in actual: + if kwarg not in allowed: + raise ValueError( + "Invalid extra_args key '%s', " + "must be one of: %s" % ( + kwarg, ', '.join(allowed))) + + def _ranged_download(self, bucket, key, filename, object_size, + extra_args, callback): + downloader = MultipartDownloader(self._client, self._config, + self._osutil) + downloader.download_file(bucket, key, filename, object_size, + extra_args, callback) + + def _get_object(self, bucket, key, filename, extra_args, callback): + # precondition: num_download_attempts > 0 + max_attempts = self._config.num_download_attempts + last_exception = None + for i in range(max_attempts): + try: + return self._do_get_object(bucket, key, filename, + extra_args, callback) + except (socket.timeout, socket.error, + ReadTimeoutError, IncompleteReadError) as e: + # TODO: we need a way to reset the callback if the + # download failed. + logger.debug("Retrying exception caught (%s), " + "retrying request, (attempt %s / %s)", e, i, + max_attempts, exc_info=True) + last_exception = e + continue + raise RetriesExceededError(last_exception) + + def _do_get_object(self, bucket, key, filename, extra_args, callback): + response = self._client.get_object(Bucket=bucket, Key=key, + **extra_args) + streaming_body = StreamReaderProgress( + response['Body'], callback) + with self._osutil.open(filename, 'wb') as f: + for chunk in iter(lambda: streaming_body.read(8192), b''): + f.write(chunk) + + def _object_size(self, bucket, key, extra_args): + return self._client.head_object( + Bucket=bucket, Key=key, **extra_args)['ContentLength'] + + def _multipart_upload(self, filename, bucket, key, callback, extra_args): + uploader = MultipartUploader(self._client, self._config, self._osutil) + uploader.upload_file(filename, bucket, key, callback, extra_args) diff --git a/s3transfer/legacy.py b/s3transfer/legacy.py deleted file mode 100644 index 4d5e7172..00000000 --- a/s3transfer/legacy.py +++ /dev/null @@ -1,727 +0,0 @@ -# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). You -# may not use this file except in compliance with the License. A copy of -# the License is located at -# -# http://aws.amazon.com/apache2.0/ -# -# or in the "license" file accompanying this file. This file is -# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF -# ANY KIND, either express or implied. See the License for the specific -# language governing permissions and limitations under the License. -"""Abstractions over S3's upload/download operations. 
- -This module provides high level abstractions for efficient -uploads/downloads. It handles several things for the user: - -* Automatically switching to multipart transfers when - a file is over a specific size threshold -* Uploading/downloading a file in parallel -* Throttling based on max bandwidth -* Progress callbacks to monitor transfers -* Retries. While botocore handles retries for streaming uploads, - it is not possible for it to handle retries for streaming - downloads. This module handles retries for both cases so - you don't need to implement any retry logic yourself. - -This module has a reasonable set of defaults. It also allows you -to configure many aspects of the transfer process including: - -* Multipart threshold size -* Max parallel downloads -* Max bandwidth -* Socket timeouts -* Retry amounts - -There is no support for s3->s3 multipart copies at this -time. - - -.. _ref_s3transfer_usage: - -Usage -===== - -The simplest way to use this module is: - -.. code-block:: python - - client = boto3.client('s3', 'us-west-2') - transfer = S3Transfer(client) - # Upload /tmp/myfile to s3://bucket/key - transfer.upload_file('/tmp/myfile', 'bucket', 'key') - - # Download s3://bucket/key to /tmp/myfile - transfer.download_file('bucket', 'key', '/tmp/myfile') - -The ``upload_file`` and ``download_file`` methods also accept -``**kwargs``, which will be forwarded through to the corresponding -client operation. Here are a few examples using ``upload_file``:: - - # Making the object public - transfer.upload_file('/tmp/myfile', 'bucket', 'key', - extra_args={'ACL': 'public-read'}) - - # Setting metadata - transfer.upload_file('/tmp/myfile', 'bucket', 'key', - extra_args={'Metadata': {'a': 'b', 'c': 'd'}}) - - # Setting content type - transfer.upload_file('/tmp/myfile.json', 'bucket', 'key', - extra_args={'ContentType': "application/json"}) - - -The ``S3Transfer`` clas also supports progress callbacks so you can -provide transfer progress to users. Both the ``upload_file`` and -``download_file`` methods take an optional ``callback`` parameter. -Here's an example of how to print a simple progress percentage -to the user: - -.. code-block:: python - - class ProgressPercentage(object): - def __init__(self, filename): - self._filename = filename - self._size = float(os.path.getsize(filename)) - self._seen_so_far = 0 - self._lock = threading.Lock() - - def __call__(self, bytes_amount): - # To simplify we'll assume this is hooked up - # to a single filename. - with self._lock: - self._seen_so_far += bytes_amount - percentage = (self._seen_so_far / self._size) * 100 - sys.stdout.write( - "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, - self._size, percentage)) - sys.stdout.flush() - - - transfer = S3Transfer(boto3.client('s3', 'us-west-2')) - # Upload /tmp/myfile to s3://bucket/key and print upload progress. - transfer.upload_file('/tmp/myfile', 'bucket', 'key', - callback=ProgressPercentage('/tmp/myfile')) - - - -You can also provide a TransferConfig object to the S3Transfer -object that gives you more fine grained control over the -transfer. For example: - -.. 
code-block:: python - - client = boto3.client('s3', 'us-west-2') - config = TransferConfig( - multipart_threshold=8 * 1024 * 1024, - max_concurrency=10, - num_download_attempts=10, - ) - transfer = S3Transfer(client, config) - transfer.upload_file('/tmp/foo', 'bucket', 'key') - - -""" -import os -import math -import functools -import logging -import socket -import threading -import random -import string -from concurrent import futures - -from botocore.compat import six -from botocore.vendored.requests.packages.urllib3.exceptions import \ - ReadTimeoutError -from botocore.exceptions import IncompleteReadError - -import s3transfer.compat -from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError - - -logger = logging.getLogger(__name__) -queue = six.moves.queue - -MB = 1024 * 1024 -SHUTDOWN_SENTINEL = object() - - -def random_file_extension(num_digits=8): - return ''.join(random.choice(string.hexdigits) for _ in range(num_digits)) - - -def disable_upload_callbacks(request, operation_name, **kwargs): - if operation_name in ['PutObject', 'UploadPart'] and \ - hasattr(request.body, 'disable_callback'): - request.body.disable_callback() - - -def enable_upload_callbacks(request, operation_name, **kwargs): - if operation_name in ['PutObject', 'UploadPart'] and \ - hasattr(request.body, 'enable_callback'): - request.body.enable_callback() - - -class QueueShutdownError(Exception): - pass - - -class ReadFileChunk(object): - def __init__(self, fileobj, start_byte, chunk_size, full_file_size, - callback=None, enable_callback=True): - """ - - Given a file object shown below: - - |___________________________________________________| - 0 | | full_file_size - |----chunk_size---| - start_byte - - :type fileobj: file - :param fileobj: File like object - - :type start_byte: int - :param start_byte: The first byte from which to start reading. - - :type chunk_size: int - :param chunk_size: The max chunk size to read. Trying to read - pass the end of the chunk size will behave like you've - reached the end of the file. - - :type full_file_size: int - :param full_file_size: The entire content length associated - with ``fileobj``. - - :type callback: function(amount_read) - :param callback: Called whenever data is read from this object. - - """ - self._fileobj = fileobj - self._start_byte = start_byte - self._size = self._calculate_file_size( - self._fileobj, requested_size=chunk_size, - start_byte=start_byte, actual_file_size=full_file_size) - self._fileobj.seek(self._start_byte) - self._amount_read = 0 - self._callback = callback - self._callback_enabled = enable_callback - - @classmethod - def from_filename(cls, filename, start_byte, chunk_size, callback=None, - enable_callback=True): - """Convenience factory function to create from a filename. - - :type start_byte: int - :param start_byte: The first byte from which to start reading. - - :type chunk_size: int - :param chunk_size: The max chunk size to read. Trying to read - pass the end of the chunk size will behave like you've - reached the end of the file. - - :type full_file_size: int - :param full_file_size: The entire content length associated - with ``fileobj``. - - :type callback: function(amount_read) - :param callback: Called whenever data is read from this object. - - :type enable_callback: bool - :param enable_callback: Indicate whether to invoke callback - during read() calls. 
- - :rtype: ``ReadFileChunk`` - :return: A new instance of ``ReadFileChunk`` - - """ - f = open(filename, 'rb') - file_size = os.fstat(f.fileno()).st_size - return cls(f, start_byte, chunk_size, file_size, callback, - enable_callback) - - def _calculate_file_size(self, fileobj, requested_size, start_byte, - actual_file_size): - max_chunk_size = actual_file_size - start_byte - return min(max_chunk_size, requested_size) - - def read(self, amount=None): - if amount is None: - amount_to_read = self._size - self._amount_read - else: - amount_to_read = min(self._size - self._amount_read, amount) - data = self._fileobj.read(amount_to_read) - self._amount_read += len(data) - if self._callback is not None and self._callback_enabled: - self._callback(len(data)) - return data - - def enable_callback(self): - self._callback_enabled = True - - def disable_callback(self): - self._callback_enabled = False - - def seek(self, where): - self._fileobj.seek(self._start_byte + where) - if self._callback is not None and self._callback_enabled: - # To also rewind the callback() for an accurate progress report - self._callback(where - self._amount_read) - self._amount_read = where - - def close(self): - self._fileobj.close() - - def tell(self): - return self._amount_read - - def __len__(self): - # __len__ is defined because requests will try to determine the length - # of the stream to set a content length. In the normal case - # of the file it will just stat the file, but we need to change that - # behavior. By providing a __len__, requests will use that instead - # of stat'ing the file. - return self._size - - def __enter__(self): - return self - - def __exit__(self, *args, **kwargs): - self.close() - - def __iter__(self): - # This is a workaround for http://bugs.python.org/issue17575 - # Basically httplib will try to iterate over the contents, even - # if its a file like object. This wasn't noticed because we've - # already exhausted the stream so iterating over the file immediately - # stops, which is what we're simulating here. - return iter([]) - - -class StreamReaderProgress(object): - """Wrapper for a read only stream that adds progress callbacks.""" - def __init__(self, stream, callback=None): - self._stream = stream - self._callback = callback - - def read(self, *args, **kwargs): - value = self._stream.read(*args, **kwargs) - if self._callback is not None: - self._callback(len(value)) - return value - - -class OSUtils(object): - def get_file_size(self, filename): - return os.path.getsize(filename) - - def open_file_chunk_reader(self, filename, start_byte, size, callback): - return ReadFileChunk.from_filename(filename, start_byte, - size, callback, - enable_callback=False) - - def open(self, filename, mode): - return open(filename, mode) - - def remove_file(self, filename): - """Remove a file, noop if file does not exist.""" - # Unlike os.remove, if the file does not exist, - # then this method does nothing. - try: - os.remove(filename) - except OSError: - pass - - def rename_file(self, current_filename, new_filename): - s3transfer.compat.rename_file(current_filename, new_filename) - - -class MultipartUploader(object): - # These are the extra_args that need to be forwarded onto - # subsequent upload_parts. 
- UPLOAD_PART_ARGS = [ - 'SSECustomerKey', - 'SSECustomerAlgorithm', - 'SSECustomerKeyMD5', - 'RequestPayer', - ] - - def __init__(self, client, config, osutil, - executor_cls=futures.ThreadPoolExecutor): - self._client = client - self._config = config - self._os = osutil - self._executor_cls = executor_cls - - def _extra_upload_part_args(self, extra_args): - # Only the args in UPLOAD_PART_ARGS actually need to be passed - # onto the upload_part calls. - upload_parts_args = {} - for key, value in extra_args.items(): - if key in self.UPLOAD_PART_ARGS: - upload_parts_args[key] = value - return upload_parts_args - - def upload_file(self, filename, bucket, key, callback, extra_args): - response = self._client.create_multipart_upload(Bucket=bucket, - Key=key, **extra_args) - upload_id = response['UploadId'] - try: - parts = self._upload_parts(upload_id, filename, bucket, key, - callback, extra_args) - except Exception as e: - logger.debug("Exception raised while uploading parts, " - "aborting multipart upload.", exc_info=True) - self._client.abort_multipart_upload( - Bucket=bucket, Key=key, UploadId=upload_id) - raise S3UploadFailedError( - "Failed to upload %s to %s: %s" % ( - filename, '/'.join([bucket, key]), e)) - self._client.complete_multipart_upload( - Bucket=bucket, Key=key, UploadId=upload_id, - MultipartUpload={'Parts': parts}) - - def _upload_parts(self, upload_id, filename, bucket, key, callback, - extra_args): - upload_parts_extra_args = self._extra_upload_part_args(extra_args) - parts = [] - part_size = self._config.multipart_chunksize - num_parts = int( - math.ceil(self._os.get_file_size(filename) / float(part_size))) - max_workers = self._config.max_concurrency - with self._executor_cls(max_workers=max_workers) as executor: - upload_partial = functools.partial( - self._upload_one_part, filename, bucket, key, upload_id, - part_size, upload_parts_extra_args, callback) - for part in executor.map(upload_partial, range(1, num_parts + 1)): - parts.append(part) - return parts - - def _upload_one_part(self, filename, bucket, key, - upload_id, part_size, extra_args, - callback, part_number): - open_chunk_reader = self._os.open_file_chunk_reader - with open_chunk_reader(filename, part_size * (part_number - 1), - part_size, callback) as body: - response = self._client.upload_part( - Bucket=bucket, Key=key, - UploadId=upload_id, PartNumber=part_number, Body=body, - **extra_args) - etag = response['ETag'] - return {'ETag': etag, 'PartNumber': part_number} - - -class ShutdownQueue(queue.Queue): - """A queue implementation that can be shutdown. - - Shutting down a queue means that this class adds a - trigger_shutdown method that will trigger all subsequent - calls to put() to fail with a ``QueueShutdownError``. - - It purposefully deviates from queue.Queue, and is *not* meant - to be a drop in replacement for ``queue.Queue``. - - """ - def _init(self, maxsize): - self._shutdown = False - self._shutdown_lock = threading.Lock() - # queue.Queue is an old style class so we don't use super(). - return queue.Queue._init(self, maxsize) - - def trigger_shutdown(self): - with self._shutdown_lock: - self._shutdown = True - logger.debug("The IO queue is now shutdown.") - - def put(self, item): - # Note: this is not sufficient, it's still possible to deadlock! - # Need to hook into the condition vars used by this class. 
- with self._shutdown_lock: - if self._shutdown: - raise QueueShutdownError("Cannot put item to queue when " - "queue has been shutdown.") - return queue.Queue.put(self, item) - - -class MultipartDownloader(object): - def __init__(self, client, config, osutil, - executor_cls=futures.ThreadPoolExecutor): - self._client = client - self._config = config - self._os = osutil - self._executor_cls = executor_cls - self._ioqueue = ShutdownQueue(self._config.max_io_queue) - - def download_file(self, bucket, key, filename, object_size, - extra_args, callback=None): - with self._executor_cls(max_workers=2) as controller: - # 1 thread for the future that manages the uploading of files - # 1 thread for the future that manages IO writes. - download_parts_handler = functools.partial( - self._download_file_as_future, - bucket, key, filename, object_size, callback) - parts_future = controller.submit(download_parts_handler) - - io_writes_handler = functools.partial( - self._perform_io_writes, filename) - io_future = controller.submit(io_writes_handler) - results = futures.wait([parts_future, io_future], - return_when=futures.FIRST_EXCEPTION) - self._process_future_results(results) - - def _process_future_results(self, futures): - finished, unfinished = futures - for future in finished: - future.result() - - def _download_file_as_future(self, bucket, key, filename, object_size, - callback): - part_size = self._config.multipart_chunksize - num_parts = int(math.ceil(object_size / float(part_size))) - max_workers = self._config.max_concurrency - download_partial = functools.partial( - self._download_range, bucket, key, filename, - part_size, num_parts, callback) - try: - with self._executor_cls(max_workers=max_workers) as executor: - list(executor.map(download_partial, range(num_parts))) - finally: - self._ioqueue.put(SHUTDOWN_SENTINEL) - - def _calculate_range_param(self, part_size, part_index, num_parts): - start_range = part_index * part_size - if part_index == num_parts - 1: - end_range = '' - else: - end_range = start_range + part_size - 1 - range_param = 'bytes=%s-%s' % (start_range, end_range) - return range_param - - def _download_range(self, bucket, key, filename, - part_size, num_parts, callback, part_index): - try: - range_param = self._calculate_range_param( - part_size, part_index, num_parts) - - max_attempts = self._config.num_download_attempts - last_exception = None - for i in range(max_attempts): - try: - logger.debug("Making get_object call.") - response = self._client.get_object( - Bucket=bucket, Key=key, Range=range_param) - streaming_body = StreamReaderProgress( - response['Body'], callback) - buffer_size = 1024 * 16 - current_index = part_size * part_index - for chunk in iter(lambda: streaming_body.read(buffer_size), - b''): - self._ioqueue.put((current_index, chunk)) - current_index += len(chunk) - return - except (socket.timeout, socket.error, - ReadTimeoutError, IncompleteReadError) as e: - logger.debug("Retrying exception caught (%s), " - "retrying request, (attempt %s / %s)", e, i, - max_attempts, exc_info=True) - last_exception = e - continue - raise RetriesExceededError(last_exception) - finally: - logger.debug("EXITING _download_range for part: %s", part_index) - - def _perform_io_writes(self, filename): - with self._os.open(filename, 'wb') as f: - while True: - task = self._ioqueue.get() - if task is SHUTDOWN_SENTINEL: - logger.debug("Shutdown sentinel received in IO handler, " - "shutting down IO handler.") - return - else: - try: - offset, data = task - f.seek(offset) - 
f.write(data) - except Exception as e: - logger.debug("Caught exception in IO thread: %s", - e, exc_info=True) - self._ioqueue.trigger_shutdown() - raise - - -class TransferConfig(object): - def __init__(self, - multipart_threshold=8 * MB, - max_concurrency=10, - multipart_chunksize=8 * MB, - num_download_attempts=5, - max_io_queue=100): - self.multipart_threshold = multipart_threshold - self.max_concurrency = max_concurrency - self.multipart_chunksize = multipart_chunksize - self.num_download_attempts = num_download_attempts - self.max_io_queue = max_io_queue - - -class S3Transfer(object): - - ALLOWED_DOWNLOAD_ARGS = [ - 'VersionId', - 'SSECustomerAlgorithm', - 'SSECustomerKey', - 'SSECustomerKeyMD5', - 'RequestPayer', - ] - - ALLOWED_UPLOAD_ARGS = [ - 'ACL', - 'CacheControl', - 'ContentDisposition', - 'ContentEncoding', - 'ContentLanguage', - 'ContentType', - 'Expires', - 'GrantFullControl', - 'GrantRead', - 'GrantReadACP', - 'GrantWriteACL', - 'Metadata', - 'RequestPayer', - 'ServerSideEncryption', - 'StorageClass', - 'SSECustomerAlgorithm', - 'SSECustomerKey', - 'SSECustomerKeyMD5', - 'SSEKMSKeyId', - ] - - def __init__(self, client, config=None, osutil=None): - self._client = client - if config is None: - config = TransferConfig() - self._config = config - if osutil is None: - osutil = OSUtils() - self._osutil = osutil - - def upload_file(self, filename, bucket, key, - callback=None, extra_args=None): - """Upload a file to an S3 object. - - Variants have also been injected into S3 client, Bucket and Object. - You don't have to use S3Transfer.upload_file() directly. - """ - if extra_args is None: - extra_args = {} - self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) - events = self._client.meta.events - events.register_first('request-created.s3', - disable_upload_callbacks, - unique_id='s3upload-callback-disable') - events.register_last('request-created.s3', - enable_upload_callbacks, - unique_id='s3upload-callback-enable') - if self._osutil.get_file_size(filename) >= \ - self._config.multipart_threshold: - self._multipart_upload(filename, bucket, key, callback, extra_args) - else: - self._put_object(filename, bucket, key, callback, extra_args) - - def _put_object(self, filename, bucket, key, callback, extra_args): - # We're using open_file_chunk_reader so we can take advantage of the - # progress callback functionality. - open_chunk_reader = self._osutil.open_file_chunk_reader - with open_chunk_reader(filename, 0, - self._osutil.get_file_size(filename), - callback=callback) as body: - self._client.put_object(Bucket=bucket, Key=key, Body=body, - **extra_args) - - def download_file(self, bucket, key, filename, extra_args=None, - callback=None): - """Download an S3 object to a file. - - Variants have also been injected into S3 client, Bucket and Object. - You don't have to use S3Transfer.download_file() directly. - """ - # This method will issue a ``head_object`` request to determine - # the size of the S3 object. This is used to determine if the - # object is downloaded in parallel. 
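
When the object is at or over the multipart threshold, the
``MultipartDownloader`` above issues one ranged ``get_object`` call per chunk
and funnels the streamed data through the IO queue to a single writer thread.
A minimal sketch of the ``Range`` headers that ``_calculate_range_param``
produces, using a made-up object size:

.. code-block:: python

    import math

    object_size = 20 * 1024 * 1024   # made-up example size
    part_size = 8 * 1024 * 1024      # the 8 MB default chunk size

    num_parts = int(math.ceil(object_size / float(part_size)))
    for part_index in range(num_parts):
        start = part_index * part_size
        # The final part is left open-ended rather than computed exactly.
        end = '' if part_index == num_parts - 1 else start + part_size - 1
        print('bytes=%s-%s' % (start, end))
    # bytes=0-8388607, bytes=8388608-16777215, bytes=16777216-
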
- if extra_args is None: - extra_args = {} - self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS) - object_size = self._object_size(bucket, key, extra_args) - temp_filename = filename + os.extsep + random_file_extension() - try: - self._download_file(bucket, key, temp_filename, object_size, - extra_args, callback) - except Exception: - logger.debug("Exception caught in download_file, removing partial " - "file: %s", temp_filename, exc_info=True) - self._osutil.remove_file(temp_filename) - raise - else: - self._osutil.rename_file(temp_filename, filename) - - def _download_file(self, bucket, key, filename, object_size, - extra_args, callback): - if object_size >= self._config.multipart_threshold: - self._ranged_download(bucket, key, filename, object_size, - extra_args, callback) - else: - self._get_object(bucket, key, filename, extra_args, callback) - - def _validate_all_known_args(self, actual, allowed): - for kwarg in actual: - if kwarg not in allowed: - raise ValueError( - "Invalid extra_args key '%s', " - "must be one of: %s" % ( - kwarg, ', '.join(allowed))) - - def _ranged_download(self, bucket, key, filename, object_size, - extra_args, callback): - downloader = MultipartDownloader(self._client, self._config, - self._osutil) - downloader.download_file(bucket, key, filename, object_size, - extra_args, callback) - - def _get_object(self, bucket, key, filename, extra_args, callback): - # precondition: num_download_attempts > 0 - max_attempts = self._config.num_download_attempts - last_exception = None - for i in range(max_attempts): - try: - return self._do_get_object(bucket, key, filename, - extra_args, callback) - except (socket.timeout, socket.error, - ReadTimeoutError, IncompleteReadError) as e: - # TODO: we need a way to reset the callback if the - # download failed. 
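
Both transfer directions validate ``extra_args`` up front: ``upload_file``
checks against ``ALLOWED_UPLOAD_ARGS``, ``download_file`` against
``ALLOWED_DOWNLOAD_ARGS``, and ``_validate_all_known_args`` raises
``ValueError`` for anything else. A short sketch, with placeholder bucket,
key, and version id:

.. code-block:: python

    import boto3

    from s3transfer import S3Transfer

    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))

    # 'VersionId' is an allowed download arg, so it is forwarded to the
    # underlying head_object/get_object calls.
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})

    # 'ACL' is only valid for uploads; this raises ValueError before any
    # request is sent.
    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'ACL': 'public-read'})
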
- logger.debug("Retrying exception caught (%s), " - "retrying request, (attempt %s / %s)", e, i, - max_attempts, exc_info=True) - last_exception = e - continue - raise RetriesExceededError(last_exception) - - def _do_get_object(self, bucket, key, filename, extra_args, callback): - response = self._client.get_object(Bucket=bucket, Key=key, - **extra_args) - streaming_body = StreamReaderProgress( - response['Body'], callback) - with self._osutil.open(filename, 'wb') as f: - for chunk in iter(lambda: streaming_body.read(8192), b''): - f.write(chunk) - - def _object_size(self, bucket, key, extra_args): - return self._client.head_object( - Bucket=bucket, Key=key, **extra_args)['ContentLength'] - - def _multipart_upload(self, filename, bucket, key, callback, extra_args): - uploader = MultipartUploader(self._client, self._config, self._osutil) - uploader.upload_file(filename, bucket, key, callback, extra_args) diff --git a/tests/integration/test_legacy.py b/tests/integration/test_s3transfer.py similarity index 96% rename from tests/integration/test_legacy.py rename to tests/integration/test_s3transfer.py index 379dd507..56cd8bc4 100644 --- a/tests/integration/test_legacy.py +++ b/tests/integration/test_s3transfer.py @@ -23,7 +23,7 @@ from botocore.compat import six from botocore.client import Config -import s3transfer.legacy +import s3transfer urlopen = six.moves.urllib.request.urlopen @@ -105,7 +105,7 @@ def full_path(self, filename): class TestS3Transfers(unittest.TestCase): - """Tests for the high level s3transfer.legacy module.""" + """Tests for the high level s3transfer module.""" @classmethod def setUpClass(cls): @@ -138,7 +138,7 @@ def object_exists(self, key): return True def create_s3_transfer(self, config=None): - return s3transfer.legacy.S3Transfer(self.client, + return s3transfer.S3Transfer(self.client, config=config) def assert_has_public_read_acl(self, response): @@ -148,7 +148,7 @@ def assert_has_public_read_acl(self, response): self.assertIn('groups/global/AllUsers', public_read[0]) def test_upload_below_threshold(self): - config = s3transfer.legacy.TransferConfig( + config = s3transfer.TransferConfig( multipart_threshold=2 * 1024 * 1024) transfer = self.create_s3_transfer(config) filename = self.files.create_file_with_size( @@ -160,7 +160,7 @@ def test_upload_below_threshold(self): self.assertTrue(self.object_exists('foo.txt')) def test_upload_above_threshold(self): - config = s3transfer.legacy.TransferConfig( + config = s3transfer.TransferConfig( multipart_threshold=2 * 1024 * 1024) transfer = self.create_s3_transfer(config) filename = self.files.create_file_with_size( @@ -171,7 +171,7 @@ def test_upload_above_threshold(self): self.assertTrue(self.object_exists('20mb.txt')) def test_upload_file_above_threshold_with_acl(self): - config = s3transfer.legacy.TransferConfig( + config = s3transfer.TransferConfig( multipart_threshold=5 * 1024 * 1024) transfer = self.create_s3_transfer(config) filename = self.files.create_file_with_size( @@ -192,7 +192,7 @@ def test_upload_file_above_threshold_with_ssec(self): 'SSECustomerKey': key_bytes, 'SSECustomerAlgorithm': 'AES256', } - config = s3transfer.legacy.TransferConfig( + config = s3transfer.TransferConfig( multipart_threshold=5 * 1024 * 1024) transfer = self.create_s3_transfer(config) filename = self.files.create_file_with_size( @@ -242,7 +242,7 @@ def progress_callback(amount): client = self.session.create_client( 's3', self.region, config=Config(signature_version='s3v4')) - transfer = s3transfer.legacy.S3Transfer(client) + transfer = 
s3transfer.S3Transfer(client) filename = self.files.create_file_with_size( '10mb.txt', filesize=10 * 1024 * 1024) transfer.upload_file(filename, self.bucket_name, @@ -263,7 +263,7 @@ def test_can_send_extra_params_on_upload(self): self.assert_has_public_read_acl(response) def test_can_configure_threshold(self): - config = s3transfer.legacy.TransferConfig( + config = s3transfer.TransferConfig( multipart_threshold=6 * 1024 * 1024 ) transfer = self.create_s3_transfer(config) diff --git a/tests/unit/test_legacy.py b/tests/unit/test_s3transfer.py similarity index 97% rename from tests/unit/test_legacy.py rename to tests/unit/test_s3transfer.py index 3dd4481f..a87b4ddb 100644 --- a/tests/unit/test_legacy.py +++ b/tests/unit/test_s3transfer.py @@ -23,14 +23,14 @@ from s3transfer.exceptions import RetriesExceededError from s3transfer.exceptions import S3UploadFailedError -from s3transfer.legacy import ReadFileChunk, StreamReaderProgress -from s3transfer.legacy import S3Transfer -from s3transfer.legacy import OSUtils, TransferConfig -from s3transfer.legacy import MultipartDownloader, MultipartUploader -from s3transfer.legacy import ShutdownQueue -from s3transfer.legacy import QueueShutdownError -from s3transfer.legacy import random_file_extension -from s3transfer.legacy import disable_upload_callbacks, enable_upload_callbacks +from s3transfer import ReadFileChunk, StreamReaderProgress +from s3transfer import S3Transfer +from s3transfer import OSUtils, TransferConfig +from s3transfer import MultipartDownloader, MultipartUploader +from s3transfer import ShutdownQueue +from s3transfer import QueueShutdownError +from s3transfer import random_file_extension +from s3transfer import disable_upload_callbacks, enable_upload_callbacks class InMemoryOSLayer(OSUtils): @@ -98,7 +98,7 @@ def test_get_file_size(self): m.assert_called_with('myfile') def test_open_file_chunk_reader(self): - with mock.patch('s3transfer.legacy.ReadFileChunk') as m: + with mock.patch('s3transfer.ReadFileChunk') as m: OSUtils().open_file_chunk_reader('myfile', 0, 100, None) m.from_filename.assert_called_with('myfile', 0, 100, None, enable_callback=False) @@ -489,7 +489,7 @@ class TestS3Transfer(unittest.TestCase): def setUp(self): self.client = mock.Mock() self.random_file_patch = mock.patch( - 's3transfer.legacy.random_file_extension') + 's3transfer.random_file_extension') self.random_file = self.random_file_patch.start() self.random_file.return_value = 'RANDOM' @@ -538,7 +538,7 @@ def test_extra_args_on_uploaded_passed_to_api_call(self): ) def test_uses_multipart_upload_when_over_threshold(self): - with mock.patch('s3transfer.legacy.MultipartUploader') as uploader: + with mock.patch('s3transfer.MultipartUploader') as uploader: fake_files = { 'smallfile': b'foobar', } @@ -552,7 +552,7 @@ def test_uses_multipart_upload_when_over_threshold(self): 'smallfile', 'bucket', 'key', None, {}) def test_uses_multipart_download_when_over_threshold(self): - with mock.patch('s3transfer.legacy.MultipartDownloader') as downloader: + with mock.patch('s3transfer.MultipartDownloader') as downloader: osutil = InMemoryOSLayer({}) over_multipart_threshold = 100 * 1024 * 1024 transfer = S3Transfer(self.client, osutil=osutil)
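
The renamed tests above track the move out of ``s3transfer.legacy``: the same
classes are now importable from the package root, and ``mock.patch`` targets
shorten accordingly.

.. code-block:: python

    # Previously: from s3transfer.legacy import S3Transfer, TransferConfig
    from s3transfer import S3Transfer, TransferConfig

    # Unit-test patch targets drop the '.legacy' segment in the same way,
    # e.g. mock.patch('s3transfer.MultipartDownloader')
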