fix: allow after instructions for resumable uploads in a single chunk (#514)

* fix: add support to resumable uploads

* dedup

* update upload only; no need to init blob yet

* uploads should ignore request bytes already persisted

* wip update retry test for after-bytes in resumable uploads

* cleanup logic around ignoring duplicate request bytes

* add helper method and update comments

* black lint

* update tests

* handle after_byte instruction only when necessary

* fix previous commit

* update tests and coverage

* update chunk last byte in PUT

* only handle after-bytes testbench instructions
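
In effect, an after-bytes fault instruction such as return-504-after-256K now works even when the client sends the whole object in a single chunk: the testbench commits only the bytes up to the fault point, returns the injected error, and ignores already-persisted bytes when the chunk is resent. A minimal sketch of driving this from a client, assuming a testbench listening at http://localhost:9000 (the host, port, bucket, and object names are illustrative, not part of this change):

    import json
    import requests  # any HTTP client works; requests is an assumption here

    BASE = "http://localhost:9000"

    # Register a retry test that fails object inserts with a 504 after 256 KiB.
    resp = requests.post(
        BASE + "/retry_test",
        data=json.dumps(
            {"instructions": {"storage.objects.insert": ["return-504-after-256K"]}}
        ),
    )
    test_id = resp.json()["id"]  # assumed response shape: {"id": ...}

    # Start a resumable upload and send the entire object as one chunk.
    resp = requests.post(
        BASE + "/upload/storage/v1/b/bucket-name/o",
        params={"uploadType": "resumable", "name": "object-name"},
        headers={"x-retry-test-id": test_id},
    )
    session_url = resp.headers["location"]
    media = b"A" * (512 * 1024)
    resp = requests.put(
        session_url,
        headers={
            "x-retry-test-id": test_id,
            "content-range": "bytes 0-%d/%d" % (len(media) - 1, len(media)),
        },
        data=media,
    )
    # With this fix, the single-chunk upload trips the instruction:
    assert resp.status_code == 504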
cojenco authored Jul 19, 2023
1 parent adb1928 commit 99e3202
Showing 4 changed files with 242 additions and 37 deletions.
83 changes: 64 additions & 19 deletions testbench/common.py
@@ -611,6 +611,11 @@ def corrupt_media(media):
return b"B" + media[1:] if media[0:1] == b"A" else b"A" + media[1:]


def partial_media(media, range_end, range_start=0):
    """Returns partial media due to forced interruption or server validation."""
    return media[range_start:range_end]

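# Illustration (not part of the change): partial_media is a plain slice, e.g.
# partial_media(b"ABCDEFGH", range_end=6, range_start=2) == b"CDEF"; the
# callers below use it both to drop already-persisted leading bytes and to
# truncate trailing bytes past a fault-injection point.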

# === HEADERS === #


@@ -784,25 +789,6 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
        return __get_streamer_response_fn(
            database, method, socket_closer, test_id, limit=after_bytes
        )
    error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
        next_instruction
    )
    if error_after_bytes_matches and method == "storage.objects.insert":
        items = list(error_after_bytes_matches.groups())
        error_code = int(items[0])
        after_bytes = int(items[1]) * 1024
        # Upload failures should allow to not complete after certain bytes
        upload_id = request.args.get("upload_id", None)
        if upload_id is not None:
            upload = database.get_upload(upload_id, None)
            if upload is not None and len(upload.media) >= after_bytes:
                database.dequeue_next_instruction(test_id, method)
                testbench.error.generic(
                    "Fault injected after uploading %d bytes" % len(upload.media),
                    rest_code=error_code,
                    grpc_code=StatusCode.INTERNAL,  # not really used
                    context=None,
                )
    retry_return_short_response = testbench.common.retry_return_short_response.match(
        next_instruction
    )
@@ -836,6 +822,65 @@ def wrapper(*args, **kwargs):
    return retry_test


def get_retry_uploads_error_after_bytes(database, request):
    """Retrieve the error code and byte threshold for uploads from retry test instructions."""
    test_id = request.headers.get("x-retry-test-id", None)
    if not test_id:
        return 0, 0, ""
    next_instruction = database.peek_next_instruction(test_id, "storage.objects.insert")
    if not next_instruction:
        return 0, 0, ""
    error_after_bytes_matches = testbench.common.retry_return_error_after_bytes.match(
        next_instruction
    )
    if error_after_bytes_matches:
        items = list(error_after_bytes_matches.groups())
        error_code = int(items[0])
        after_bytes = int(items[1]) * 1024
        return error_code, after_bytes, test_id
    return 0, 0, ""
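
# Illustration (assumed instruction grammar, consistent with the tests below):
# with a queued instruction "return-504-after-300K" the regex captures
# ("504", "300"), so this helper returns (504, 307200, test_id); with no
# matching retry test registered it returns (0, 0, "").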


def handle_retry_uploads_error_after_bytes(
    upload,
    data,
    database,
    error_code,
    after_bytes,
    last_byte_persisted,
    chunk_first_byte,
    chunk_last_byte,
    test_id=0,
):
    """
    Handle error-after-bytes instructions for resumable uploads and commit only partial data before forcing a testbench error.
    This helper also ignores request bytes that have already been persisted, which aligns with GCS behavior.
    """
    if after_bytes > last_byte_persisted and after_bytes <= (chunk_last_byte + 1):
        range_start = 0
        # Ignore request bytes that have already been persisted.
        if last_byte_persisted != 0 and int(chunk_first_byte) <= last_byte_persisted:
            range_start = last_byte_persisted - int(chunk_first_byte) + 1
        range_end = len(data)
        # Only partial data will be committed due to the instructed interruption.
        if after_bytes <= chunk_last_byte:
            range_end = len(data) - (chunk_last_byte - after_bytes + 1)
        data = testbench.common.partial_media(
            data, range_end=range_end, range_start=range_start
        )
        upload.media += data
        upload.complete = False
        if len(upload.media) >= after_bytes:
            if test_id:
                database.dequeue_next_instruction(test_id, "storage.objects.insert")
            testbench.error.generic(
                "Fault injected during a resumable upload",
                rest_code=error_code,
                grpc_code=None,
                context=None,
            )
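
# Worked example (illustrative numbers only): with after_bytes = 300 * 1024
# = 307200, last_byte_persisted = 262143 (256 KiB already committed), and a
# resent full 512 KiB object (chunk_first_byte = "0", chunk_last_byte = 524287,
# len(data) = 524288):
#   range_start = 262143 - 0 + 1 = 262144      # skip the duplicate 256 KiB
#   range_end   = 524288 - (524287 - 307200 + 1) = 307200
# Only bytes [262144, 307200) are appended, len(upload.media) reaches
# 307200 == after_bytes, the instruction is dequeued, and the injected
# error is returned.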


def handle_gzip_request(request):
    """
    Handle gzip compressed JSON payloads when Content-Encoding: gzip is present on metadata requests.
50 changes: 47 additions & 3 deletions testbench/rest_server.py
@@ -950,6 +950,7 @@ def resumable_upload_chunk(bucket_name):
    upload = db.get_upload(upload_id, None)
    if upload.complete:
        return gcs_type.object.Object.rest(upload.metadata)
    last_byte_persisted = 0 if len(upload.media) == 0 else (len(upload.media) - 1)
    upload.transfer.add(request.environ.get("HTTP_TRANSFER_ENCODING", ""))
    content_length = request.headers.get("content-length", None)
    data = testbench.common.extract_media(request)
@@ -1000,13 +1001,20 @@ def resumable_upload_chunk(bucket_name):
                    blob.rest_metadata(), projection, fields
                )
            return upload.resumable_status_rest()
        _, chunk_last_byte = [v for v in items[0].split("-")]
        # In addition to chunk_last_byte, we also need to inspect chunk_first_byte.
        chunk_first_byte, chunk_last_byte = [v for v in items[0].split("-")]
        x_upload_content_length = int(
            upload.request.headers.get("x-upload-content-length", 0)
        )
        if chunk_last_byte == "*":
            x_upload_content_length = len(upload.media)
            chunk_last_byte = len(upload.media) - 1
            x_upload_content_length = (
                len(data) if not x_upload_content_length else x_upload_content_length
            )
            chunk_last_byte = (
                len(data) - 1
                if chunk_first_byte == "*"
                else int(chunk_first_byte) + len(data) - 1
            )
        else:
            chunk_last_byte = int(chunk_last_byte)
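
        # Illustration (hypothetical header values): "content-range: bytes
        # 262144-*/*" with a 262144-byte body gives chunk_first_byte = "262144"
        # and chunk_last_byte = 262144 + 262144 - 1 = 524287, i.e. the last
        # byte is inferred from the body length when the header leaves it open.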
total_object_size = (
@@ -1023,6 +1031,42 @@
            None,
            rest_code=400,
        )
        ### Handle error-after-bytes instructions, either retry test or x-goog-emulator-instructions.
        instruction = testbench.common.extract_instruction(request, context=None)
        (
            error_code,
            after_bytes,
            test_id,
        ) = testbench.common.get_retry_uploads_error_after_bytes(db, request)
        if error_code or instruction == "return-503-after-256K":
            if instruction == "return-503-after-256K":
                error_code = 503
                after_bytes = 262144
            testbench.common.handle_retry_uploads_error_after_bytes(
                upload,
                data,
                db,
                error_code,
                after_bytes,
                last_byte_persisted,
                chunk_first_byte,
                chunk_last_byte,
                test_id,
            )
        # The testbench should ignore any request bytes that have already been persisted,
        # to be aligned with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
        # Thus we validate chunk_first_byte against last_byte_persisted.
        range_start = 0
        if chunk_first_byte != "*":
            if (
                last_byte_persisted != 0
                and int(chunk_first_byte) <= last_byte_persisted
            ):
                range_start = last_byte_persisted - int(chunk_first_byte) + 1
        if range_start:
            data = testbench.common.partial_media(
                data, range_end=(chunk_last_byte + 1), range_start=range_start
            )
        upload.media += data
        upload.complete = total_object_size == len(upload.media) or (
            chunk_last_byte + 1 == total_object_size
65 changes: 64 additions & 1 deletion tests/test_testbench_object_upload.py
@@ -268,7 +268,7 @@ def test_upload_resumable_x_upload_content_length(self):
"/upload/storage/v1/b/bucket-name/o",
query_string={"upload_id": upload_id},
headers={
"content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1),
"content-range": "bytes {last:d}-*/*".format(last=len(chunk)),
},
data=chunk,
)
@@ -617,6 +617,69 @@ def test_upload_pre_conditions_failure(self):
)
self.assertEqual(response.status_code, 412, msg=name)

    def test_upload_resumable_w_fault_injection(self):
        # Test fault injection "return-503-after-256K"
        media = self._create_valid_chunk()
        response = self.client.post(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"uploadType": "resumable", "name": "fox"},
            content_type="application/json",
            headers={
                "x-goog-testbench-instructions": "return-503-after-256K",
            },
        )
        self.assertEqual(response.status_code, 200)
        location = response.headers.get("location")
        self.assertIn("upload_id=", location)
        match = re.search("[&?]upload_id=([^&]+)", location)
        self.assertIsNotNone(match, msg=location)
        upload_id = match.group(1)

        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            data=media,
            headers={
                "content-range": "bytes 0-{last:d}/{object_size:d}".format(
                    last=UPLOAD_QUANTUM - 1, object_size=UPLOAD_QUANTUM
                ),
                "x-goog-testbench-instructions": "return-503-after-256K",
            },
        )
        self.assertEqual(response.status_code, 503)

        # Test fault injection "inject-upload-data-error"
        response = self.client.post(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"uploadType": "resumable", "name": "zebra"},
            content_type="application/json",
            headers={
                "x-goog-testbench-instructions": "inject-upload-data-error",
            },
        )
        self.assertEqual(response.status_code, 200)
        location = response.headers.get("location")
        self.assertIn("upload_id=", location)
        match = re.search("[&?]upload_id=([^&]+)", location)
        self.assertIsNotNone(match, msg=location)
        upload_id = match.group(1)

        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            data=media,
            headers={
                "x-goog-testbench-instructions": "inject-upload-data-error",
            },
        )
        self.assertEqual(response.status_code, 200)

        response = self.client.get(
            "/download/storage/v1/b/bucket-name/o/zebra", query_string={"alt": "media"}
        )
        self.assertEqual(response.status_code, 200)
        self.assertNotEqual(response.data.decode("utf-8"), media)


if __name__ == "__main__":
    unittest.main()
81 changes: 67 additions & 14 deletions tests/test_testbench_retry.py
@@ -324,7 +324,7 @@ def test_retry_test_return_no_metadata_on_resumable_multi_chunk_complete(self):
        response = self.client.put(
            location,
            headers={
                "content-range": "bytes {last:d}-*/*".format(last=len(chunk) - 1),
                "content-range": "bytes {last:d}-*/*".format(last=len(chunk)),
                "x-retry-test-id": id,
            },
            data=chunk,
@@ -438,20 +438,21 @@ def test_retry_test_return_error_after_bytes(self):
"/storage/v1/b", data=json.dumps({"name": "bucket-name"})
)
self.assertEqual(response.status_code, 200)
# Use the XML API to inject an object with some data.
media = self._create_block(256)
response = self.client.put(
"/bucket-name/256k.txt",
content_type="text/plain",
data=media,
)
self.assertEqual(response.status_code, 200)

# Setup a failure for reading back the object.
# Setup two after-bytes errors to test injecting failures in
# resumable uploads, both multiple chunks and a single chunk.
error_after_300K = 300 * 1024
response = self.client.post(
"/retry_test",
data=json.dumps(
{"instructions": {"storage.objects.insert": ["return-504-after-256K"]}}
{
"instructions": {
"storage.objects.insert": [
"return-504-after-256K",
"return-504-after-300K",
]
}
}
),
)
self.assertEqual(response.status_code, 200)
@@ -474,37 +475,89 @@
        self.assertIsNotNone(match, msg=location)
        upload_id = match.group(1)

        # Upload the first 256 KiB chunk of data and trigger the error.
        chunk = self._create_block(UPLOAD_QUANTUM)
        self.assertEqual(len(chunk), UPLOAD_QUANTUM)

        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            headers={
                "content-range": "bytes 0-{len:d}/*".format(len=UPLOAD_QUANTUM - 1),
                "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                    len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
                ),
                "x-retry-test-id": id,
            },
            data=chunk,
        )
        self.assertEqual(response.status_code, 504, msg=response.data)

        # Check the status of the resumable upload.
        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            headers={
                "content-range": "bytes */*",
                "x-retry-test-id": id,
            },
        )
        self.assertEqual(response.status_code, 308, msg=response.data)
        self.assertIn("range", response.headers)
        self.assertEqual(
            response.headers.get("range"), "bytes=0-%d" % (UPLOAD_QUANTUM - 1)
        )

        # Send a full object upload here to verify that the testbench can
        # (1) trigger error_after_bytes instructions,
        # (2) ignore duplicate request bytes and
        # (3) return a forced failure with partial data.
        chunk = self._create_block(2 * UPLOAD_QUANTUM)
        self.assertEqual(len(chunk), 2 * UPLOAD_QUANTUM)
        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            headers={
                "content-range": "bytes {beg:d}-{end:d}/*".format(
                    beg=UPLOAD_QUANTUM, end=2 * UPLOAD_QUANTUM - 1
                "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                    len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
                ),
                "x-retry-test-id": id,
            },
            data=chunk,
        )
        self.assertEqual(response.status_code, 504, msg=response.data)

        # Check the status of the resumable upload.
        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            headers={
                "content-range": "bytes */*",
                "x-retry-test-id": id,
            },
        )
        self.assertEqual(response.status_code, 308, msg=response.data)
        self.assertIn("range", response.headers)
        self.assertEqual(
            response.headers.get("range"), "bytes=0-%d" % (error_after_300K - 1)
        )

        # Finally, to complete the upload, resend the full object.
        response = self.client.put(
            "/upload/storage/v1/b/bucket-name/o",
            query_string={"upload_id": upload_id},
            headers={
                "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                    len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
                ),
                "x-retry-test-id": id,
            },
            data=chunk,
        )
        self.assertEqual(response.status_code, 200, msg=response.data)
        create_rest = json.loads(response.data)
        self.assertIn("size", create_rest)
        self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)


if __name__ == "__main__":
    unittest.main()
