diff --git a/testbench/common.py b/testbench/common.py
index 7a7363a4..ab5fd72c 100644
--- a/testbench/common.py
+++ b/testbench/common.py
@@ -611,6 +611,11 @@ def corrupt_media(media):
     return b"B" + media[1:] if media[0:1] == b"A" else b"A" + media[1:]
 
 
+def partial_media(media, range_end, range_start=0):
+    """Returns partial media due to forced interruption or server validation."""
+    return media[range_start:range_end]
+
+
 # === HEADERS === #
 
 
@@ -784,25 +789,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
         return __get_streamer_response_fn(
             database, method, socket_closer, test_id, limit=after_bytes
         )
-    error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
-        next_instruction
-    )
-    if error_after_bytes_matches and method == "storage.objects.insert":
-        items = list(error_after_bytes_matches.groups())
-        error_code = int(items[0])
-        after_bytes = int(items[1]) * 1024
-        # Upload failures should allow to not complete after certain bytes
-        upload_id = request.args.get("upload_id", None)
-        if upload_id is not None:
-            upload = database.get_upload(upload_id, None)
-            if upload is not None and len(upload.media) >= after_bytes:
-                database.dequeue_next_instruction(test_id, method)
-                testbench.error.generic(
-                    "Fault injected after uploading %d bytes" % len(upload.media),
-                    rest_code=error_code,
-                    grpc_code=StatusCode.INTERNAL,  # not really used
-                    context=None,
-                )
     retry_return_short_response = testbench.common.retry_return_short_response.match(
         next_instruction
     )
@@ -836,6 +822,68 @@ def wrapper(*args, **kwargs):
     return retry_test
 
 
+def get_retry_uploads_error_after_bytes(database, request):
+    """Retrieve error code and #bytes corresponding to uploads from retry test instructions."""
+    test_id = request.headers.get("x-retry-test-id", None)
+    if not test_id:
+        return 0, 0, ""
+    next_instruction = database.peek_next_instruction(test_id, "storage.objects.insert")
+    if not next_instruction:
+        return 0, 0, ""
+    error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
+        next_instruction
+    )
+    if error_after_bytes_matches:
+        items = list(error_after_bytes_matches.groups())
+        error_code = int(items[0])
+        after_bytes = int(items[1]) * 1024
+        return error_code, after_bytes, test_id
+    return 0, 0, ""
+
+
+def handle_retry_uploads_error_after_bytes(
+    upload,
+    data,
+    database,
+    error_code,
+    after_bytes,
+    last_byte_persisted,
+    chunk_first_byte,
+    chunk_last_byte,
+    test_id=0,
+):
+    """
+    Handle error-after-bytes instructions for resumable uploads and commit only partial data before forcing a testbench error.
+    This helper method also ignores request bytes that have already been persisted, which aligns with GCS behavior.
+    """
+    if after_bytes > last_byte_persisted and after_bytes <= (chunk_last_byte + 1):
+        range_start = 0
+        # Ignore request bytes that have already been persisted.
+        if last_byte_persisted != 0 and int(chunk_first_byte) <= last_byte_persisted:
+            range_start = last_byte_persisted - int(chunk_first_byte) + 1
+        range_end = len(data)
+        # Only partial data will be committed due to the instructed interruption.
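+        # For example (values assumed for illustration): with after_bytes=262144
+        # and a single chunk spanning bytes 0-524287, range_end = 524288 -
+        # (524287 - 262144 + 1) = 262144, so only the first 256 KiB is committed.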
+        if after_bytes <= chunk_last_byte:
+            range_end = len(data) - (chunk_last_byte - after_bytes + 1)
+        data = testbench.common.partial_media(
+            data, range_end=range_end, range_start=range_start
+        )
+        upload.media += data
+        upload.complete = False
+        if len(upload.media) >= after_bytes:
+            if test_id:
+                database.dequeue_next_instruction(test_id, "storage.objects.insert")
+            testbench.error.generic(
+                "Fault injected during a resumable upload",
+                rest_code=error_code,
+                grpc_code=None,
+                context=None,
+            )
+
+
 def handle_gzip_request(request):
     """
     Handle gzip compressed JSON payloads when Content-Encoding: gzip is present on metadata requests.
diff --git a/testbench/rest_server.py b/testbench/rest_server.py
index ae7315ba..166ccbd8 100644
--- a/testbench/rest_server.py
+++ b/testbench/rest_server.py
@@ -950,6 +950,7 @@ def resumable_upload_chunk(bucket_name):
     upload = db.get_upload(upload_id, None)
     if upload.complete:
         return gcs_type.object.Object.rest(upload.metadata)
+    last_byte_persisted = 0 if len(upload.media) == 0 else (len(upload.media) - 1)
     upload.transfer.add(request.environ.get("HTTP_TRANSFER_ENCODING", ""))
     content_length = request.headers.get("content-length", None)
     data = testbench.common.extract_media(request)
@@ -1000,13 +1001,20 @@
                 blob.rest_metadata(), projection, fields
             )
         return upload.resumable_status_rest()
-    _, chunk_last_byte = [v for v in items[0].split("-")]
+    # In addition to chunk_last_byte, we also need to inspect chunk_first_byte.
+    chunk_first_byte, chunk_last_byte = [v for v in items[0].split("-")]
     x_upload_content_length = int(
         upload.request.headers.get("x-upload-content-length", 0)
     )
     if chunk_last_byte == "*":
-        x_upload_content_length = len(upload.media)
-        chunk_last_byte = len(upload.media) - 1
+        x_upload_content_length = (
+            len(data) if not x_upload_content_length else x_upload_content_length
+        )
+        chunk_last_byte = (
+            len(data) - 1
+            if chunk_first_byte == "*"
+            else int(chunk_first_byte) + len(data) - 1
+        )
     else:
         chunk_last_byte = int(chunk_last_byte)
     total_object_size = (
@@ -1023,6 +1031,45 @@
         None,
         rest_code=400,
     )
+    ### Handle error-after-bytes instructions, from either a retry test or x-goog-emulator-instructions.
+    instruction = testbench.common.extract_instruction(request, context=None)
+    (
+        error_code,
+        after_bytes,
+        test_id,
+    ) = testbench.common.get_retry_uploads_error_after_bytes(db, request)
+    if error_code or instruction == "return-503-after-256K":
+        if instruction == "return-503-after-256K":
+            error_code = 503
+            after_bytes = 262144
+        testbench.common.handle_retry_uploads_error_after_bytes(
+            upload,
+            data,
+            db,
+            error_code,
+            after_bytes,
+            last_byte_persisted,
+            chunk_first_byte,
+            chunk_last_byte,
+            test_id,
+        )
+    # The testbench should ignore any request bytes that have already been persisted,
+    # to align with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
+    # Thus we validate chunk_first_byte against last_byte_persisted.
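+    # For example (values assumed for illustration): if bytes 0-262143 are already
+    # persisted and the client retries with bytes 0-524287, range_start becomes
+    # 262144 and only the previously uncommitted bytes are appended below.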
+    range_start = 0
+    if chunk_first_byte != "*":
+        if (
+            last_byte_persisted != 0
+            and int(chunk_first_byte) <= last_byte_persisted
+        ):
+            range_start = last_byte_persisted - int(chunk_first_byte) + 1
+    if range_start:
+        data = testbench.common.partial_media(
+            data, range_end=(chunk_last_byte + 1), range_start=range_start
+        )
     upload.media += data
     upload.complete = total_object_size == len(upload.media) or (
         chunk_last_byte + 1 == total_object_size
diff --git a/tests/test_testbench_object_upload.py b/tests/test_testbench_object_upload.py
index ba81bcfb..267b1d7b 100644
--- a/tests/test_testbench_object_upload.py
+++ b/tests/test_testbench_object_upload.py
@@ -268,7 +268,7 @@ def test_upload_resumable_x_upload_content_length(self):
             "/upload/storage/v1/b/bucket-name/o",
             query_string={"upload_id": upload_id},
             headers={
-                "content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1),
+                "content-range": "bytes {last:d}-*/*".format(last=len(chunk)),
             },
             data=chunk,
         )
@@ -617,6 +617,69 @@ def test_upload_pre_conditions_failure(self):
             )
             self.assertEqual(response.status_code, 412, msg=name)
 
+    def test_upload_resumable_w_fault_injection(self):
+        # Test fault injection "return-503-after-256K"
+        media = self._create_valid_chunk()
+        response = self.client.post(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"uploadType": "resumable", "name": "fox"},
+            content_type="application/json",
+            headers={
+                "x-goog-testbench-instructions": "return-503-after-256K",
+            },
+        )
+        self.assertEqual(response.status_code, 200)
+        location = response.headers.get("location")
+        self.assertIn("upload_id=", location)
+        match = re.search("[&?]upload_id=([^&]+)", location)
+        self.assertIsNotNone(match, msg=location)
+        upload_id = match.group(1)
+
+        response = self.client.put(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"upload_id": upload_id},
+            data=media,
+            headers={
+                "content-range": "bytes 0-{last:d}/{object_size:d}".format(
+                    last=UPLOAD_QUANTUM - 1, object_size=UPLOAD_QUANTUM
+                ),
+                "x-goog-testbench-instructions": "return-503-after-256K",
+            },
+        )
+        self.assertEqual(response.status_code, 503)
+
+        # Test fault injection "inject-upload-data-error"
+        response = self.client.post(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"uploadType": "resumable", "name": "zebra"},
+            content_type="application/json",
+            headers={
+                "x-goog-testbench-instructions": "inject-upload-data-error",
+            },
+        )
+        self.assertEqual(response.status_code, 200)
+        location = response.headers.get("location")
+        self.assertIn("upload_id=", location)
+        match = re.search("[&?]upload_id=([^&]+)", location)
+        self.assertIsNotNone(match, msg=location)
+        upload_id = match.group(1)
+
+        response = self.client.put(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"upload_id": upload_id},
+            data=media,
+            headers={
+                "x-goog-testbench-instructions": "inject-upload-data-error",
+            },
+        )
+        self.assertEqual(response.status_code, 200)
+
+        response = self.client.get(
+            "/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"}
+        )
+        self.assertEqual(response.status_code, 200)
+        self.assertNotEqual(response.data.decode("utf-8"), media)
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py
index 9279b54e..b3530645 100644
--- a/tests/test_testbench_retry.py
+++ b/tests/test_testbench_retry.py
@@ -324,7 +324,7 @@ def test_retry_test_return_no_metadata_on_resumable_multi_chunk_complete(self):
         response = self.client.put(
             location,
             headers={
"content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1), + "content-range": "bytes {last:d}-*/*".format(last=len(chunk)), "x-retry-test-id": id, }, data=chunk, @@ -438,20 +438,21 @@ def test_retry_test_return_error_after_bytes(self): "/storage/v1/b", data=json.dumps({"name": "bucket-name"}) ) self.assertEqual(response.status_code, 200) - # Use the XML API to inject an object with some data. - media = self._create_block(256) - response = self.client.put( - "/bucket-name/256k.txt", - content_type="text/plain", - data=media, - ) - self.assertEqual(response.status_code, 200) - # Setup a failure for reading back the object. + # Setup two after-bytes errors to test injecting failures in + # resumable uploads, both multiple chunks and a single chunk. + error_after_300K = 300 * 1024 response = self.client.post( "/retry_test", data=json.dumps( - {"instructions": {"storage.objects.insert": ["return-504-after-256K"]}} + { + "instructions": { + "storage.objects.insert": [ + "return-504-after-256K", + "return-504-after-300K", + ] + } + } ), ) self.assertEqual(response.status_code, 200) @@ -474,6 +475,7 @@ def test_retry_test_return_error_after_bytes(self): self.assertIsNotNone(match, msg=location) upload_id = match.group(1) + # Upload the first 256KiB chunk of data and trigger error. chunk = self._create_block(UPLOAD_QUANTUM) self.assertEqual(len(chunk), UPLOAD_QUANTUM) @@ -481,23 +483,42 @@ def test_retry_test_return_error_after_bytes(self): "/upload/storage/v1/b/bucket-name/o", query_string={"upload_id": upload_id}, headers={ - "content-range": "bytes 0-{len:d}/*".format(len=UPLOAD_QUANTUM - 1), + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM + ), "x-retry-test-id": id, }, data=chunk, ) + self.assertEqual(response.status_code, 504, msg=response.data) + + # Check the status of a resumable upload. + response = self.client.put( + "/upload/storage/v1/b/bucket-name/o", + query_string={"upload_id": upload_id}, + headers={ + "content-range": "bytes */*", + "x-retry-test-id": id, + }, + ) self.assertEqual(response.status_code, 308, msg=response.data) self.assertIn("range", response.headers) self.assertEqual( response.headers.get("range"), "bytes=0-%d" % (UPLOAD_QUANTUM - 1) ) + # Send a full object upload here to verify testbench can + # (1) trigger error_after_bytes instructions, + # (2) ignore duplicate request bytes and + # (3) return a forced failure with partial data. + chunk = self._create_block(2 * UPLOAD_QUANTUM) + self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM) response = self.client.put( "/upload/storage/v1/b/bucket-name/o", query_string={"upload_id": upload_id}, headers={ - "content-range": "bytes {beg:d}-{end:d}/*".format( - beg=UPLOAD_QUANTUM, end=2 * UPLOAD_QUANTUM - 1 + "content-range": "bytes 0-{len:d}/{obj_size:d}".format( + len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM ), "x-retry-test-id": id, }, @@ -505,6 +526,38 @@ def test_retry_test_return_error_after_bytes(self): ) self.assertEqual(response.status_code, 504, msg=response.data) + # Check the status of a resumable upload. 
+        response = self.client.put(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"upload_id": upload_id},
+            headers={
+                "content-range": "bytes */*",
+                "x-retry-test-id": id,
+            },
+        )
+        self.assertEqual(response.status_code, 308, msg=response.data)
+        self.assertIn("range", response.headers)
+        self.assertEqual(
+            response.headers.get("range"), "bytes=0-%d" % (error_after_300K - 1)
+        )
+
+        # Finally, resend the full object data to complete the upload.
+        response = self.client.put(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"upload_id": upload_id},
+            headers={
+                "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
+                    len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
+                ),
+                "x-retry-test-id": id,
+            },
+            data=chunk,
+        )
+        self.assertEqual(response.status_code, 200, msg=response.data)
+        create_rest = json.loads(response.data)
+        self.assertIn("size", create_rest)
+        self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)
+
 
 if __name__ == "__main__":
     unittest.main()