Skip to content

Commit

Permalink
fix: add PROTO in streaming chunks (#1213)
Browse files Browse the repository at this point in the history
b/372956316
When a row exceeds a certain size limit, it is divided into chunks and sent to the client in multiple parts. The client is responsible for merging these chunks to reconstruct the full row.
However, this chunk-merging logic was not implemented for the PROTO and ENUM types, which caused a `KeyError: 13` when attempting to merge proto chunks.


#### Sample to reproduce the issue
[Python file](https://gist.github.com/harshachinta/95a81eeda81c422814353a5995d01e20)
[proto file](https://gist.github.com/harshachinta/fd15bf558bd4f40443411ddd164638cc)

#### Steps to generate descriptors.pb and code file from proto
```
protoc --proto_path=testdata/ --include_imports --descriptor_set_out=testdata/descriptors.pb --python_out=testdata/ testdata/wrapper.proto
```
  • Loading branch information
harshachinta authored Oct 28, 2024
1 parent 68551c2 commit 43c190b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ def _merge_struct(lhs, rhs, type_):
TypeCode.TIMESTAMP: _merge_string,
TypeCode.NUMERIC: _merge_string,
TypeCode.JSON: _merge_string,
TypeCode.PROTO: _merge_string,
TypeCode.ENUM: _merge_string,
}


Expand Down
40 changes: 40 additions & 0 deletions tests/unit/test_streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,46 @@ def test__merge_chunk_string_w_bytes(self):
)
self.assertIsNone(streamed._pending_chunk)

def test__merge_chunk_proto(self):
    """Merging a pending PROTO chunk with the next chunk joins their strings.

    Seeds the stream with a pending chunk for a single PROTO-typed field,
    merges a second chunk into it, and checks that the result carries the
    two payloads back to back and that no pending chunk is left behind.
    """
    from google.cloud.spanner_v1 import TypeCode

    # The two payload halves, and the exact text the merge must yield.
    leading_text = (
        "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA"
        "6fptVAAAACXBIWXMAAAsTAAALEwEAmpwYAAAA\n"
    )
    trailing_text = (
        "B3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0FNUExF"
        "MG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n"
    )
    expected_text = (
        "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACXBIWXMAAAsTAAAL"
        "EwEAmpwYAAAA\nB3RJTUUH4QQGFwsBTL3HMwAAABJpVFh0Q29tbWVudAAAAAAAU0"
        "FNUExFMG3E+AAAAApJREFUCNdj\nYAAAAAIAAeIhvDMAAAAASUVORK5CYII=\n"
    )

    result_set = self._make_one(_MockCancellableIterator())
    proto_field = self._make_scalar_field("proto", TypeCode.PROTO)
    result_set._metadata = self._make_result_set_metadata([proto_field])
    result_set._pending_chunk = self._make_value(leading_text)
    next_chunk = self._make_value(trailing_text)

    merged_value = result_set._merge_chunk(next_chunk)

    self.assertEqual(merged_value.string_value, expected_text)
    self.assertIsNone(result_set._pending_chunk)

def test__merge_chunk_enum(self):
    """Merging a pending ENUM chunk with the next chunk joins their strings.

    The pending chunk is built from 42 and the incoming chunk from 13; the
    merged value is expected to expose the string "4213" and the pending
    chunk is expected to be cleared.
    """
    from google.cloud.spanner_v1 import TypeCode

    result_set = self._make_one(_MockCancellableIterator())
    enum_field = self._make_scalar_field("age", TypeCode.ENUM)
    result_set._metadata = self._make_result_set_metadata([enum_field])
    # NOTE(review): presumably _make_value renders ints as string values,
    # which is why the assertion below compares against "4213" -- confirm.
    result_set._pending_chunk = self._make_value(42)
    next_chunk = self._make_value(13)

    merged_value = result_set._merge_chunk(next_chunk)

    self.assertEqual(merged_value.string_value, "4213")
    self.assertIsNone(result_set._pending_chunk)

def test__merge_chunk_array_of_bool(self):
from google.cloud.spanner_v1 import TypeCode

Expand Down

0 comments on commit 43c190b

Please sign in to comment.