-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement multi-use snapshots #3615
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No real issues, just cosmetics.
Sorry for the delay in review.
@@ -389,8 +389,7 @@ def batch(self): | |||
""" | |||
return BatchCheckout(self) | |||
|
|||
def snapshot(self, read_timestamp=None, min_read_timestamp=None, | |||
max_staleness=None, exact_staleness=None): | |||
def snapshot(self, **kw): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -168,11 +174,17 @@ def __init__(self, session, read_timestamp=None, min_read_timestamp=None, | |||
if len(flagged) > 1: | |||
raise ValueError("Supply zero or one options.") | |||
|
|||
if multi_use and (min_read_timestamp or max_staleness): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -168,11 +174,17 @@ def __init__(self, session, read_timestamp=None, min_read_timestamp=None, | |||
if len(flagged) > 1: | |||
raise ValueError("Supply zero or one options.") | |||
|
|||
if multi_use and (min_read_timestamp or max_staleness): | |||
raise ValueError( | |||
"'multi_use' is incompatile with " |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -130,6 +134,7 @@ def consume_next(self): | |||
self._resume_token = response.resume_token | |||
|
|||
if self._metadata is None: # first response | |||
# XXX: copy implicit txn ID to snapshot, if present. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
spanner/tests/unit/test_snapshot.py
Outdated
@@ -274,6 +292,12 @@ def test_execute_sql_normal(self): | |||
self.assertEqual(options.kwargs['metadata'], | |||
[('google-cloud-resource-prefix', database.name)]) | |||
|
|||
def test_execute_sql_wo_mulit_use(self): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
spanner/tests/unit/test_streamed.py
Outdated
@@ -99,15 +99,52 @@ def _makeListValue(values=(), value_pbs=None): | |||
return Value(list_value=ListValue(values=value_pbs)) | |||
return Value(list_value=_make_list_value_pb(values)) | |||
|
|||
@staticmethod | |||
def _makeResultSetMetadata(fields=(), transaction_id=None): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
spanner/tests/unit/test_streamed.py
Outdated
@@ -13,6 +13,7 @@ | |||
# limitations under the License. | |||
|
|||
|
|||
import mock |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
ad62692
to
d445a73
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spanner/tests/system/test_system.py
Outdated
|
||
def test_multiuse_snapshot_read_isolation_exact_staleness(self): | ||
import time | ||
from datetime import timedelta |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
I haven't added any: I had to rebase to fix the conflicts with your |
if self._multi_use: | ||
return StreamedResultSet(iterator, source=self) | ||
else: | ||
return StreamedResultSet(iterator) | ||
|
||
def execute_sql(self, sql, params=None, param_types=None, query_mode=None, | ||
resume_token=b''): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -157,9 +165,17 @@ class Snapshot(_SnapshotBase): | |||
:type exact_staleness: :class:`datetime.timedelta` | |||
:param exact_staleness: Execute all reads at a timestamp that is | |||
``exact_staleness`` old. | |||
|
|||
:type multi_use: :class:`bool` | |||
:param multi_use: If true, the first read operation creates a read-only |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
if self._multi_use: | ||
return StreamedResultSet(iterator, source=self) | ||
else: | ||
return StreamedResultSet(iterator) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
The documentation on this page suggests that snapshot already allows you to do multiple reads at a consistent snapshot.
In light of this PR that is clearly incorrect. @jonparrott We also claim the same thing in our [sample code]) @lukesneeringer @bjwatson Can we please get this merged asap. This is a severe bug. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seeing no actual concerns (either in my own review or the other comments), this is good to go out.
@lukesneeringer I believe my comments need to be addressed before merging this. Specifically |
@tseaver Any updates on this? |
I can see adding an explicit In addition, I had thought of having a |
- Convert 'Database.snapshot' and 'Session.snapshot' factories to take / forward '**kw'.
- When reading / executing SQL for a multi-use snapshot, pass the snapshot as the iterator's source.
- PartialResultSet - ResultSetMetadata - ResultSetStats.
- Source will only be set for multi-use snapshots.
- Valid only for multi-use snapshots. - Raises if the snapshot already has a transaction ID.
799ce25
to
230d715
Compare
To implement the single-use request guard, the snapshot needs to maintain a request counter, which we can use to detect the "interleaved" case. |
That sounds good. We can have an explicit |
Actually why do we need |
|
1fff028
to
5dc09f8
Compare
@vkedia I've just updated the system tests to pass ________________ TestSessionAPI.test_execute_sql_w_query_param _________________
Traceback (most recent call last):
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/tests/system/test_system.py", line 960, in test_execute_sql_w_query_param
expected=[(19,), (99,)],
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/tests/system/test_system.py", line 888, in _check_sql_results
sql, params=params, param_types=param_types))
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/google/cloud/spanner/streamed.py", line 166, in __iter__
self.consume_next() # raises StopIteration
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/google/cloud/spanner/streamed.py", line 132, in consume_next
response = six.next(self._response_iterator)
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
return self._next()
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INVALID_ARGUMENT, Transaction was started in a different session.)>
______________________ TestSessionAPI.test_read_w_ranges _______________________
Traceback (most recent call last):
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/tests/system/test_system.py", line 846, in test_read_w_ranges
self.TABLE, self.COLUMNS, keyset))
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/google/cloud/spanner/streamed.py", line 166, in __iter__
self.consume_next() # raises StopIteration
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/google/cloud/spanner/streamed.py", line 132, in consume_next
response = six.next(self._response_iterator)
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
return self._next()
File "/home/tseaver/projects/agendaless/Google/src/google-cloud-python/spanner/.nox/sys-3-6/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INVALID_ARGUMENT, Transaction was started in a different session.)> I tried putting an explicit |
Looks like the back-end passes back an empty transaction ID for read requests after the first one. 0966806 keeps us from clearing it in that case. |
Thats right. Transaction field is only set if the read or query started a new transaction. |
The new system tests are passing. |
@tseaver What are you looking for here in terms of review? |
@dhermes the commits which are new since @lukesneeringer gave an LGTM on Thursday (the rest are just rebase to fix conflicts). a5219a5 is the hash of that rebased commit, so the diff would be: a5219a5...spanner-multi_use_snapshot |
LGTM |
@tseaver I just saw this. I think that @vkedia will want to review this more when he returns from vacation on Monday. I guess if he finds anything else, it can be addressed in a separate PR. Let's remain Alpha until next week. (FYI @lukesneeringer) |
@vkedia Do you plan to finish reviewing this post-merge? |
Multi-use snapshots trigger an "implicit" server-side transaction, and capture its ID on the first request. Subsequent requests return that ID, allowing for isolation from other changes. We default to
multi_use=False
because that mode is much more performant for the simple case.This feature is one which we originally decided to leave out, but the P0 system test list requires that it be implemented.
I think that a commitwise review might be easier than reviewing the whole enchilada.