diff --git a/testbench/rest_server.py b/testbench/rest_server.py index 72a0ae88..0e87fc90 100644 --- a/testbench/rest_server.py +++ b/testbench/rest_server.py @@ -979,6 +979,22 @@ def object_insert(bucket_name): blob, projection = gcs_type.object.Object.init_media(flask.request, bucket) elif upload_type == "multipart": blob, projection = gcs_type.object.Object.init_multipart(flask.request, bucket) + testbench.common.extract_instruction(request, context=None) + ( + error_code, + after_bytes, + test_id, + ) = testbench.common.get_retry_uploads_error_after_bytes(db, request) + if error_code and len(blob.media) >= after_bytes: + if test_id: + db.dequeue_next_instruction(test_id, "storage.objects.insert") + testbench.error.generic( + "Fault injected during a resumable upload", + rest_code=error_code, + grpc_code=None, + context=None, + ) + # Handle stall for full uploads. testbench.common.extract_instruction(request, context=None) ( diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py index 0030993e..7fd3e733 100644 --- a/tests/test_testbench_retry.py +++ b/tests/test_testbench_retry.py @@ -36,581 +36,825 @@ class TestTestbenchRetry(unittest.TestCase): - def setUp(self): - rest_server.db.clear() - self.client = rest_server.server.test_client() - # Avoid magic buckets in the test - os.environ.pop("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME", None) - - def test_retry_test_supported_operations(self): - BUCKET_OPERATIONS = { - "storage.buckets." + op - for op in [ - "list", - "insert", - "get", - "update", - "patch", - "delete", - "getIamPolicy", - "setIamPolicy", - "testIamPermissions", - "lockRetentionPolicy", - ] - } - BUCKET_ACL_OPERATIONS = { - "storage.bucket_acl." + op - for op in ["list", "insert", "get", "update", "patch", "delete"] - } - BUCKET_DEFAULT_OBJECT_ACL_OPERATIONS = { - "storage.default_object_acl." + op - for op in ["list", "insert", "get", "update", "patch", "delete"] - } - NOTIFICATION_OPERATIONS = { - "storage.notifications." + op for op in ["list", "insert", "get", "delete"] - } - OBJECT_OPERATIONS = { - "storage.objects." 
+ op - for op in [ - "list", - "insert", - "get", - "update", - "patch", - "delete", - "compose", - "copy", - "rewrite", - ] - } - OBJECT_ACL_OPERATIONS = { - "storage.object_acl." + op - for op in ["list", "insert", "get", "update", "patch", "delete"] - } - PROJECT_OPERATIONS = {"storage.serviceaccount.get"} | { - "storage.hmacKey." + op - for op in [ - "create", - "list", - "delete", - "get", - "update", - ] - } - groups = { - "buckets": BUCKET_OPERATIONS, - "bucket_acl": BUCKET_ACL_OPERATIONS, - "bucket_default_object_acl": BUCKET_DEFAULT_OBJECT_ACL_OPERATIONS, - "notifications": NOTIFICATION_OPERATIONS, - "objects": OBJECT_OPERATIONS, - "object_acl": OBJECT_ACL_OPERATIONS, - "project": PROJECT_OPERATIONS, - } - all = set(rest_server.db.supported_methods()) - for name, operations in groups.items(): - self.assertEqual(all, all | operations, msg=name) - - @staticmethod - def _create_valid_chunk(): - line = "How vexingly quick daft zebras jump!" - pad = (255 - len(line)) * " " - line = line + pad + "\n" - return 1024 * line - - def test_retry_test_crud(self): - self.assertIn("storage.buckets.list", rest_server.db.supported_methods()) - response = self.client.post( - "/retry_test", - data=json.dumps({"instructions": {"storage.buckets.list": ["return-429"]}}), - ) - self.assertEqual(response.status_code, 200, msg=response.data) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - - response = self.client.get("/retry_test/" + create_rest.get("id")) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - get_rest = json.loads(response.data) - self.assertEqual(get_rest, create_rest) - - response = self.client.get("/retry_tests") - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - list_rest = json.loads(response.data) - ids = [test.get("id") for test in list_rest.get("retry_test", [])] - self.assertEqual(ids, [create_rest.get("id")], msg=response.data) - - response = self.client.delete("/retry_test/" + create_rest.get("id")) - self.assertEqual(response.status_code, 200) - # Once deleted, getting the test should fail. 
- response = self.client.get("/retry_test/" + create_rest.get("id")) - self.assertEqual(response.status_code, 404) - - def test_retry_test_create_invalid(self): - response = self.client.post("/retry_test", data=json.dumps({})) - self.assertEqual(response.status_code, 400) - - def test_retry_test_get_notfound(self): - response = self.client.get("/retry_test/invalid-id") - self.assertEqual(response.status_code, 404) - - def test_retry_test_return_error(self): - response = self.client.post( - "/retry_test", - data=json.dumps({"instructions": {"storage.buckets.list": ["return-429"]}}), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - - list_response = self.client.get( - "/storage/v1/b", - query_string={"project": "test-project-unused"}, - headers={"x-retry-test-id": create_rest.get("id")}, - ) - self.assertEqual(list_response.status_code, 429, msg=list_response.data) - - @staticmethod - def _create_block(desired_kib): - line = "A" * 127 + "\n" - return int(desired_kib / len(line)) * line - - def test_retry_test_return_reset_connection(self): - response = self.client.post( - "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) - ) - self.assertEqual(response.status_code, 200) - # Use the XML API to inject an object with some data. - media = self._create_block(256) - response = self.client.put( - "/bucket-name/256k.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(response.status_code, 200) - - # Setup a failure for reading back the object. - response = self.client.post( - "/retry_test", - data=json.dumps( - {"instructions": {"storage.objects.get": ["return-reset-connection"]}} - ), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - - response = self.client.get( - "/storage/v1/b/bucket-name/o/256k.txt", - query_string={"alt": "media"}, - headers={"x-retry-test-id": id}, - ) - self.assertEqual(response.status_code, 500) - error = json.loads(response.data) - self.assertIn( - "connection reset by peer", - error.get("error", dict()).get("message"), - msg=response.data, - ) - - def test_retry_test_return_no_metadata_on_resumable_complete(self): - response = self.client.post( - "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) - ) - self.assertEqual(response.status_code, 200) - - # Setup a error for resumable upload to respond with a 200 without object metadata returned - bytes_returned = 0 - response = self.client.post( - "/retry_test", - data=json.dumps( - { - "instructions": { - "storage.objects.insert": [ - "return-broken-stream-final-chunk-after-%dB" - % bytes_returned - ] - } + def setUp(self): + rest_server.db.clear() + self.client = rest_server.server.test_client() + # Avoid magic buckets in the test + os.environ.pop("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME", None) + + def test_retry_test_supported_operations(self): + BUCKET_OPERATIONS = { + "storage.buckets." + op + for op in [ + "list", + "insert", + "get", + "update", + "patch", + "delete", + "getIamPolicy", + "setIamPolicy", + "testIamPermissions", + "lockRetentionPolicy", + ] + } + BUCKET_ACL_OPERATIONS = { + "storage.bucket_acl." 
+ op + for op in ["list", "insert", "get", "update", "patch", "delete"] + } + BUCKET_DEFAULT_OBJECT_ACL_OPERATIONS = { + "storage.default_object_acl." + op + for op in ["list", "insert", "get", "update", "patch", "delete"] + } + NOTIFICATION_OPERATIONS = { + "storage.notifications." + op for op in + ["list", "insert", "get", "delete"] + } + OBJECT_OPERATIONS = { + "storage.objects." + op + for op in [ + "list", + "insert", + "get", + "update", + "patch", + "delete", + "compose", + "copy", + "rewrite", + ] + } + OBJECT_ACL_OPERATIONS = { + "storage.object_acl." + op + for op in ["list", "insert", "get", "update", "patch", "delete"] + } + PROJECT_OPERATIONS = {"storage.serviceaccount.get"} | { + "storage.hmacKey." + op + for op in [ + "create", + "list", + "delete", + "get", + "update", + ] + } + groups = { + "buckets": BUCKET_OPERATIONS, + "bucket_acl": BUCKET_ACL_OPERATIONS, + "bucket_default_object_acl": BUCKET_DEFAULT_OBJECT_ACL_OPERATIONS, + "notifications": NOTIFICATION_OPERATIONS, + "objects": OBJECT_OPERATIONS, + "object_acl": OBJECT_ACL_OPERATIONS, + "project": PROJECT_OPERATIONS, + } + all = set(rest_server.db.supported_methods()) + for name, operations in groups.items(): + self.assertEqual(all, all | operations, msg=name) + + @staticmethod + def _create_valid_chunk(): + line = "How vexingly quick daft zebras jump!" + pad = (255 - len(line)) * " " + line = line + pad + "\n" + return 1024 * line + + def test_retry_test_crud(self): + self.assertIn("storage.buckets.list", rest_server.db.supported_methods()) + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": {"storage.buckets.list": ["return-429"]}}), + ) + self.assertEqual(response.status_code, 200, msg=response.data) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + response = self.client.get("/retry_test/" + create_rest.get("id")) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + get_rest = json.loads(response.data) + self.assertEqual(get_rest, create_rest) + + response = self.client.get("/retry_tests") + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + list_rest = json.loads(response.data) + ids = [test.get("id") for test in list_rest.get("retry_test", [])] + self.assertEqual(ids, [create_rest.get("id")], msg=response.data) + + response = self.client.delete("/retry_test/" + create_rest.get("id")) + self.assertEqual(response.status_code, 200) + # Once deleted, getting the test should fail. 
+ response = self.client.get("/retry_test/" + create_rest.get("id")) + self.assertEqual(response.status_code, 404) + + def test_retry_test_create_invalid(self): + response = self.client.post("/retry_test", data=json.dumps({})) + self.assertEqual(response.status_code, 400) + + def test_retry_test_get_notfound(self): + response = self.client.get("/retry_test/invalid-id") + self.assertEqual(response.status_code, 404) + + def test_retry_test_return_error(self): + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": {"storage.buckets.list": ["return-429"]}}), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + list_response = self.client.get( + "/storage/v1/b", + query_string={"project": "test-project-unused"}, + headers={"x-retry-test-id": create_rest.get("id")}, + ) + self.assertEqual(list_response.status_code, 429, msg=list_response.data) + + @staticmethod + def _create_block(desired_kib): + line = "A" * 127 + "\n" + return int(desired_kib / len(line)) * line + + def test_retry_test_return_reset_connection(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject an object with some data. + media = self._create_block(256) + response = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(response.status_code, 200) + + # Setup a failure for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": { + "storage.objects.get": ["return-reset-connection"]}} + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 500) + error = json.loads(response.data) + self.assertIn( + "connection reset by peer", + error.get("error", dict()).get("message"), + msg=response.data, + ) + + def test_retry_test_return_no_metadata_on_resumable_complete(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + + # Setup a error for resumable upload to respond with a 200 without object metadata returned + bytes_returned = 0 + response = self.client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.insert": [ + "return-broken-stream-final-chunk-after-%dB" + % bytes_returned + ] } - ), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - - response = self.client.post( - "/upload/storage/v1/b/bucket-name/o", - query_string={"uploadType": "resumable", "name": "256kobject"}, - headers={"x-retry-test-id": id}, - ) - self.assertEqual(response.status_code, 200) - location = response.headers.get("location") - - response = self.client.put( - location, - data="test", - headers={"x-retry-test-id": id}, - ) - 
self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - self.assertEqual(len(response.data), bytes_returned) - - def test_retry_test_return_no_metadata_on_resumable_multi_chunk_complete(self): - response = self.client.post( - "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) - ) - self.assertEqual(response.status_code, 200) - - # Setup a error for resumable upload to respond with a 200 without object metadata returned - bytes_returned = 10 - response = self.client.post( - "/retry_test", - data=json.dumps( - { - "instructions": { - "storage.objects.insert": [ - "return-broken-stream-final-chunk-after-%dB" - % bytes_returned - ] - } + } + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "resumable", "name": "256kobject"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200) + location = response.headers.get("location") + + response = self.client.put( + location, + data="test", + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + self.assertEqual(len(response.data), bytes_returned) + + def test_retry_test_return_no_metadata_on_resumable_multi_chunk_complete( + self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + + # Setup a error for resumable upload to respond with a 200 without object metadata returned + bytes_returned = 10 + response = self.client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.insert": [ + "return-broken-stream-final-chunk-after-%dB" + % bytes_returned + ] } + } + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + chunk = self._create_valid_chunk() + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "resumable", "name": "256kobject"}, + headers={ + "x-upload-content-length": "%d" % (2 * len(chunk)), + "x-retry-test-id": id, + }, + ) + self.assertEqual(response.status_code, 200) + location = response.headers.get("location") + + # Upload in chunks, but there is not need to specify the ending byte because + # it was set via the x-upload-content-length header. 
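Before the chunk PUT below, a short aside on that content-range arithmetic: each chunk of a resumable upload carries a "content-range" header, and when the total size was already announced through "x-upload-content-length" the trailing total can stay "*". A minimal sketch of the header construction, assuming the 256 KiB upload quantum used by these tests (the helper itself is illustrative, not part of the testbench):

    def content_range(offset, chunk_len, total=None):
        """Return the content-range value for one chunk of a resumable upload."""
        last = offset + chunk_len - 1
        return "bytes %d-%d/%s" % (offset, last, total if total is not None else "*")

    # First chunk with the total still unknown, then the final chunk with it known.
    assert content_range(0, 262144) == "bytes 0-262143/*"
    assert content_range(262144, 262144, 524288) == "bytes 262144-524287/524288"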
+ response = self.client.put( + location, + headers={ + "content-range": "bytes 0-{last:d}/*".format(last=len(chunk) - 1), + "x-upload-content-length": "%d" % (2 * len(chunk)), + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 308, msg=response.data) + + chunk = self._create_valid_chunk() + response = self.client.put( + location, + headers={ + "content-range": "bytes {last:d}-*/*".format(last=len(chunk)), + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + self.assertEqual(len(response.data), bytes_returned) + + def test_retry_test_return_broken_stream(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject an object with some data. + media = self._create_block(256) + response = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(response.status_code, 200) + + # Setup a failure for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": {"storage.objects.get": ["return-broken-stream"]}} + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + with self.assertRaises(testbench.error.RestException) as ex: + _ = len(response.data) + self.assertIn("broken stream", ex.exception.msg) + + def test_retry_test_return_broken_stream_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject a larger object and smaller object. + media = self._create_block(UPLOAD_QUANTUM) + blob_larger = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_larger.status_code, 200) + + media = self._create_block(128) + blob_smaller = self.client.put( + "/bucket-name/128.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_smaller.status_code, 200) + + # Setup a failure for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.get": ["return-broken-stream-after-256K"] + } + } + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + # The 128-bytes file is too small to trigger the "return-504-after-256K" fault injection. + response = self.client.get( + "/storage/v1/b/bucket-name/o/128.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + + # The 256KiB file triggers the "return-broken-stream-after-256K" fault injection. 
+ response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertIn("x-goog-generation", response.headers) + with self.assertRaises(testbench.error.RestException) as ex: + _ = len(response.data) + self.assertIn("broken stream", ex.exception.msg) + + def test_list_retry_stall_test(self): + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": { + "storage.buckets.list": ["stall-for-1s-after-0K"]}} + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + start_time = time.perf_counter() + list_response = self.client.get( + "/storage/v1/b", + query_string={"project": "test-project-unused"}, + headers={"x-retry-test-id": create_rest.get("id")}, + ) + self.assertEqual(len(list_response.get_data()), 40) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + + def test_read_retry_test_stall_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + # Use the XML API to inject a larger object and smaller object. + media = self._create_block(UPLOAD_QUANTUM) + blob_larger = self.client.put( + "/bucket-name/256k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_larger.status_code, 200) + + media = self._create_block(128) + blob_smaller = self.client.put( + "/bucket-name/128.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(blob_smaller.status_code, 200) + + # Set up a stall for reading back the object. + response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": { + "storage.objects.get": ["stall-for-1s-after-256K"]}} + ), + ) + + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + start_time = time.perf_counter() + # The 128-byte file is too small to trigger the "stall-for-1s-after-256K" fault injection. + response = self.client.get( + "/storage/v1/b/bucket-name/o/128.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + self.assertEqual(len(response.get_data()), 128) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + # This will take less than the injected delay (1s). + self.assertLess(elapsed_time, 1) + + start_time = time.perf_counter() + # The 256KiB file triggers the "stall-for-1s-after-256K" stall and will + # take more than the injected delay (1s). + response = self.client.get( + "/storage/v1/b/bucket-name/o/256k.txt", + query_string={"alt": "media"}, + headers={"x-retry-test-id": id}, + ) + self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + + def test_write_retry_test_stall_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + + # Set up a stall for the upload. 
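As an aside before the next retry test is created: the instruction names used throughout these tests encode their parameters directly, e.g. "stall-for-1s-after-256K" stalls for one second once 256 KiB have been transferred, and "return-504-after-300K" fails with a 504 after 300 KiB. A hedged sketch of how such a name decomposes (the regex is illustrative; the testbench's own parsing in testbench/common.py may differ):

    import re

    # Illustrative pattern for stall instructions: delay in seconds, threshold in KiB.
    _STALL_PATTERN = re.compile(r"stall-for-(\d+)s-after-(\d+)K$")

    def parse_stall_instruction(instruction):
        """Return (delay_seconds, threshold_bytes) encoded in a stall instruction."""
        match = _STALL_PATTERN.match(instruction)
        if match is None:
            return None
        return int(match.group(1)), int(match.group(2)) * 1024

    assert parse_stall_instruction("stall-for-1s-after-256K") == (1, 262144)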
+ response = self.client.post( + "/retry_test", + data=json.dumps( + {"instructions": { + "storage.objects.insert": ["stall-for-1s-after-256K"]}} + ), + ) + + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "resumable", "name": "stall"}, + content_type="application/json", + ) + self.assertEqual(response.status_code, 200) + location = response.headers.get("location") + self.assertIn("upload_id=", location) + match = re.search("[&?]upload_id=([^&]+)", location) + self.assertIsNotNone(match, msg=location) + upload_id = match.group(1) + + # Upload the first 256KiB chunk of data and trigger error. + chunk = self._create_block(UPLOAD_QUANTUM) + self.assertEqual(len(chunk), UPLOAD_QUANTUM) + start_time = time.perf_counter() + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - chunk = self._create_valid_chunk() - response = self.client.post( - "/upload/storage/v1/b/bucket-name/o", - query_string={"uploadType": "resumable", "name": "256kobject"}, - headers={ - "x-upload-content-length": "%d" % (2 * len(chunk)), - "x-retry-test-id": id, - }, - ) - self.assertEqual(response.status_code, 200) - location = response.headers.get("location") - - # Upload in chunks, but there is not need to specify the ending byte because - # it was set via the x-upload-content-length header. - response = self.client.put( - location, - headers={ - "content-range": "bytes 0-{last:d}/*".format(last=len(chunk) - 1), - "x-upload-content-length": "%d" % (2 * len(chunk)), - "x-retry-test-id": id, - }, - data=chunk, - ) - self.assertEqual(response.status_code, 308, msg=response.data) - - chunk = self._create_valid_chunk() - response = self.client.put( - location, - headers={ - "content-range": "bytes {last:d}-*/*".format(last=len(chunk)), - "x-retry-test-id": id, - }, - data=chunk, - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - self.assertEqual(len(response.data), bytes_returned) - - def test_retry_test_return_broken_stream(self): - response = self.client.post( - "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) - ) - self.assertEqual(response.status_code, 200) - # Use the XML API to inject an object with some data. - media = self._create_block(256) - response = self.client.put( - "/bucket-name/256k.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(response.status_code, 200) - - # Setup a failure for reading back the object. - response = self.client.post( - "/retry_test", - data=json.dumps( - {"instructions": {"storage.objects.get": ["return-broken-stream"]}} + "x-retry-test-id": id, + }, + data=chunk, + ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + + # Check the status of a resumable upload. 
+ start_time = time.perf_counter() + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes */*", + "x-retry-test-id": id, + }, + ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertLess(elapsed_time, 1) + + # Send a full object upload here to verify the testbench can + # (1) apply the stall_after_bytes instruction only once, + # (2) ignore duplicate request bytes, and + # (3) complete the upload without stalling again. + start_time = time.perf_counter() + chunk = self._create_block(2 * UPLOAD_QUANTUM) + self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM) + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM + ), + "x-retry-test-id": id, + }, + data=chunk, + ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertLess(elapsed_time, 1) + self.assertEqual(response.status_code, 200, msg=response.data) + + # Check the status of a resumable upload. + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes */*", + "x-retry-test-id": id, + }, + ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertLess(elapsed_time, 1) + self.assertEqual(response.status_code, 200, msg=response.data) + + # Finally, to complete the upload, resend the full object upload. 
+ response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM + ), + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + create_rest = json.loads(response.data) + self.assertIn("size", create_rest) + self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM) + + def test_retry_test_return_error_after_bytes(self): + response = self.client.post( + "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) + ) + self.assertEqual(response.status_code, 200) + + # Setup two after-bytes errors to test injecting failures in + # resumable uploads, both multiple chunks and a single chunk. + error_after_300K = 300 * 1024 + response = self.client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.insert": [ + "return-504-after-256K", + "return-504-after-300K", + ] } + } + ), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + response.headers.get("content-type").startswith("application/json") + ) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + response = self.client.post( + "/upload/storage/v1/b/bucket-name/o", + query_string={"uploadType": "resumable", "name": "will-fail"}, + content_type="application/json", + ) + self.assertEqual(response.status_code, 200) + location = response.headers.get("location") + self.assertIn("upload_id=", location) + match = re.search("[&?]upload_id=([^&]+)", location) + self.assertIsNotNone(match, msg=location) + upload_id = match.group(1) + + # Upload the first 256KiB chunk of data and trigger error. + chunk = self._create_block(UPLOAD_QUANTUM) + self.assertEqual(len(chunk), UPLOAD_QUANTUM) + + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - - # The 128-bytes file is too small to trigger the "return-504-after-256K" fault injection. - response = self.client.get( - "/storage/v1/b/bucket-name/o/128.txt", - query_string={"alt": "media"}, - headers={"x-retry-test-id": id}, - ) - self.assertEqual(response.status_code, 200, msg=response.data) - - # The 256KiB file triggers the "return-broken-stream-after-256K" fault injection. - response = self.client.get( - "/storage/v1/b/bucket-name/o/256k.txt", - query_string={"alt": "media"}, - headers={"x-retry-test-id": id}, - ) - self.assertIn("x-goog-generation", response.headers) - with self.assertRaises(testbench.error.RestException) as ex: - _ = len(response.data) - self.assertIn("broken stream", ex.exception.msg) - - def test_list_retry_stall_test(self): - response = self.client.post( - "/retry_test", - data=json.dumps( - {"instructions": {"storage.buckets.list": ["stall-for-1s-after-0K"]}} + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 504, msg=response.data) + + # Check the status of a resumable upload. 
+ response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes */*", + "x-retry-test-id": id, + }, + ) + self.assertEqual(response.status_code, 308, msg=response.data) + self.assertIn("range", response.headers) + self.assertEqual( + response.headers.get("range"), "bytes=0-%d" % (UPLOAD_QUANTUM - 1) + ) + + # Send a full object upload here to verify testbench can + # (1) trigger error_after_bytes instructions, + # (2) ignore duplicate request bytes and + # (3) return a forced failure with partial data. + chunk = self._create_block(2 * UPLOAD_QUANTUM) + self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM) + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - ) - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - - start_time = time.perf_counter() - list_response = self.client.get( - "/storage/v1/b", - query_string={"project": "test-project-unused"}, - headers={"x-retry-test-id": create_rest.get("id")}, - ) - self.assertEqual(len(list_response.get_data()), 40) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - self.assertGreater(elapsed_time, 1) - - def test_read_retry_test_stall_after_bytes(self): - response = self.client.post( - "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) - ) - self.assertEqual(response.status_code, 200) - # Use the XML API to inject a larger object and smaller object. - media = self._create_block(UPLOAD_QUANTUM) - blob_larger = self.client.put( - "/bucket-name/256k.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(blob_larger.status_code, 200) - - media = self._create_block(128) - blob_smaller = self.client.put( - "/bucket-name/128.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(blob_smaller.status_code, 200) - - # Setup a stall for reading back the object. - response = self.client.post( - "/retry_test", - data=json.dumps( - {"instructions": {"storage.objects.get": ["stall-for-1s-after-128K"]}} + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 504, msg=response.data) + + # Check the status of a resumable upload. + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes */*", + "x-retry-test-id": id, + }, + ) + self.assertEqual(response.status_code, 308, msg=response.data) + self.assertIn("range", response.headers) + self.assertEqual( + response.headers.get("range"), "bytes=0-%d" % (error_after_300K - 1) + ) + + # Finally to complete the upload, resend a full object upload again. 
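Before that final resend, a note on the "content-range: bytes */*" probe used twice above: it is how a client learns what the testbench persisted before the injected 504, and the 308 response's "range" header names the last committed byte. A client-side sketch of that recovery step (a requests-style session; the function and its names are illustrative):

    def committed_offset(session, upload_url, upload_id, retry_id):
        """Return the number of bytes the server has persisted for an upload."""
        response = session.put(
            upload_url,
            params={"upload_id": upload_id},
            headers={"content-range": "bytes */*", "x-retry-test-id": retry_id},
        )
        if response.status_code == 200:
            return None  # the upload already completed
        # A 308 carries e.g. "range: bytes=0-307199"; resume at the next byte.
        committed = response.headers.get("range")
        return int(committed.rsplit("-", 1)[1]) + 1 if committed else 0

With the "return-504-after-300K" instruction above, this probe would report 307200 committed bytes, matching the test's range assertion.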
+ response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - ) - - self.assertEqual(response.status_code, 200) - self.assertTrue( - response.headers.get("content-type").startswith("application/json") - ) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - - start_time = time.perf_counter() - # The 128-bytes file is too small to trigger the "stall-for-1s-after-128K" fault injection. - response = self.client.get( - "/storage/v1/b/bucket-name/o/128.txt", - query_string={"alt": "media"}, - headers={"x-retry-test-id": id}, - ) - self.assertEqual(response.status_code, 200, msg=response.data) - self.assertEqual(len(response.get_data()), 128) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - # This will take less the injected delay (1s). - self.assertLess(elapsed_time, 1) + "x-retry-test-id": id, + }, + data=chunk, + ) + self.assertEqual(response.status_code, 200, msg=response.data) + create_rest = json.loads(response.data) + self.assertIn("size", create_rest) + self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM) - start_time = time.perf_counter() - # The 256KiB file triggers the "stall-for-1s-after-128K" and will - # take more than injected delay (1s). - response = self.client.get( - "/storage/v1/b/bucket-name/o/256k.txt", - query_string={"alt": "media"}, - headers={"x-retry-test-id": id}, - ) - self.assertEqual(len(response.get_data()), UPLOAD_QUANTUM) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - self.assertGreater(elapsed_time, 1) - - def test_retry_test_return_error_after_bytes(self): + def test_write_retry_test_stall_after_bytes(self): + # Create a new bucket response = self.client.post( "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) ) self.assertEqual(response.status_code, 200) - # Setup two after-bytes errors to test injecting failures in - # resumable uploads, both multiple chunks and a single chunk. - error_after_300K = 300 * 1024 + # Setup a stall for reading back the object. response = self.client.post( "/retry_test", data=json.dumps( { "instructions": { "storage.objects.insert": [ - "return-504-after-256K", - "return-504-after-300K", + "stall-for-1s-after-250K", + "stall-for-1s-after-300K", ] } } ), + content_type="application/json", ) self.assertEqual(response.status_code, 200) self.assertTrue( response.headers.get("content-type").startswith("application/json") ) + create_rest = json.loads(response.data) self.assertIn("id", create_rest) - id = create_rest.get("id") + test_id = create_rest.get("id") + # Initiate resumable upload response = self.client.post( "/upload/storage/v1/b/bucket-name/o", - query_string={"uploadType": "resumable", "name": "will-fail"}, + query_string={"uploadType": "resumable", "name": "stall"}, content_type="application/json", ) self.assertEqual(response.status_code, 200) + location = response.headers.get("location") self.assertIn("upload_id=", location) - match = re.search("[&?]upload_id=([^&]+)", location) + match = re.search(r"[&?]upload_id=([^&]+)", location) self.assertIsNotNone(match, msg=location) upload_id = match.group(1) - # Upload the first 256KiB chunk of data and trigger error. + # Upload the first 256KiB chunk of data and trigger the stall. 
chunk = self._create_block(UPLOAD_QUANTUM) self.assertEqual(len(chunk), UPLOAD_QUANTUM) + start_time = time.perf_counter() response = self.client.put( - "/upload/storage/v1/b/bucket-name/o", + f"/upload/storage/v1/b/bucket-name/o", query_string={"upload_id": upload_id}, headers={ "content-range": "bytes 0-{len:d}/{obj_size:d}".format( len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - "x-retry-test-id": id, + "x-retry-test-id": test_id, }, data=chunk, ) - self.assertEqual(response.status_code, 504, msg=response.data) - - # Check the status of a resumable upload. - response = self.client.put( - "/upload/storage/v1/b/bucket-name/o", - query_string={"upload_id": upload_id}, - headers={ - "content-range": "bytes */*", - "x-retry-test-id": id, - }, - ) - self.assertEqual(response.status_code, 308, msg=response.data) - self.assertIn("range", response.headers) - self.assertEqual( - response.headers.get("range"), "bytes=0-%d" % (UPLOAD_QUANTUM - 1) - ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + self.assertEqual(response.status_code, 308) - # Send a full object upload here to verify testbench can - # (1) trigger error_after_bytes instructions, - # (2) ignore duplicate request bytes and - # (3) return a forced failure with partial data. - chunk = self._create_block(2 * UPLOAD_QUANTUM) - self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM) + # Upload the second 256KiB chunk of data and trigger the stall again. + start_time = time.perf_counter() + chunk = self._create_block(UPLOAD_QUANTUM) + self.assertEqual(len(chunk), UPLOAD_QUANTUM) response = self.client.put( "/upload/storage/v1/b/bucket-name/o", query_string={"upload_id": upload_id}, @@ -618,28 +862,19 @@ def test_retry_test_return_error_after_bytes(self): "content-range": "bytes 0-{len:d}/{obj_size:d}".format( len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - "x-retry-test-id": id, + "x-retry-test-id": test_id, }, data=chunk, ) - self.assertEqual(response.status_code, 504, msg=response.data) - - # Check the status of a resumable upload. - response = self.client.put( - "/upload/storage/v1/b/bucket-name/o", - query_string={"upload_id": upload_id}, - headers={ - "content-range": "bytes */*", - "x-retry-test-id": id, - }, - ) - self.assertEqual(response.status_code, 308, msg=response.data) - self.assertIn("range", response.headers) - self.assertEqual( - response.headers.get("range"), "bytes=0-%d" % (error_after_300K - 1) - ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertGreater(elapsed_time, 1) + self.assertEqual(response.status_code, 200, msg=response.data) - # Finally to complete the upload, resend a full object upload again. 
+ # Upload the second 256KiB chunk of data and check that the stall does not happen. + start_time = time.perf_counter() + chunk = self._create_block(UPLOAD_QUANTUM) + self.assertEqual(len(chunk), UPLOAD_QUANTUM) response = self.client.put( "/upload/storage/v1/b/bucket-name/o", query_string={"upload_id": upload_id}, @@ -647,16 +882,15 @@ def test_retry_test_return_error_after_bytes(self): "content-range": "bytes 0-{len:d}/{obj_size:d}".format( len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), - "x-retry-test-id": id, + "x-retry-test-id": test_id, }, data=chunk, ) + end_time = time.perf_counter() + elapsed_time = end_time - start_time + self.assertLess(elapsed_time, 1) self.assertEqual(response.status_code, 200, msg=response.data) - create_rest = json.loads(response.data) - self.assertIn("size", create_rest) - self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM) - - def test_write_retry_test_stall_after_bytes(self): + def test_write_retry_test_stall_for_full_uploads(self): # Create a new bucket response = self.client.post( "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) @@ -671,7 +905,6 @@ def test_write_retry_test_stall_after_bytes(self): "instructions": { "storage.objects.insert": [ "stall-for-1s-after-250K", - "stall-for-1s-after-300K", ] } } @@ -687,80 +920,25 @@ def test_write_retry_test_stall_after_bytes(self): self.assertIn("id", create_rest) test_id = create_rest.get("id") - # Initiate resumable upload - response = self.client.post( - "/upload/storage/v1/b/bucket-name/o", - query_string={"uploadType": "resumable", "name": "stall"}, - content_type="application/json", - ) - self.assertEqual(response.status_code, 200) - - location = response.headers.get("location") - self.assertIn("upload_id=", location) - match = re.search(r"[&?]upload_id=([^&]+)", location) - self.assertIsNotNone(match, msg=location) - upload_id = match.group(1) - - # Upload the first 256KiB chunk of data and trigger the stall. - chunk = self._create_block(UPLOAD_QUANTUM) - self.assertEqual(len(chunk), UPLOAD_QUANTUM) - - start_time = time.perf_counter() - response = self.client.put( - f"/upload/storage/v1/b/bucket-name/o", - query_string={"upload_id": upload_id}, - headers={ - "content-range": "bytes 0-{len:d}/{obj_size:d}".format( - len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM - ), - "x-retry-test-id": test_id, - }, - data=chunk, - ) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - self.assertGreater(elapsed_time, 1) - self.assertEqual(response.status_code, 308) + # Upload 256KiB of data and check that the stall happens. + data = self._create_block(UPLOAD_QUANTUM) + self.assertEqual(len(data), UPLOAD_QUANTUM) - # Upload the second 256KiB chunk of data and trigger the stall again. 
start_time = time.perf_counter() - chunk = self._create_block(UPLOAD_QUANTUM) - self.assertEqual(len(chunk), UPLOAD_QUANTUM) - response = self.client.put( + boundary, payload = format_multipart_upload({}, data) + response = self.client.post( "/upload/storage/v1/b/bucket-name/o", - query_string={"upload_id": upload_id}, + query_string={"uploadType": "multipart", "name": "stall"}, + content_type="multipart/related; boundary=" + boundary, headers={ - "content-range": "bytes 0-{len:d}/{obj_size:d}".format( - len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM - ), "x-retry-test-id": test_id, }, - data=chunk, + data=payload, ) end_time = time.perf_counter() elapsed_time = end_time - start_time + self.assertEqual(response.status_code, 200) self.assertGreater(elapsed_time, 1) - self.assertEqual(response.status_code, 200, msg=response.data) - - # Upload the second 256KiB chunk of data and check that stall not happen - start_time = time.perf_counter() - chunk = self._create_block(UPLOAD_QUANTUM) - self.assertEqual(len(chunk), UPLOAD_QUANTUM) - response = self.client.put( - "/upload/storage/v1/b/bucket-name/o", - query_string={"upload_id": upload_id}, - headers={ - "content-range": "bytes 0-{len:d}/{obj_size:d}".format( - len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM - ), - "x-retry-test-id": test_id, - }, - data=chunk, - ) - end_time = time.perf_counter() - elapsed_time = end_time - start_time - self.assertLess(elapsed_time, 1) - self.assertEqual(response.status_code, 200, msg=response.data) def test_write_retry_test_stall_single_shot(self): # Create a new bucket @@ -825,8 +1003,8 @@ def test_write_retry_test_stall_single_shot(self): ) end_time = time.perf_counter() elapsed_time = end_time - start_time - self.assertLess(elapsed_time, 1) self.assertEqual(response.status_code, 200) + self.assertLess(elapsed_time, 1) def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_size( self, @@ -864,7 +1042,6 @@ def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_si data = self._create_block(200 * 1024) self.assertEqual(len(data), 200 * 1024) - start_time = time.perf_counter() boundary, payload = format_multipart_upload({}, data) response = self.client.post( "/upload/storage/v1/b/bucket-name/o", @@ -875,307 +1052,429 @@ def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_si }, data=payload, ) - end_time = time.perf_counter() - elapsed_time = end_time - start_time self.assertEqual(response.status_code, 200) - self.assertLess(elapsed_time, 1) class TestTestbenchRetryGrpc(unittest.TestCase): - def setUp(self): - rest_server.db.clear() - self.db = rest_server.db - request = testbench.common.FakeRequest( - args={}, - data=json.dumps({"name": "bucket-name"}), - ) - self.bucket, _ = gcs.bucket.Bucket.init(request, None) - self.db.insert_bucket(self.bucket, None) - self.rest_client = rest_server.server.test_client() - self.grpc = testbench.grpc_server.StorageServicer(self.db) - - @staticmethod - def _create_block(desired_kib): - line = "A" * 127 + "\n" - return int(desired_kib / len(line)) * line - - def test_grpc_retry_return_error(self): - # Use the rest client to setup a 503 failure for retrieving bucket metadata. - response = self.rest_client.post( - "/retry_test", - data=json.dumps( - { - "instructions": {"storage.buckets.get": ["return-503"]}, - "transport": "GRPC", - }, - ), - ) - self.assertEqual(response.status_code, 200) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - - context = unittest.mock.Mock() - context.invocation_metadata = unittest.mock.Mock( - return_value=(("x-retry-test-id", create_rest.get("id")),) - ) - response = self.grpc.GetBucket( - storage_pb2.GetBucketRequest(name="projects/_/buckets/bucket-name"), context - ) - context.abort.assert_called_once_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) - - def test_grpc_retry_reset_connection(self): - # Use the rest client to setup a failure for retrieving bucket metadata. - response = self.rest_client.post( - "/retry_test", - data=json.dumps( - { - "instructions": { - "storage.buckets.get": ["return-reset-connection"] - }, + def setUp(self): + rest_server.db.clear() + self.db = rest_server.db + request = testbench.common.FakeRequest( + args={}, + data=json.dumps({"name": "bucket-name"}), + ) + self.bucket, _ = gcs.bucket.Bucket.init(request, None) + self.db.insert_bucket(self.bucket, None) + self.rest_client = rest_server.server.test_client() + self.grpc = testbench.grpc_server.StorageServicer(self.db) + + @staticmethod + def _create_block(desired_kib): + line = "A" * 127 + "\n" + return int(desired_kib / len(line)) * line + + def test_grpc_retry_return_error(self): + # Use the rest client to setup a 503 failure for retrieving bucket metadata. + response = self.rest_client.post( + "/retry_test", + data=json.dumps( + { + "instructions": {"storage.buckets.get": ["return-503"]}, + "transport": "GRPC", + }, + ), + ) + self.assertEqual(response.status_code, 200) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + context = unittest.mock.Mock() + context.invocation_metadata = unittest.mock.Mock( + return_value=(("x-retry-test-id", create_rest.get("id")),) + ) + response = self.grpc.GetBucket( + storage_pb2.GetBucketRequest(name="projects/_/buckets/bucket-name"), + context + ) + context.abort.assert_called_once_with(StatusCode.UNAVAILABLE, + unittest.mock.ANY) + + def test_grpc_retry_reset_connection(self): + # Use the rest client to setup a failure for retrieving bucket metadata. 
+ response = self.rest_client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.buckets.get": ["return-reset-connection"] }, - ), - ) - self.assertEqual(response.status_code, 200) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - - context = unittest.mock.Mock() - context.invocation_metadata = unittest.mock.Mock( - return_value=(("x-retry-test-id", create_rest.get("id")),) - ) - response = self.grpc.GetBucket( - storage_pb2.GetBucketRequest(name="projects/_/buckets/bucket-name"), context - ) - context.abort.assert_called_once_with( - StatusCode.UNAVAILABLE, - "Injected 'socket closed, connection reset by peer' fault", - ) - - def test_grpc_retry_broken_stream(self): - # Use the XML API to inject an object with some data. - media = self._create_block(2 * UPLOAD_QUANTUM) - response = self.rest_client.put( - "/bucket-name/512k.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(response.status_code, 200) - - # Setup a return-broken-stream failure for reading back the object. - response = self.rest_client.post( - "/retry_test", - data=json.dumps( - { - "instructions": {"storage.objects.get": ["return-broken-stream"]}, - "transport": "GRPC", + "transport": "GRPC", + }, + ), + ) + self.assertEqual(response.status_code, 200) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + context = unittest.mock.Mock() + context.invocation_metadata = unittest.mock.Mock( + return_value=(("x-retry-test-id", create_rest.get("id")),) + ) + response = self.grpc.GetBucket( + storage_pb2.GetBucketRequest(name="projects/_/buckets/bucket-name"), + context + ) + context.abort.assert_called_once_with( + StatusCode.UNAVAILABLE, + "Injected 'socket closed, connection reset by peer' fault", + ) + + def test_grpc_retry_broken_stream(self): + # Use the XML API to inject an object with some data. + media = self._create_block(2 * UPLOAD_QUANTUM) + response = self.rest_client.put( + "/bucket-name/512k.txt", + content_type="text/plain", + data=media, + ) + self.assertEqual(response.status_code, 200) + + # Setup a return-broken-stream failure for reading back the object. + response = self.rest_client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.get": ["return-broken-stream"]}, + "transport": "GRPC", + }, + ), + ) + self.assertEqual(response.status_code, 200) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + + context = unittest.mock.Mock() + context.invocation_metadata = unittest.mock.Mock( + return_value=(("x-retry-test-id", create_rest.get("id")),) + ) + response = self.grpc.ReadObject( + storage_pb2.ReadObjectRequest( + bucket="projects/_/buckets/bucket-name", object="512k.txt" + ), + context, + ) + list(response) + context.abort.assert_called_once_with( + StatusCode.UNAVAILABLE, + "Injected 'broken stream' fault", + ) + + # Setup a return-broken-stream-after-256K failure for reading back the object. 
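An aside on these gRPC variants: the tests stub context.invocation_metadata because, over gRPC, the retry-test id travels as request metadata rather than as the x-retry-test-id HTTP header. Against a live testbench the same read looks roughly like the sketch below (the endpoint handling and the generated-stub import are assumptions based on the repository layout):

    import grpc
    from google.storage.v2 import storage_pb2, storage_pb2_grpc

    def read_object_with_retry_id(target, retry_id):
        # Attach the retry-test id as gRPC metadata on the streaming read.
        with grpc.insecure_channel(target) as channel:
            stub = storage_pb2_grpc.StorageStub(channel)
            stream = stub.ReadObject(
                storage_pb2.ReadObjectRequest(
                    bucket="projects/_/buckets/bucket-name", object="512k.txt"
                ),
                metadata=(("x-retry-test-id", retry_id),),
            )
            return b"".join(r.checksummed_data.content for r in stream)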
+        response = self.rest_client.post(
+            "/retry_test",
+            data=json.dumps(
+                {
+                    "instructions": {
+                        "storage.objects.get": ["return-broken-stream-after-256K"]
+                    },
-                },
-            ),
-        )
-        self.assertEqual(response.status_code, 200)
-        create_rest = json.loads(response.data)
-        self.assertIn("id", create_rest)
-
-        context = unittest.mock.Mock()
-        context.invocation_metadata = unittest.mock.Mock(
-            return_value=(("x-retry-test-id", create_rest.get("id")),)
-        )
-        response = self.grpc.ReadObject(
-            storage_pb2.ReadObjectRequest(
-                bucket="projects/_/buckets/bucket-name", object="512k.txt"
-            ),
-            context,
-        )
-        list(response)
-        context.abort.assert_called_once_with(
-            StatusCode.UNAVAILABLE,
-            "Injected 'broken stream' fault",
-        )
-
-    def test_grpc_return_error_after_bytes(self):
-        # Setup two after-bytes errors to test injecting failures in
-        # resumable uploads, both multiple chunks and a single chunk.
+                    "transport": "GRPC",
+                },
+            ),
+        )
+        self.assertEqual(response.status_code, 200)
+        create_rest = json.loads(response.data)
+        self.assertIn("id", create_rest)
+
+        context = unittest.mock.Mock()
+        context.invocation_metadata = unittest.mock.Mock(
+            return_value=(("x-retry-test-id", create_rest.get("id")),)
+        )
+        response = self.grpc.ReadObject(
+            storage_pb2.ReadObjectRequest(
+                bucket="projects/_/buckets/bucket-name", object="512k.txt"
+            ),
+            context,
+        )
+        list(response)
+        context.abort.assert_called_once_with(
+            StatusCode.UNAVAILABLE,
+            "Injected 'broken stream' fault",
+        )
+
+    def test_grpc_return_error_after_bytes(self):
+        # Set up two after-bytes errors to test injecting failures in
+        # resumable uploads, both multiple chunks and a single chunk.
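+        # The queued instructions are consumed in order: the first chunked
+        # write trips "return-503-after-256K", and the later single-request
+        # upload trips "return-503-after-300K".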
-        response = self.rest_client.post(
-            "/retry_test",
-            data=json.dumps(
-                {
-                    "instructions": {
-                        "storage.objects.insert": [
-                            "return-503-after-256K",
-                            "return-503-after-300K",
-                        ]
-                    },
-                    "transport": "GRPC",
-                }
-            ),
-        )
-        self.assertEqual(response.status_code, 200)
-        create_rest = json.loads(response.data)
-        self.assertIn("id", create_rest)
-        id = create_rest.get("id")
-
-        context = unittest.mock.Mock()
-        context.invocation_metadata = unittest.mock.Mock(
-            return_value=(("x-retry-test-id", id),)
-        )
-        start = self.grpc.StartResumableWrite(
-            storage_pb2.StartResumableWriteRequest(
-                write_object_spec=storage_pb2.WriteObjectSpec(
-                    resource=storage_pb2.Object(
-                        name="object-name", bucket="projects/_/buckets/bucket-name"
-                    )
+                    "transport": "GRPC",
+                }
+            ),
+        )
+        self.assertEqual(response.status_code, 200)
+        create_rest = json.loads(response.data)
+        self.assertIn("id", create_rest)
+        id = create_rest.get("id")
+
+        context = unittest.mock.Mock()
+        context.invocation_metadata = unittest.mock.Mock(
+            return_value=(("x-retry-test-id", id),)
+        )
+        start = self.grpc.StartResumableWrite(
+            storage_pb2.StartResumableWriteRequest(
+                write_object_spec=storage_pb2.WriteObjectSpec(
+                    resource=storage_pb2.Object(
+                        name="object-name", bucket="projects/_/buckets/bucket-name"
+                    )
+                )
+            ),
+            context=context,
+        )
+        self.assertIsNotNone(start.upload_id)
+
+        # Upload the first 256KiB chunk of data and trigger error.
+        content = self._create_block(UPLOAD_QUANTUM).encode("utf-8")
+        r1 = storage_pb2.WriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=content, crc32c=crc32c.crc32c(content)
+            ),
+            finish_write=False,
+        )
+        write = self.grpc.WriteObject([r1], context)
+        context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
+
+        status = self.grpc.QueryWriteStatus(
+            storage_pb2.QueryWriteStatusRequest(upload_id=start.upload_id),
+            context,
+        )
+        self.assertEqual(status.persisted_size, UPLOAD_QUANTUM)
+
+        # Send a full object upload here to verify testbench can
+        # (1) trigger error_after_bytes instructions,
+        # (2) ignore duplicate request bytes and
+        # (3) return a forced failure with partial data.
+        media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8")
+        r2 = storage_pb2.WriteObjectRequest(
+            upload_id=start.upload_id,
+            write_offset=0,
+            checksummed_data=storage_pb2.ChecksummedData(
+                content=media, crc32c=crc32c.crc32c(media)
+            ),
+            finish_write=True,
+        )
+        write = self.grpc.WriteObject([r2], context)
+        context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY)
+        self.assertIsNotNone(write)
+        blob = write.resource
+        self.assertEqual(blob.name, "object-name")
+        self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name")
+        self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM)
+
+    def test_grpc_bidiwrite_return_error_after_bytes(self):
+        # Set up an initial-response error and two after-bytes errors to test injecting
+        # failures in resumable uploads, both multiple chunks and a single chunk.
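+        # The leading "return-503" fires on StartResumableWrite itself; the
+        # after-bytes instructions then apply to the BidiWriteObject calls below.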
- response = self.rest_client.post( - "/retry_test", - data=json.dumps( - { - "instructions": { - "storage.objects.insert": [ - "return-503", - "return-503-after-256K", - "return-503-after-300K", - ] - }, - "transport": "GRPC", - } - ), - ) - self.assertEqual(response.status_code, 200) - create_rest = json.loads(response.data) - self.assertIn("id", create_rest) - id = create_rest.get("id") - - context = unittest.mock.Mock() - context.invocation_metadata = unittest.mock.Mock( - return_value=(("x-retry-test-id", id),) - ) - start = self.grpc.StartResumableWrite( - storage_pb2.StartResumableWriteRequest( - write_object_spec=storage_pb2.WriteObjectSpec( - resource=storage_pb2.Object( - name="object-name", bucket="projects/_/buckets/bucket-name" - ) + ) + ), + context=context, + ) + self.assertIsNotNone(start.upload_id) + + # Upload the first 256KiB chunk of data and trigger error. + content = self._create_block(UPLOAD_QUANTUM).encode("utf-8") + r1 = storage_pb2.WriteObjectRequest( + upload_id=start.upload_id, + write_offset=0, + checksummed_data=storage_pb2.ChecksummedData( + content=content, crc32c=crc32c.crc32c(content) + ), + finish_write=False, + ) + write = self.grpc.WriteObject([r1], context) + context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) + + status = self.grpc.QueryWriteStatus( + storage_pb2.QueryWriteStatusRequest(upload_id=start.upload_id), + context, + ) + self.assertEqual(status.persisted_size, UPLOAD_QUANTUM) + + # Send a full object upload here to verify testbench can + # (1) trigger error_after_bytes instructions, + # (2) ignore duplicate request bytes and + # (3) return a forced failure with partial data. + media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8") + r2 = storage_pb2.WriteObjectRequest( + upload_id=start.upload_id, + write_offset=0, + checksummed_data=storage_pb2.ChecksummedData( + content=media, crc32c=crc32c.crc32c(media) + ), + finish_write=True, + ) + write = self.grpc.WriteObject([r2], context) + context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) + self.assertIsNotNone(write) + blob = write.resource + self.assertEqual(blob.name, "object-name") + self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name") + self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM) + + def test_grpc_bidiwrite_return_error_after_bytes(self): + # Setup an initial-response error and two after-bytes errors to test injecting + # failures in resumable uploads, both multiple chunks and a single chunk. + response = self.rest_client.post( + "/retry_test", + data=json.dumps( + { + "instructions": { + "storage.objects.insert": [ + "return-503", + "return-503-after-256K", + "return-503-after-300K", + ] + }, + "transport": "GRPC", + } + ), + ) + self.assertEqual(response.status_code, 200) + create_rest = json.loads(response.data) + self.assertIn("id", create_rest) + id = create_rest.get("id") + + context = unittest.mock.Mock() + context.invocation_metadata = unittest.mock.Mock( + return_value=(("x-retry-test-id", id),) + ) + start = self.grpc.StartResumableWrite( + storage_pb2.StartResumableWriteRequest( + write_object_spec=storage_pb2.WriteObjectSpec( + resource=storage_pb2.Object( + name="object-name", bucket="projects/_/buckets/bucket-name" ) - ), - context=context, - ) - context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) - self.assertIsNotNone(start.upload_id) - - # Upload the first 256KiB chunk of data and trigger error. 
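+        # Writing exactly UPLOAD_QUANTUM bytes crosses the 256KiB threshold, so
+        # the testbench dequeues "return-503-after-256K" and aborts with
+        # UNAVAILABLE instead of returning a response.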
- content = self._create_block(UPLOAD_QUANTUM).encode("utf-8") - r1 = storage_pb2.BidiWriteObjectRequest( - upload_id=start.upload_id, - write_offset=0, - checksummed_data=storage_pb2.ChecksummedData( - content=content, crc32c=crc32c.crc32c(content) - ), - finish_write=False, - ) - list(self.grpc.BidiWriteObject([r1], context)) - context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) - - # Send a full object upload here to verify testbench can - # (1) trigger error_after_bytes instructions, - # (2) ignore duplicate request bytes and - # (3) return a forced failure with partial data. - media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8") - r2 = storage_pb2.BidiWriteObjectRequest( - upload_id=start.upload_id, - write_offset=0, - checksummed_data=storage_pb2.ChecksummedData( - content=media, crc32c=crc32c.crc32c(media) - ), - finish_write=True, - ) - streamer = self.grpc.BidiWriteObject([r2], context) - responses = list(streamer) - context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) - blob = responses[0].resource - self.assertEqual(blob.name, "object-name") - self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name") - self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM) + ) + ), + context=context, + ) + context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) + self.assertIsNotNone(start.upload_id) + + # Upload the first 256KiB chunk of data and trigger error. + content = self._create_block(UPLOAD_QUANTUM).encode("utf-8") + r1 = storage_pb2.BidiWriteObjectRequest( + upload_id=start.upload_id, + write_offset=0, + checksummed_data=storage_pb2.ChecksummedData( + content=content, crc32c=crc32c.crc32c(content) + ), + finish_write=False, + ) + list(self.grpc.BidiWriteObject([r1], context)) + context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) + + # Send a full object upload here to verify testbench can + # (1) trigger error_after_bytes instructions, + # (2) ignore duplicate request bytes and + # (3) return a forced failure with partial data. + media = self._create_block(2 * UPLOAD_QUANTUM).encode("utf-8") + r2 = storage_pb2.BidiWriteObjectRequest( + upload_id=start.upload_id, + write_offset=0, + checksummed_data=storage_pb2.ChecksummedData( + content=media, crc32c=crc32c.crc32c(media) + ), + finish_write=True, + ) + streamer = self.grpc.BidiWriteObject([r2], context) + responses = list(streamer) + context.abort.assert_called_with(StatusCode.UNAVAILABLE, unittest.mock.ANY) + blob = responses[0].resource + self.assertEqual(blob.name, "object-name") + self.assertEqual(blob.bucket, "projects/_/buckets/bucket-name") + self.assertEqual(blob.size, 2 * UPLOAD_QUANTUM) if __name__ == "__main__": - unittest.main() + unittest.main()
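
For readers unfamiliar with the retry-test API that these tests drive, the sketch
below shows the same flow from an external HTTP client rather than through the
Flask/gRPC test harness. It is a minimal illustration, not part of the change:
the testbench address (http://localhost:9000), the pre-existing "bucket-name"
bucket, and the use of the requests package are assumptions; the /retry_test
endpoints, the instruction names, and the x-retry-test-id header are the ones
exercised above.

import json

import requests

BASE = "http://localhost:9000"  # assumed testbench address

# Queue one fault: the next storage.buckets.get tagged with this retry-test id
# fails with a 503.
resp = requests.post(
    BASE + "/retry_test",
    data=json.dumps({"instructions": {"storage.buckets.get": ["return-503"]}}),
)
resp.raise_for_status()
test_id = resp.json()["id"]

# The tagged request consumes the queued instruction and receives the fault...
first = requests.get(
    BASE + "/storage/v1/b/bucket-name",  # assumes the bucket already exists
    headers={"x-retry-test-id": test_id},
)
assert first.status_code == 503

# ...and a retry of the same call succeeds, because the queue is now empty.
second = requests.get(
    BASE + "/storage/v1/b/bucket-name",
    headers={"x-retry-test-id": test_id},
)
assert second.status_code == 200

# Clean up the retry test once it is no longer needed.
requests.delete(BASE + "/retry_test/" + test_id)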