Shutdown task executor before aborting multipart uploads #1905

Merged: 1 commit, Apr 11, 2016
awscli/customizations/s3/s3handler.py (14 additions & 7 deletions)
@@ -101,28 +101,35 @@ def call(self, files):
             self.executor.print_thread.set_total_files(total_files)
             self.executor.print_thread.set_total_parts(total_parts)
             self.executor.initiate_shutdown()
-            self.executor.wait_until_shutdown()
-            self._shutdown()
+            self._finalize_shutdown()
         except Exception as e:
             LOGGER.debug('Exception caught during task execution: %s',
                          str(e), exc_info=True)
             self.result_queue.put(PrintTask(message=str(e), error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         except KeyboardInterrupt:
             self.result_queue.put(PrintTask(message=("Cleaning up. "
                                                      "Please wait..."),
                                             error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         return CommandResult(self.executor.num_tasks_failed,
                              self.executor.num_tasks_warned)

-    def _shutdown(self):
+    def _finalize_shutdown(self):
+        # Run all remaining tasks needed to completely shutdown the
+        # S3 handler. This method will block until shutdown is complete.
+        # The order here is important. We need to wait until all the
+        # tasks have been completed before we can cleanup. Otherwise
+        # we can have race conditions where we're trying to cleanup
+        # uploads/downloads that are still in progress.
+        self.executor.wait_until_shutdown()
+        self._cleanup()
+
+    def _cleanup(self):
         # And finally we need to make a pass through all the existing
         # multipart uploads and abort any pending multipart uploads.
         self._abort_pending_multipart_uploads()
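
The comment added to _finalize_shutdown is the heart of the fix: every queued upload/download task must finish before the cleanup pass aborts multipart uploads, otherwise a still-running worker can keep adding parts to an upload that was just aborted, leaving it pending on S3. Below is a minimal sketch of that ordering, using Python's standard concurrent.futures executor and invented names (MiniHandler, pending_upload_ids) rather than the CLI's real executor and handler classes.

import concurrent.futures


class MiniHandler:
    # Toy model of the shutdown ordering the PR enforces.

    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        self.pending_upload_ids = ['upload-1', 'upload-2']

    def submit(self, fn, *args):
        return self.executor.submit(fn, *args)

    def finalize_shutdown(self):
        # 1) Block until every in-flight task has finished; a worker that is
        #    still running could otherwise add parts to an upload we are
        #    about to abort.
        self.executor.shutdown(wait=True)
        # 2) Only then make the cleanup pass over pending multipart uploads.
        self._cleanup()

    def _cleanup(self):
        for upload_id in self.pending_upload_ids:
            print('aborting multipart upload %s' % upload_id)
        self.pending_upload_ids = []


if __name__ == '__main__':
    handler = MiniHandler()
    handler.submit(print, 'uploading a part...')
    handler.finalize_shutdown()

With the old ordering in the exception handlers (cleanup first, wait second), the abort pass could run while tasks were still in flight; the integration test added below exercises exactly that window by interrupting a large upload with Ctrl+C and then listing the bucket's multipart uploads.
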
tests/integration/customizations/s3/test_plugin.py (37 additions & 13 deletions)
@@ -28,8 +28,6 @@
 import shutil
 import copy

-import botocore.session
-from botocore.exceptions import ClientError
 from awscli.compat import six
 from nose.plugins.attrib import attr
@@ -62,6 +60,19 @@ def aws(command, collect_memory=False, env_vars=None, wait_for_finish=True,
             input_file=input_file)


+def wait_for_process_exit(process, timeout=60):
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        rc = process.poll()
+        if rc is not None:
+            break
+        time.sleep(1)
+    else:
+        process.kill()
+        raise AssertionError("CLI did not exit within %s seconds of "
+                             "receiving a Ctrl+C" % timeout)
+
+
 def _running_on_rhel():
     return (
         hasattr(platform, 'linux_distribution') and
@@ -351,26 +362,39 @@ def test_download_ctrl_c_does_not_hang(self):
         process = aws('s3 cp s3://%s/foo.txt %s' %
                       (bucket_name, local_foo_txt), wait_for_finish=False)
         # Give it some time to start up and enter its main task loop.
-        time.sleep(1)
+        time.sleep(2)
         # The process has 60 seconds to finish after being sent a Ctrl+C,
         # otherwise the test fails.
         process.send_signal(signal.SIGINT)
-        deadline = time.time() + 60
-        while time.time() < deadline:
-            rc = process.poll()
-            if rc is not None:
-                break
-            time.sleep(1)
-        else:
-            process.kill()
-            self.fail("CLI did not exist within 30 seconds of "
-                      "receiving a Ctrl+C")
+        wait_for_process_exit(process, timeout=60)
         # A Ctrl+C should have a non-zero RC.
         # We either caught the process in
         # its main polling loop (rc=1), or it was successfully terminated by
         # the SIGINT (rc=-2).
         self.assertIn(process.returncode, [1, -2])

+    @attr('slow')
+    @skip_if_windows('SIGINT not supported on Windows.')
+    def test_cleans_up_aborted_uploads(self):
+        bucket_name = self.create_bucket()
+        foo_txt = self.files.create_file('foo.txt', '')
+        with open(foo_txt, 'wb') as f:
+            for i in range(20):
+                f.write(b'a' * 1024 * 1024)
+
+        process = aws('s3 cp %s s3://%s/' % (foo_txt, bucket_name),
+                      wait_for_finish=False)
+        time.sleep(3)
+        # The process has 60 seconds to finish after being sent a Ctrl+C,
+        # otherwise the test fails.
+        process.send_signal(signal.SIGINT)
+        wait_for_process_exit(process, timeout=60)
+        uploads_after = self.client.list_multipart_uploads(
+            Bucket=bucket_name).get('Uploads', [])
+        self.assertEqual(uploads_after, [],
+                         "Not all multipart uploads were properly "
+                         "aborted after receiving Ctrl-C: %s" % uploads_after)
+
     def test_cp_to_nonexistent_bucket(self):
         foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
         p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
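
The assertIn(process.returncode, [1, -2]) check in the Ctrl+C test relies on standard subprocess behaviour: on POSIX, Popen.returncode is the negative signal number when the child is terminated by a signal, so SIGINT yields -2, while a process that traps the signal and exits through its normal error path reports 1. A small standalone illustration of the -2 case (not part of the PR; assumes a POSIX system with sleep on PATH):

import signal
import subprocess
import time

# Start a child process that just sleeps.
proc = subprocess.Popen(['sleep', '30'])
time.sleep(1)

# Deliver Ctrl+C to the child. The default SIGINT disposition terminates it,
# and Popen.returncode then reports the negative signal number.
proc.send_signal(signal.SIGINT)
proc.wait()
print(proc.returncode)  # -2, i.e. -signal.SIGINT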