Skip to content

Commit

Permalink
feat: add write stall support (#684)
Browse files Browse the repository at this point in the history
* add code for write stall

* fix test

* remove unnecessary files

* remove unnecessary files

* write test

* undo test changes to remove unnecessary changes

* Update test_testbench_retry.py

* add test

* remove .idea files

* write stall changes

* remove .idea file

* Update test_testbench_retry.py

* test

* stall once for identical req

* add comment

* remove .idea file

* fix unit test

* fix unit test

* test changes

* test changes

* review comments

* remove .idea files

* lint fixes

* lint fixes

* lint fixes

* lint fixes

* lint fixes

* code patch fix

* support full uploads

* remove unnecessary things

* remove unnecessary things

* remove unnecessary things

* adding comment

* lint fix

* lint fix

* stall should not happen if the uploaded amount of data is less than the stall size

* stall should not happen if the uploaded amount of data is less than the stall size

* remove last two commit changes

* remove env files

* lint fix

* lint fix

* review comment and add scenario where the single-shot upload size is less than the stall byte size

* lint fix

* lint fix
  • Loading branch information
Tulsishah authored Oct 24, 2024
1 parent 7ba2902 commit dc200a3
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 9 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| return-broken-stream | [HTTP] Testbench will fail after a few downloaded bytes <br> [GRPC] Testbench will fail with `UNAVAILABLE` after a few downloaded bytes
| return-broken-stream-after-YK | [HTTP] Testbench will fail after YKiB of downloaded data <br> [GRPC] Testbench will fail with `UNAVAILABLE` after YKiB of downloaded data
| return-reset-connection | [HTTP] Testbench will fail with a reset connection <br> [GRPC] Testbench will fail the RPC with `UNAVAILABLE`
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T second after reading YKiB of downloaded data, e.g. stall-for-10s-after-12K stalls after reading 12KiB of data <br> [GRPC] Not supported
| stall-for-Ts-after-YK | [HTTP] Testbench will stall for T seconds after reading YKiB of downloaded/uploaded data, e.g. stall-for-10s-after-12K stalls after reading/writing 12KiB of data <br> [GRPC] Not supported

## Releasing the testbench

Expand All @@ -264,4 +264,4 @@ Steps:
1. Title "v0.x.x"
1. Click Generate release notes
1. Make sure "Set as the latest release" is checked
1. Click "Publish Release" to release
1. Click "Publish Release" to release
58 changes: 51 additions & 7 deletions testbench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,24 +844,24 @@ def handle_retry_test_instruction(database, request, socket_closer, method):
broken_stream_after_bytes = (
testbench.common.retry_return_broken_stream_after_bytes.match(next_instruction)
)
if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
return __get_streamer_response_fn(
database, method, socket_closer, test_id, limit=after_bytes
)

retry_stall_after_bytes_matches = testbench.common.retry_stall_after_bytes.match(
next_instruction
)
if retry_stall_after_bytes_matches:
if retry_stall_after_bytes_matches and method != "storage.objects.insert":
items = list(retry_stall_after_bytes_matches.groups())
stall_time = int(items[0])
after_bytes = int(items[1]) * 1024
return __get_stream_and_stall_fn(
database, method, test_id, limit=after_bytes, stall_time_sec=stall_time
)

if broken_stream_after_bytes and method == "storage.objects.get":
items = list(broken_stream_after_bytes.groups())
after_bytes = int(items[0]) * 1024
return __get_streamer_response_fn(
database, method, socket_closer, test_id, limit=after_bytes
)
retry_return_short_response = testbench.common.retry_return_short_response.match(
next_instruction
)
Expand Down Expand Up @@ -895,6 +895,30 @@ def wrapper(*args, **kwargs):
return retry_test


def get_stall_uploads_after_bytes(database, request, context=None, transport="HTTP"):
    """Retrieve stall time and #bytes corresponding to uploads from retry test instructions.

    Returns a ``(stall_time_seconds, after_bytes, test_id)`` triple, or
    ``(0, 0, "")`` when no stall instruction applies to this request.
    """
    method = "storage.objects.insert"
    no_stall = (0, 0, "")

    # Only requests tagged with a retry-test id can carry instructions.
    test_id = request.headers.get("x-retry-test-id", None)
    if not test_id:
        return no_stall
    if not database.has_instructions_retry_test(test_id, method, transport=transport):
        return no_stall
    next_instruction = database.peek_next_instruction(test_id, method)
    if not next_instruction:
        return no_stall

    stall_match = testbench.common.retry_stall_after_bytes.match(next_instruction)
    if not stall_match:
        return no_stall

    # Instruction encodes the stall duration in seconds and the threshold in KiB.
    stall_seconds, threshold_kib = stall_match.groups()
    return int(stall_seconds), int(threshold_kib) * 1024, test_id


def get_retry_uploads_error_after_bytes(
database, request, context=None, transport="HTTP"
):
Expand All @@ -919,9 +943,29 @@ def get_retry_uploads_error_after_bytes(
error_code = int(items[0])
after_bytes = int(items[1]) * 1024
return error_code, after_bytes, test_id

return 0, 0, ""


def handle_stall_uploads_after_bytes(
    upload,
    data,
    database,
    stall_time,
    after_bytes,
    test_id=0,
):
    """
    Handle stall-after-bytes instructions for resumable uploads.

    Stalls the request exactly once: on the chunk whose data pushes the
    upload's cumulative size past ``after_bytes``.  For example, when
    uploading 120K of data, ``stall-for-2s-after-100K`` stalls the request
    for 2 seconds as the 100K threshold is crossed.

    Args:
        upload: the upload in progress; ``upload.media`` holds the bytes
            persisted so far.
        data: the payload of the current chunk.
        database: retry-test database used to dequeue the matched
            instruction so the stall fires only once per instruction.
        stall_time: number of seconds to sleep.
        after_bytes: cumulative byte threshold that triggers the stall.
        test_id: retry-test id; falsy when no retry test is active, in
            which case no instruction is dequeued (but the stall still
            happens when the threshold is crossed).
    """
    # Stall only on the threshold-crossing chunk: bytes persisted so far are
    # at or below the limit, and this chunk's data pushes past it.
    if len(upload.media) <= after_bytes and len(upload.media) + len(data) > after_bytes:
        if test_id:
            database.dequeue_next_instruction(test_id, "storage.objects.insert")
        time.sleep(stall_time)


def handle_retry_uploads_error_after_bytes(
upload,
data,
Expand Down
30 changes: 30 additions & 0 deletions testbench/rest_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import datetime
import json
import logging
import time

import flask
from google.protobuf import json_format
Expand Down Expand Up @@ -978,6 +979,18 @@ def object_insert(bucket_name):
blob, projection = gcs_type.object.Object.init_media(flask.request, bucket)
elif upload_type == "multipart":
blob, projection = gcs_type.object.Object.init_multipart(flask.request, bucket)
# Handle stall for full uploads.
testbench.common.extract_instruction(request, context=None)
(
stall_time,
after_bytes,
test_id,
) = testbench.common.get_stall_uploads_after_bytes(db, request)
if stall_time and len(blob.media) >= after_bytes:
if test_id:
db.dequeue_next_instruction(test_id, "storage.objects.insert")
time.sleep(stall_time)

db.insert_object(
bucket_name,
blob,
Expand Down Expand Up @@ -1104,6 +1117,23 @@ def resumable_upload_chunk(bucket_name):
chunk_last_byte,
test_id,
)

testbench.common.extract_instruction(request, context=None)
(
stall_time,
after_bytes,
test_id,
) = testbench.common.get_stall_uploads_after_bytes(db, request)

if stall_time:
testbench.common.handle_stall_uploads_after_bytes(
upload,
data,
db,
stall_time,
after_bytes,
test_id,
)
# The testbench should ignore any request bytes that have already been persisted,
# to be aligned with GCS behavior (https://cloud.google.com/storage/docs/resumable-uploads#resent-data).
# Thus we validate chunk_first_byte against last_byte_persisted.
Expand Down
225 changes: 225 additions & 0 deletions tests/test_testbench_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import testbench
from google.storage.v2 import storage_pb2
from testbench import rest_server
from tests.format_multipart_upload import format_multipart_upload

UPLOAD_QUANTUM = 256 * 1024

Expand Down Expand Up @@ -655,6 +656,230 @@ def test_retry_test_return_error_after_bytes(self):
self.assertIn("size", create_rest)
self.assertEqual(int(create_rest.get("size")), 2 * UPLOAD_QUANTUM)

def test_write_retry_test_stall_after_bytes(self):
    # Create a new bucket
    response = self.client.post(
        "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
    )
    self.assertEqual(response.status_code, 200)

    # Set up stalls for uploading the object: one after 250KiB and one
    # after 300KiB of cumulative uploaded data.
    response = self.client.post(
        "/retry_test",
        data=json.dumps(
            {
                "instructions": {
                    "storage.objects.insert": [
                        "stall-for-1s-after-250K",
                        "stall-for-1s-after-300K",
                    ]
                }
            }
        ),
        content_type="application/json",
    )
    self.assertEqual(response.status_code, 200)
    self.assertTrue(
        response.headers.get("content-type").startswith("application/json")
    )

    create_rest = json.loads(response.data)
    self.assertIn("id", create_rest)
    test_id = create_rest.get("id")

    # Initiate resumable upload
    response = self.client.post(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"uploadType": "resumable", "name": "stall"},
        content_type="application/json",
    )
    self.assertEqual(response.status_code, 200)

    location = response.headers.get("location")
    self.assertIn("upload_id=", location)
    match = re.search(r"[&?]upload_id=([^&]+)", location)
    self.assertIsNotNone(match, msg=location)
    upload_id = match.group(1)

    # Upload the first 256KiB chunk of data and trigger the first stall
    # (the 250K threshold is crossed).
    chunk = self._create_block(UPLOAD_QUANTUM)
    self.assertEqual(len(chunk), UPLOAD_QUANTUM)

    start_time = time.perf_counter()
    # NOTE: was an f-string with no placeholders; plain literal is intended.
    response = self.client.put(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"upload_id": upload_id},
        headers={
            "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                len=UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
            ),
            "x-retry-test-id": test_id,
        },
        data=chunk,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertGreater(elapsed_time, 1)
    self.assertEqual(response.status_code, 308)

    # Upload the second 256KiB chunk of data and trigger the second stall
    # (the 300K threshold is crossed).
    start_time = time.perf_counter()
    chunk = self._create_block(UPLOAD_QUANTUM)
    self.assertEqual(len(chunk), UPLOAD_QUANTUM)
    response = self.client.put(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"upload_id": upload_id},
        headers={
            "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
            ),
            "x-retry-test-id": test_id,
        },
        data=chunk,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertGreater(elapsed_time, 1)
    self.assertEqual(response.status_code, 200, msg=response.data)

    # Re-send the last chunk and check that no stall happens: both stall
    # instructions have already been consumed.
    start_time = time.perf_counter()
    chunk = self._create_block(UPLOAD_QUANTUM)
    self.assertEqual(len(chunk), UPLOAD_QUANTUM)
    response = self.client.put(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"upload_id": upload_id},
        headers={
            "content-range": "bytes 0-{len:d}/{obj_size:d}".format(
                len=2 * UPLOAD_QUANTUM - 1, obj_size=2 * UPLOAD_QUANTUM
            ),
            "x-retry-test-id": test_id,
        },
        data=chunk,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertLess(elapsed_time, 1)
    self.assertEqual(response.status_code, 200, msg=response.data)

def test_write_retry_test_stall_single_shot(self):
    # Create a new bucket
    response = self.client.post(
        "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
    )
    self.assertEqual(response.status_code, 200)

    # Set up a stall for uploading the object via a single-shot
    # (multipart) upload.
    response = self.client.post(
        "/retry_test",
        data=json.dumps(
            {
                "instructions": {
                    "storage.objects.insert": [
                        "stall-for-1s-after-250K",
                    ]
                }
            }
        ),
        content_type="application/json",
    )
    self.assertEqual(response.status_code, 200)
    self.assertTrue(
        response.headers.get("content-type").startswith("application/json")
    )

    create_rest = json.loads(response.data)
    self.assertIn("id", create_rest)
    test_id = create_rest.get("id")

    # Upload the 256KiB of data and trigger the stall.
    data = self._create_block(UPLOAD_QUANTUM)
    self.assertEqual(len(data), UPLOAD_QUANTUM)

    start_time = time.perf_counter()
    boundary, payload = format_multipart_upload({}, data)
    response = self.client.post(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"uploadType": "multipart", "name": "stall"},
        content_type="multipart/related; boundary=" + boundary,
        headers={
            "x-retry-test-id": test_id,
        },
        data=payload,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertEqual(response.status_code, 200)
    self.assertGreater(elapsed_time, 1)

    # Upload the data again and check that no stall happens: the single
    # stall instruction was consumed by the first upload.
    start_time = time.perf_counter()
    response = self.client.post(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"uploadType": "multipart", "name": "stall"},
        content_type="multipart/related; boundary=" + boundary,
        headers={
            "x-retry-test-id": test_id,
        },
        data=payload,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertLess(elapsed_time, 1)
    self.assertEqual(response.status_code, 200)

def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_size(
    self,
):
    # Create a new bucket
    response = self.client.post(
        "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
    )
    self.assertEqual(response.status_code, 200)

    # Set up a stall-after-250K instruction for uploading the object.
    response = self.client.post(
        "/retry_test",
        data=json.dumps(
            {
                "instructions": {
                    "storage.objects.insert": [
                        "stall-for-1s-after-250K",
                    ]
                }
            }
        ),
        content_type="application/json",
    )
    self.assertEqual(response.status_code, 200)
    self.assertTrue(
        response.headers.get("content-type").startswith("application/json")
    )

    create_rest = json.loads(response.data)
    self.assertIn("id", create_rest)
    test_id = create_rest.get("id")

    # Upload only 200KiB of data — below the 250K stall threshold — and
    # check that no stall happens.
    data = self._create_block(200 * 1024)
    self.assertEqual(len(data), 200 * 1024)

    start_time = time.perf_counter()
    boundary, payload = format_multipart_upload({}, data)
    response = self.client.post(
        "/upload/storage/v1/b/bucket-name/o",
        query_string={"uploadType": "multipart", "name": "stall"},
        content_type="multipart/related; boundary=" + boundary,
        headers={
            "x-retry-test-id": test_id,
        },
        data=payload,
    )
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    self.assertEqual(response.status_code, 200)
    self.assertLess(elapsed_time, 1)


class TestTestbenchRetryGrpc(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit dc200a3

Please sign in to comment.