From c449942347cd91f4e7ef837e0b17c39b99190bcc Mon Sep 17 00:00:00 2001
From: James Saryerwinnie
Date: Mon, 11 Apr 2016 10:11:06 -0700
Subject: [PATCH] Shutdown task executor before aborting multipart uploads

There's a race condition in which multipart uploads can still be kept
alive because aborting a multipart upload can happen while existing
tasks are still being executed.  We must shut down the executor before
we abort multipart uploads.  This order was correct in the normal
shutdown case, but when a user Ctrl-C's the process or when unexpected
exceptions propagate back to the S3 handler, the order was reversed.
In other words, the race condition exposed two possibilities:

Proper cleanup:

t1   ---|start_part|-------------|end_part|---------------------
                                          | join
t2   ---|start_part|-------------|end_part|---------------------
                                          | join
main -----------------|ctrl-c|---------------|abort_upload|-----

Bad cleanup:

t1   ---|start_part|--------------------------------|end_part|--
                                                              | join
t2   ---|start_part|--------------------------------|end_part|--
                                                              | join
main -----------------|ctrl-c|--|abort_upload|------------------

There's also a gap in test coverage here, so I've added a test case
for this scenario.
---
 awscli/customizations/s3/s3handler.py | 21 +++++---
 .../customizations/s3/test_plugin.py  | 50 ++++++++++++++-----
 2 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py
index 37a0cf5c3993..dd2414f2495d 100644
--- a/awscli/customizations/s3/s3handler.py
+++ b/awscli/customizations/s3/s3handler.py
@@ -101,28 +101,35 @@ def call(self, files):
             self.executor.print_thread.set_total_files(total_files)
             self.executor.print_thread.set_total_parts(total_parts)
             self.executor.initiate_shutdown()
-            self.executor.wait_until_shutdown()
-            self._shutdown()
+            self._finalize_shutdown()
         except Exception as e:
             LOGGER.debug('Exception caught during task execution: %s',
                          str(e), exc_info=True)
             self.result_queue.put(PrintTask(message=str(e), error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         except KeyboardInterrupt:
             self.result_queue.put(PrintTask(message=("Cleaning up. "
                                                      "Please wait..."),
                                             error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         return CommandResult(self.executor.num_tasks_failed,
                              self.executor.num_tasks_warned)
 
-    def _shutdown(self):
+    def _finalize_shutdown(self):
+        # Run all remaining tasks needed to completely shut down the
+        # S3 handler.  This method will block until shutdown is complete.
+        # The order here is important.  We need to wait until all the
+        # tasks have been completed before we can cleanup.  Otherwise
+        # we can have race conditions where we're trying to cleanup
+        # uploads/downloads that are still in progress.
+        self.executor.wait_until_shutdown()
+        self._cleanup()
+
+    def _cleanup(self):
         # And finally we need to make a pass through all the existing
         # multipart uploads and abort any pending multipart uploads.
         self._abort_pending_multipart_uploads()
diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py
index b1c67fab5685..5bebdf807062 100644
--- a/tests/integration/customizations/s3/test_plugin.py
+++ b/tests/integration/customizations/s3/test_plugin.py
@@ -28,8 +28,6 @@
 import shutil
 import copy
 
-import botocore.session
-from botocore.exceptions import ClientError
 from awscli.compat import six
 from nose.plugins.attrib import attr
 
@@ -62,6 +60,19 @@ def aws(command, collect_memory=False, env_vars=None, wait_for_finish=True,
                 input_file=input_file)
 
 
+def wait_for_process_exit(process, timeout=60):
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        rc = process.poll()
+        if rc is not None:
+            break
+        time.sleep(1)
+    else:
+        process.kill()
+        raise AssertionError("CLI did not exit within %s seconds of "
+                             "receiving a Ctrl+C" % timeout)
+
+
 def _running_on_rhel():
     return (
         hasattr(platform, 'linux_distribution') and
@@ -351,26 +362,39 @@ def test_download_ctrl_c_does_not_hang(self):
         process = aws('s3 cp s3://%s/foo.txt %s' %
                       (bucket_name, local_foo_txt), wait_for_finish=False)
         # Give it some time to start up and enter it's main task loop.
-        time.sleep(1)
+        time.sleep(2)
         # The process has 60 seconds to finish after being sent a Ctrl+C,
         # otherwise the test fails.
         process.send_signal(signal.SIGINT)
-        deadline = time.time() + 60
-        while time.time() < deadline:
-            rc = process.poll()
-            if rc is not None:
-                break
-            time.sleep(1)
-        else:
-            process.kill()
-            self.fail("CLI did not exist within 30 seconds of "
-                      "receiving a Ctrl+C")
+        wait_for_process_exit(process, timeout=60)
         # A Ctrl+C should have a non-zero RC.
         # We either caught the process in
         # its main polling loop (rc=1), or it was successfully terminated by
         # the SIGINT (rc=-2).
         self.assertIn(process.returncode, [1, -2])
 
+    @attr('slow')
+    @skip_if_windows('SIGINT not supported on Windows.')
+    def test_cleans_up_aborted_uploads(self):
+        bucket_name = self.create_bucket()
+        foo_txt = self.files.create_file('foo.txt', '')
+        with open(foo_txt, 'wb') as f:
+            for i in range(20):
+                f.write(b'a' * 1024 * 1024)
+
+        process = aws('s3 cp %s s3://%s/' % (foo_txt, bucket_name),
+                      wait_for_finish=False)
+        time.sleep(3)
+        # The process has 60 seconds to finish after being sent a Ctrl+C,
+        # otherwise the test fails.
+        process.send_signal(signal.SIGINT)
+        wait_for_process_exit(process, timeout=60)
+        uploads_after = self.client.list_multipart_uploads(
+            Bucket=bucket_name).get('Uploads', [])
+        self.assertEqual(uploads_after, [],
+                         "Not all multipart uploads were properly "
+                         "aborted after receiving Ctrl-C: %s" % uploads_after)
+
     def test_cp_to_nonexistent_bucket(self):
         foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
         p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
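
For context, the sketch below illustrates the shutdown ordering this patch
enforces: drain the executor (wait for every in-flight part task to finish)
before aborting any pending multipart uploads.  It is a minimal,
self-contained illustration; the MiniExecutor/MiniHandler names and the
threading model are hypothetical stand-ins, not the actual
awscli.customizations.s3 classes.

# Minimal sketch of the ordering fixed by this patch (hypothetical names,
# not the real awscli classes).
import threading
import time


class MiniExecutor:
    """Runs part-upload tasks on worker threads."""

    def __init__(self):
        self._threads = []

    def submit(self, fn, *args):
        thread = threading.Thread(target=fn, args=args)
        thread.start()
        self._threads.append(thread)

    def wait_until_shutdown(self):
        # Block until every in-flight part task has finished.
        for thread in self._threads:
            thread.join()


class MiniHandler:
    """Owns the multipart uploads created by the part tasks."""

    def __init__(self, executor):
        self.executor = executor
        self.pending_upload_ids = ['upload-1']

    def _abort_pending_multipart_uploads(self):
        # Only safe once no worker can still add parts to these uploads.
        for upload_id in self.pending_upload_ids:
            print('aborting', upload_id)

    def _finalize_shutdown(self):
        # Order matters: wait for the workers first, then abort.  Reversing
        # these two calls reintroduces the race described in the commit
        # message (a part finishing after its upload was aborted).
        self.executor.wait_until_shutdown()
        self._abort_pending_multipart_uploads()


def upload_part(part_number):
    time.sleep(0.1)  # stand-in for a slow UploadPart call
    print('finished part', part_number)


if __name__ == '__main__':
    executor = MiniExecutor()
    handler = MiniHandler(executor)
    for i in range(3):
        executor.submit(upload_part, i)
    # Simulate the Ctrl-C / error path: cleanup runs only after all parts
    # have completed, so nothing can revive an aborted upload.
    handler._finalize_shutdown()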