fix: allow after instructions for resumable uploads in a single chunk #514

Merged · 14 commits · Jul 19, 2023
83 changes: 64 additions & 19 deletions testbench/common.py
@@ -611,6 +611,11 @@ def corrupt_media(media):
return b"B" + media[1:] if media[0:1] == b"A" else b"A" + media[1:]


def partial_media(media, range_end, range_start=0):
"""Returns partial media due to forced interruption or server validation."""
return media[range_start:range_end]
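A quick, hypothetical usage sketch (not part of the diff) showing the slicing semantics: range_end is exclusive and range_start defaults to 0, so the helper keeps only the bytes that survive an interruption.

media = b"0123456789"

# Keep only the first 4 bytes, as if the connection dropped mid-upload.
assert partial_media(media, range_end=4) == b"0123"

# Skip bytes already persisted (range_start) and stop at the fault point.
assert partial_media(media, range_end=7, range_start=4) == b"456"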


# === HEADERS === #


@@ -784,25 +789,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
return __get_streamer_response_fn(
database, method, socket_closer, test_id, limit=after_bytes
)
- error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
- next_instruction
- )
- if error_after_bytes_matches and method == "storage.objects.insert":
- items = list(error_after_bytes_matches.groups())
- error_code = int(items[0])
- after_bytes = int(items[1]) * 1024
- # Upload failures should allow to not complete after certain bytes
- upload_id = request.args.get("upload_id", None)
- if upload_id is not None:
- upload = database.get_upload(upload_id, None)
- if upload is not None and len(upload.media) >= after_bytes:
- database.dequeue_next_instruction(test_id, method)
- testbench.error.generic(
- "Fault injected after uploading %d bytes" % len(upload.media),
- rest_code=error_code,
- grpc_code=StatusCode.INTERNAL, # not really used
- context=None,
- )
retry_return_short_response = testbench.common.retry_return_short_response.match(
next_instruction
)
@@ -836,6 +822,65 @@ def wrapper(*args, **kwargs):
return retry_test


def get_retry_uploads_error_after_bytes(database, request):
"""Retrieve error code and #bytes corresponding to uploads from retry test instructions."""
test_id = request.headers.get("x-retry-test-id", None)
if not test_id:
return 0, 0, ""
next_instruction = database.peek_next_instruction(test_id, "storage.objects.insert")
if not next_instruction:
return 0, 0, ""
error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
next_instruction
)
if error_after_bytes_matches:
items = list(error_after_bytes_matches.groups())
error_code = int(items[0])
after_bytes = int(items[1]) * 1024
return error_code, after_bytes, test_id
return 0, 0, ""
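For context, these instructions are strings of the form return-504-after-256K. The real pattern lives in testbench.common.retry_return_error_after_bytes; the regex below is an approximation of it, shown only to illustrate how the (error_code, after_bytes) pair is derived.

import re

# Approximation of testbench.common.retry_return_error_after_bytes.
retry_return_error_after_bytes = re.compile(r"return-(\d+)-after-(\d+)K$")

match = retry_return_error_after_bytes.match("return-504-after-256K")
assert match is not None
error_code = int(match.group(1))          # 504
after_bytes = int(match.group(2)) * 1024  # 256 * 1024 == 262144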


def handle_retry_uploads_error_after_bytes(
upload,
data,
database,
error_code,
after_bytes,
last_byte_persisted,
chunk_first_byte,
chunk_last_byte,
test_id=0,
):
"""
Handle error-after-bytes instructions for resumable uploads and commit only partial data before forcing a testbench error.
This helper method also ignores request bytes that have already been persisted, which aligns with GCS behavior.
"""
if after_bytes > last_byte_persisted and after_bytes <= (chunk_last_byte + 1):
range_start = 0
# Ignore request bytes that have already been persisted.
if last_byte_persisted != 0 and int(chunk_first_byte) <= last_byte_persisted:
range_start = last_byte_persisted - int(chunk_first_byte) + 1
range_end = len(data)
# Only partial data will be committed due to the instructed interruption.
if after_bytes <= chunk_last_byte:
range_end = len(data) - (chunk_last_byte - after_bytes + 1)
data = testbench.common.partial_media(
data, range_end=range_end, range_start=range_start
)
upload.media += data
upload.complete = False
if len(upload.media) >= after_bytes:
if test_id:
database.dequeue_next_instruction(test_id, "storage.objects.insert")
testbench.error.generic(
"Fault injected during a resumable upload",
rest_code=error_code,
grpc_code=None,
context=None,
)
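To make the arithmetic concrete, here is a worked example with numbers chosen to match the retry test later in this PR: 256 KiB are already persisted, the client resends the whole 512 KiB object from byte 0, and a return-504-after-300K instruction is pending. This is an illustrative recomputation, not code from the diff.

UPLOAD_QUANTUM = 256 * 1024

after_bytes = 300 * 1024                  # from "return-504-after-300K"
last_byte_persisted = UPLOAD_QUANTUM - 1  # 256 KiB already committed
chunk_first_byte = 0                      # client resends the full object
chunk_last_byte = 2 * UPLOAD_QUANTUM - 1
data_len = 2 * UPLOAD_QUANTUM

# Drop the 256 KiB the testbench already holds.
range_start = last_byte_persisted - chunk_first_byte + 1    # 262144

# Truncate at the fault point: everything past after_bytes is discarded.
range_end = data_len - (chunk_last_byte - after_bytes + 1)  # 307200

committed = UPLOAD_QUANTUM + (range_end - range_start)
assert committed == after_bytes  # 307200 bytes persisted, then the 504 fires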


def handle_gzip_request(request):
"""
Handle gzip compressed JSON payloads when Content-Encoding: gzip is present on metadata requests.
50 changes: 47 additions & 3 deletions testbench/rest_server.py
@@ -950,6 +950,7 @@ def resumable_upload_chunk(bucket_name):
upload = db.get_upload(upload_id, None)
if upload.complete:
return gcs_type.object.Object.rest(upload.metadata)
last_byte_persisted = 0 if len(upload.media) == 0 else (len(upload.media) - 1)
upload.transfer.add(request.environ.get("HTTP_TRANSFER_ENCODING", ""))
content_length = request.headers.get("content-length", None)
data = testbench.common.extract_media(request)
@@ -1000,13 +1001,20 @@ def resumable_upload_chunk(bucket_name):
blob.rest_metadata(), projection, fields
)
return upload.resumable_status_rest()
- _, chunk_last_byte = [v for v in items[0].split("-")]
+ # In addition to chunk_last_byte, we also need to inspect chunk_first_byte.
+ chunk_first_byte, chunk_last_byte = [v for v in items[0].split("-")]
x_upload_content_length = int(
upload.request.headers.get("x-upload-content-length", 0)
)
if chunk_last_byte == "*":
- x_upload_content_length = len(upload.media)
- chunk_last_byte = len(upload.media) - 1
+ x_upload_content_length = (
+ len(data) if not x_upload_content_length else x_upload_content_length
+ )
+ chunk_last_byte = (
+ len(data) - 1
+ if chunk_first_byte == "*"
+ else int(chunk_first_byte) + len(data) - 1
+ )
else:
chunk_last_byte = int(chunk_last_byte)
total_object_size = (
@@ -1023,6 +1031,42 @@
None,
rest_code=400,
)
### Handle error-after-bytes instructions, from either a retry test or x-goog-emulator-instructions.
instruction = testbench.common.extract_instruction(request, context=None)
(
error_code,
after_bytes,
test_id,
) = testbench.common.get_retry_uploads_error_after_bytes(db, request)
if error_code or instruction == "return-503-after-256K":
if instruction == "return-503-after-256K":
error_code = 503
after_bytes = 262144
testbench.common.handle_retry_uploads_error_after_bytes(
upload,
data,
db,
error_code,
after_bytes,
last_byte_persisted,
chunk_first_byte,
chunk_last_byte,
test_id,
)
# The testbench should ignore any request bytes that have already been persisted,
# to be aligned with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
# Thus we validate chunk_first_byte against last_byte_persisted.
range_start = 0
if chunk_first_byte != "*":
if (
last_byte_persisted != 0
and int(chunk_first_byte) <= last_byte_persisted
):
range_start = last_byte_persisted - int(chunk_first_byte) + 1
if range_start:
data = testbench.common.partial_media(
data, range_end=(chunk_last_byte + 1), range_start=range_start
)
upload.media += data
upload.complete = total_object_size == len(upload.media) or (
chunk_last_byte + 1 == total_object_size
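As a recap of the handler above, here is a self-contained sketch of the Content-Range parsing and the trimming of already-persisted bytes. The helper names are hypothetical, and the real handler also validates sizes and finalizes the upload.

def parse_content_range(header, data_len):
    """Return (first_byte, last_byte) for headers like 'bytes 0-262143/524288'."""
    byte_range = header.partition("/")[0].split(" ")[1]
    if byte_range == "*":
        return "*", None  # pure status check ("bytes */*"), no payload extent
    first, last = byte_range.split("-")
    if last == "*":
        # Unknown extent: infer the last byte from the payload, as above.
        return first, (data_len - 1 if first == "*" else int(first) + data_len - 1)
    return first, int(last)


def trim_persisted(data, first_byte, last_byte_persisted):
    """Drop the prefix of `data` that the server already holds."""
    if (
        first_byte != "*"
        and last_byte_persisted != 0
        and int(first_byte) <= last_byte_persisted
    ):
        return data[last_byte_persisted - int(first_byte) + 1 :]
    return data


data = b"x" * 524288
assert parse_content_range("bytes 0-524287/524288", len(data)) == ("0", 524287)
assert parse_content_range("bytes 262144-*/*", 262144) == ("262144", 524287)
# With 256 KiB persisted, only the second half of a full resend is appended.
assert len(trim_persisted(data, "0", 262143)) == 262144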
65 changes: 64 additions & 1 deletion tests/test_testbench_object_upload.py
@@ -268,7 +268,7 @@ def test_upload_resumable_x_upload_content_length(self):
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1),
"content-range": "bytes {last:d}-*/*".format(last=len(chunk)),
},
data=chunk,
)
@@ -617,6 +617,69 @@ def test_upload_pre_conditions_failure(self):
)
self.assertEqual(response.status_code, 412, msg=name)

def test_upload_resumable_w_fault_injection(self):
# Test fault injection "return-503-after-256K"
media = self._create_valid_chunk()
response = self.client.post(
"/upload/storage/v1/b/bucket-name/o",
query_string={"uploadType": "resumable", "name": "fox"},
content_type="application/json",
headers={
"x-goog-testbench-instructions": "return-503-after-256K",
},
)
self.assertEqual(response.status_code, 200)
location = response.headers.get("location")
self.assertIn("upload_id=", location)
match = re.search("[&?]upload_id=([^&]+)", location)
self.assertIsNotNone(match, msg=location)
upload_id = match.group(1)

response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
data=media,
headers={
"content-range": "bytes 0-{last:d}/{object_size:d}".format(
last=UPLOAD_QUANTUM - 1, object_size=UPLOAD_QUANTUM
),
"x-goog-testbench-instructions": "return-503-after-256K",
},
)
self.assertEqual(response.status_code, 503)

# Test fault injection "inject-upload-data-error"
response = self.client.post(
"/upload/storage/v1/b/bucket-name/o",
query_string={"uploadType": "resumable", "name": "zebra"},
content_type="application/json",
headers={
"x-goog-testbench-instructions": "inject-upload-data-error",
},
)
self.assertEqual(response.status_code, 200)
location = response.headers.get("location")
self.assertIn("upload_id=", location)
match = re.search("[&?]upload_id=([^&]+)", location)
self.assertIsNotNone(match, msg=location)
upload_id = match.group(1)

response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
data=media,
headers={
"x-goog-testbench-instructions": "inject-upload-data-error",
},
)
self.assertEqual(response.status_code, 200)

response = self.client.get(
"/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"}
)
self.assertEqual(response.status_code, 200)
self.assertNotEqual(response.data.decode("utf-8"), media)
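To reproduce this outside the unit tests, something like the following requests snippet against a locally running testbench should behave the same way; the host, port, and object name are assumptions.

import requests

BASE = "http://localhost:9000"  # assumed local testbench address

# Start a resumable upload with the fault-injection instruction attached.
session = requests.post(
    f"{BASE}/upload/storage/v1/b/bucket-name/o",
    params={"uploadType": "resumable", "name": "fox"},
    headers={"x-goog-testbench-instructions": "return-503-after-256K"},
    json={},
)
location = session.headers["location"]

# Upload 256 KiB; the testbench persists the data, then answers 503.
media = b"A" * 256 * 1024
response = requests.put(
    location,
    data=media,
    headers={
        "content-range": "bytes 0-262143/262144",
        "x-goog-testbench-instructions": "return-503-after-256K",
    },
)
assert response.status_code == 503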


if __name__ == "__main__":
unittest.main()
81 changes: 67 additions & 14 deletions tests/test_testbench_retry.py
@@ -324,7 +324,7 @@ def test_retry_test_return_no_metadata_on_resumable_multi_chunk_complete(self):
response = self.client.put(
location,
headers={
"content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1),
"content-range": "bytes {last:d}-*/*".format(last=len(chunk)),
"x-retry-test-id": id,
},
data=chunk,
@@ -438,20 +438,21 @@ def test_retry_test_return_error_after_bytes(self):
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)
- # Use the XML API to inject an object with some data.
- media = self._create_block(256)
- response = self.client.put(
- "/bucket-name/256k.txt",
- content_type="text/plain",
- data=media,
- )
- self.assertEqual(response.status_code, 200)

+ # Set up two after-bytes errors to test injecting failures into
+ # resumable uploads, covering both multiple chunks and a single chunk.
+ error_after_300K = 300 * 1024
response = self.client.post(
"/retry_test",
data=json.dumps(
{"instructions": {"storage.objects.insert": ["return-504-after-256K"]}}
{
"instructions": {
"storage.objects.insert": [
"return-504-after-256K",
"return-504-after-300K",
]
}
}
),
)
self.assertEqual(response.status_code, 200)
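Outside the Flask test client, the same retry test can be registered with a plain HTTP POST. A sketch, under the assumption that the testbench runs locally and returns the test id in the JSON body:

import json
import requests

resp = requests.post(
    "http://localhost:9000/retry_test",  # assumed local testbench address
    data=json.dumps(
        {
            "instructions": {
                "storage.objects.insert": [
                    "return-504-after-256K",
                    "return-504-after-300K",
                ]
            }
        }
    ),
)
resp.raise_for_status()
# Attach the returned id to upload requests via the x-retry-test-id header.
retry_test_id = resp.json()["id"]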
Expand All @@ -474,37 +475,89 @@ def test_retry_test_return_error_after_bytes(self):
self.assertIsNotNone(match, msg=location)
upload_id = match.group(1)

# Upload the first 256 KiB chunk of data and trigger the error.
chunk = self._create_block(UPLOAD_QUANTUM)
self.assertEqual(len(chunk), UPLOAD_QUANTUM)

response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes 0-{len:d}/*".format(len=UPLOAD_QUANTUM - 1),
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": id,
},
data=chunk,
)
self.assertEqual(response.status_code, 504, msg=response.data)

# Check the status of a resumable upload.
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes */*",
"x-retry-test-id": id,
},
)
self.assertEqual(response.status_code, 308, msg=response.data)
self.assertIn("range", response.headers)
self.assertEqual(
response.headers.get("range"), "bytes=0-%d" % (UPLOAD_QUANTUM - 1)
)

# Send a full object upload here to verify that the testbench can
# (1) trigger error_after_bytes instructions,
# (2) ignore duplicate request bytes, and
# (3) return a forced failure with partial data.
chunk = self._create_block(2 * UPLOAD_QUANTUM)
self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM)
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes {beg:d}-{end:d}/*".format(
beg=UPLOAD_QUANTUM, end=2 * UPLOAD_QUANTUM - 1
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": id,
},
data=chunk,
)
self.assertEqual(response.status_code, 504, msg=response.data)

# Check the status of a resumable upload.
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes */*",
"x-retry-test-id": id,
},
)
self.assertEqual(response.status_code, 308, msg=response.data)
self.assertIn("range", response.headers)
self.assertEqual(
response.headers.get("range"), "bytes=0-%d" % (error_after_300K - 1)
)

# Finally, resend the full object upload to complete it.
response = self.client.put(
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes 0-{len:d}/{obj_size:d}".format(
len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
),
"x-retry-test-id": id,
},
data=chunk,
)
self.assertEqual(response.status_code, 200, msg=response.data)
create_rest = json.loads(response.data)
self.assertIn("size", create_rest)
self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)
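Taken together, the test mimics the loop a real client runs: upload, absorb the injected error, query how many bytes were committed, and resend from that offset. A condensed, hypothetical sketch of that control flow (error handling elided):

def resume_upload(client, upload_url, media, retry_id):
    """Upload `media`, re-querying committed progress after each failure."""
    offset, size = 0, len(media)
    while True:
        response = client.put(
            upload_url,
            headers={
                "content-range": f"bytes {offset}-{size - 1}/{size}",
                "x-retry-test-id": retry_id,
            },
            data=media[offset:],
        )
        if response.status_code == 200:
            return response  # finalized
        # Ask for status: a 308 plus "range: bytes=0-N" means N+1 bytes persisted.
        status = client.put(
            upload_url,
            headers={"content-range": "bytes */*", "x-retry-test-id": retry_id},
        )
        assert status.status_code == 308
        range_header = status.headers.get("range")  # absent if nothing persisted
        offset = int(range_header.rsplit("-", 1)[1]) + 1 if range_header else 0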


if __name__ == "__main__":
unittest.main()