From 2d4036596ff4fa836df33dc083215ac5ecbe4b65 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 17 Dec 2014 15:51:02 -0800 Subject: [PATCH] Removing cursor and more results from query object. This is a follow up to a discussion in #423 and #425. Also - Adding fetch_page() method to return the needed properties no longer available on the Query object. - Made fetch_page() fail if more_results is `NOT_FINISHED`, which seems to be intended for batched requests. - Added start_cursor and end_cursor properties to Query. - Updated docstrings to reflect change - Updated tests to ensure the returned cursor was always set. - Streamlined fetch()/fetch_page() tests to run via a single tweakable helper. - Updated a datastore.connection test to construct and return an initialized QueryResultBatch to simulate a valid cursor. - Updated regression.clear_datastore and regression.datastore to reflect changed API surface. --- gcloud/datastore/connection.py | 9 +- gcloud/datastore/query.py | 127 +++++++++++++++-------- gcloud/datastore/test_connection.py | 7 +- gcloud/datastore/test_query.py | 154 ++++++++++++---------------- regression/clear_datastore.py | 16 +-- regression/datastore.py | 15 +-- 6 files changed, 169 insertions(+), 159 deletions(-) diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index 4edc005be6c2b..97735be781353 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -289,10 +289,13 @@ def run_query(self, dataset_id, query_pb, namespace=None): Using the `fetch`` method... - >>> query.fetch() + >>> entities, cursor, more_results = query.fetch_page() + >>> entities [] - >>> query.cursor() + >>> cursor + >>> more_results + Under the hood this is doing... @@ -318,7 +321,7 @@ def run_query(self, dataset_id, query_pb, namespace=None): datastore_pb.RunQueryResponse) return ( [e.entity for e in response.batch.entity_result], - response.batch.end_cursor, + response.batch.end_cursor, # Assume response always has cursor. response.batch.more_results, response.batch.skipped_results, ) diff --git a/gcloud/datastore/query.py b/gcloud/datastore/query.py index 6528087715efc..e32952204d23d 100644 --- a/gcloud/datastore/query.py +++ b/gcloud/datastore/query.py @@ -56,6 +56,8 @@ class Query(object): :param dataset: The namespace to which to restrict results. """ + _MORE_RESULTS = datastore_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT + _NO_MORE_RESULTS = datastore_pb.QueryResultBatch.NO_MORE_RESULTS OPERATORS = { '<=': datastore_pb.PropertyFilter.LESS_THAN_OR_EQUAL, '>=': datastore_pb.PropertyFilter.GREATER_THAN_OR_EQUAL, @@ -69,7 +71,6 @@ def __init__(self, kind=None, dataset=None, namespace=None): self._dataset = dataset self._namespace = namespace self._pb = datastore_pb.Query() - self._cursor = self._more_results = None self._offset = 0 if kind: @@ -84,8 +85,6 @@ def _clone(self): clone = self.__class__(dataset=self._dataset, namespace=self._namespace) clone._pb.CopyFrom(self._pb) - clone._cursor = self._cursor - clone._more_results = self._more_results return clone def namespace(self): @@ -239,8 +238,8 @@ def kind(self, *kinds): :type kinds: string :param kinds: The entity kinds for which to query. - :rtype: string or :class:`Query` - :returns: If no arguments, returns the kind. + :rtype: string, list of strings, or :class:`Query` + :returns: If no arguments, returns the kind or list of kinds. If a kind is provided, returns a clone of the :class:`Query` with those kinds set. """ @@ -250,7 +249,13 @@ def kind(self, *kinds): clone._pb.kind.add().name = kind return clone else: - return self._pb.kind + # In the proto definition for Query, `kind` is repeated. + kind_names = [kind_expr.name for kind_expr in self._pb.kind] + num_kinds = len(kind_names) + if num_kinds == 1: + return kind_names[0] + elif num_kinds > 1: + return kind_names def limit(self, limit=None): """Get or set the limit of the Query. @@ -302,8 +307,12 @@ def dataset(self, dataset=None): else: return self._dataset - def fetch(self, limit=None): - """Executes the Query and returns all matching entities. + def fetch_page(self, limit=None): + """Executes the Query and returns matching entities, and paging info. + + In addition to the fetched entities, it also returns a cursor to allow + paging through a results set and a boolean `more_results` indicating + if there are any more. This makes an API call to the Cloud Datastore, sends the Query as a protobuf, parses the responses to Entity protobufs, and @@ -315,10 +324,10 @@ def fetch(self, limit=None): >>> from gcloud import datastore >>> dataset = datastore.get_dataset('dataset-id') >>> query = dataset.query('Person').filter('name', '=', 'Sally') - >>> query.fetch() - [, , ...] - >>> query.fetch(1) - [] + >>> query.fetch_page() + [, , ...], 'cursorbase64', True + >>> query.fetch_page(1) + [], 'cursorbase64', True >>> query.limit() None @@ -328,8 +337,13 @@ def fetch(self, limit=None): but the limit will be applied to the query before it is executed. - :rtype: list of :class:`gcloud.datastore.entity.Entity`'s - :returns: The list of entities matching this query's criteria. + :rtype: tuple of mixed types + :returns: The first entry is a :class:`gcloud.datastore.entity.Entity` + list matching this query's criteria. The second is a base64 + encoded cursor for paging and the third is a boolean + indicating if there are more results. + :raises: `ValueError` if more_results is not one of the enums + MORE_RESULTS_AFTER_LIMIT or NO_MORE_RESULTS. """ clone = self @@ -350,46 +364,71 @@ def fetch(self, limit=None): # results. See # https://github.com/GoogleCloudPlatform/gcloud-python/issues/280 # for discussion. - entity_pbs, self._cursor, self._more_results = query_results[:3] + entity_pbs, cursor_as_bytes, more_results_enum = query_results[:3] - return [helpers.entity_from_protobuf(entity, dataset=self.dataset()) - for entity in entity_pbs] + entities = [helpers.entity_from_protobuf(entity, + dataset=self.dataset()) + for entity in entity_pbs] - def cursor(self): - """Returns cursor ID from most recent ``fetch()``. + cursor = base64.b64encode(cursor_as_bytes) - .. warning:: Invoking this method on a query that has not yet - been executed will raise a RuntimeError. + if more_results_enum == self._MORE_RESULTS: + more_results = True + elif more_results_enum == self._NO_MORE_RESULTS: + more_results = False + else: + # Note this covers the value NOT_FINISHED since this fetch does + # not occur within a batch, we don't expect to see NOT_FINISHED. + raise ValueError('Unexpected value returned for `more_results`.') - :rtype: string - :returns: base64-encoded cursor ID string denoting the last position - consumed in the query's result set. - """ - if not self._cursor: - raise RuntimeError('No cursor') - return base64.b64encode(self._cursor) + return entities, cursor, more_results - def more_results(self): - """Returns ``more_results`` flag from most recent ``fetch()``. + def fetch(self, limit=None): + """Executes the Query and returns matching entities - .. warning:: Invoking this method on a query that has not yet - been executed will raise a RuntimeError. + This calls `fetch_page()` but does not use the paging information. - .. note:: + For example:: + + >>> from gcloud import datastore + >>> dataset = datastore.get_dataset('dataset-id') + >>> query = dataset.query('Person').filter('name', '=', 'Sally') + >>> query.fetch() + [, , ...] + >>> query.fetch(1) + [] + >>> query.limit() + None - The `more_results` is not currently useful because it is - always returned by the back-end as ``MORE_RESULTS_AFTER_LIMIT`` - even if there are no more results. See - https://github.com/GoogleCloudPlatform/gcloud-python/issues/280 - for discussion. + :type limit: integer + :param limit: An optional limit to apply temporarily to this query. + That is, the Query itself won't be altered, + but the limit will be applied to the query + before it is executed. - :rtype: :class:`gcloud.datastore.datastore_v1_pb2. - QueryResultBatch.MoreResultsType` - :returns: enumerated value: are there more results available. + :rtype: list of :class:`gcloud.datastore.entity.Entity`'s + :returns: The list of entities matching this query's criteria. """ - if self._more_results is None: - raise RuntimeError('No results') - return self._more_results + entities, _, _ = self.fetch_page(limit=limit) + return entities + + @property + def start_cursor(self): + """Property to encode start cursor bytes as base64.""" + if not self._pb.HasField('start_cursor'): + return None + + start_as_bytes = self._pb.start_cursor + return base64.b64encode(start_as_bytes) + + @property + def end_cursor(self): + """Property to encode end cursor bytes as base64.""" + if not self._pb.HasField('end_cursor'): + return None + + end_as_bytes = self._pb.end_cursor + return base64.b64encode(end_as_bytes) def with_cursor(self, start_cursor, end_cursor=None): """Specifies the starting / ending positions in a query's result set. diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py index ccd93f843dbd9..752caa6be8ccd 100644 --- a/gcloud/datastore/test_connection.py +++ b/gcloud/datastore/test_connection.py @@ -449,8 +449,13 @@ def test_run_query_wo_namespace_empty_result(self): DATASET_ID = 'DATASET' KIND = 'Nonesuch' + CURSOR = b'\x00' q_pb = Query(KIND, DATASET_ID).to_protobuf() rsp_pb = datastore_pb.RunQueryResponse() + rsp_pb.batch.end_cursor = CURSOR + no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS + rsp_pb.batch.more_results = no_more + rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL conn = self._makeOne() URI = '/'.join([ conn.API_BASE_URL, @@ -463,7 +468,7 @@ def test_run_query_wo_namespace_empty_result(self): http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb) self.assertEqual(pbs, []) - self.assertEqual(end, '') + self.assertEqual(end, CURSOR) self.assertTrue(more) self.assertEqual(skipped, 0) cw = http._called_with diff --git a/gcloud/datastore/test_query.py b/gcloud/datastore/test_query.py index 4b02b69253665..5faa218cc4d94 100644 --- a/gcloud/datastore/test_query.py +++ b/gcloud/datastore/test_query.py @@ -28,7 +28,7 @@ def _makeOne(self, kind=None, dataset=None, namespace=None): def test_ctor_defaults(self): query = self._getTargetClass()() self.assertEqual(query.dataset(), None) - self.assertEqual(list(query.kind()), []) + self.assertEqual(query.kind(), None) self.assertEqual(query.limit(), 0) self.assertEqual(query.namespace(), None) @@ -41,8 +41,7 @@ def test_ctor_explicit(self): dataset = Dataset(_DATASET) query = self._makeOne(_KIND, dataset, _NAMESPACE) self.assertTrue(query.dataset() is dataset) - kq_pb, = list(query.kind()) - self.assertEqual(kq_pb.name, _KIND) + self.assertEqual(query.kind(), _KIND) self.assertEqual(query.namespace(), _NAMESPACE) def test__clone(self): @@ -50,22 +49,15 @@ def test__clone(self): _DATASET = 'DATASET' _KIND = 'KIND' - _CURSOR = 'DEADBEEF' - _MORE_RESULTS = 2 _NAMESPACE = 'NAMESPACE' dataset = Dataset(_DATASET) query = self._makeOne(_KIND, dataset, _NAMESPACE) - query._cursor = _CURSOR - query._more_results = _MORE_RESULTS clone = query._clone() self.assertFalse(clone is query) self.assertTrue(isinstance(clone, self._getTargetClass())) self.assertTrue(clone.dataset() is dataset) self.assertEqual(clone.namespace(), _NAMESPACE) - kq_pb, = list(clone.kind()) - self.assertEqual(kq_pb.name, _KIND) - self.assertEqual(clone._cursor, _CURSOR) - self.assertEqual(clone._more_results, _MORE_RESULTS) + self.assertEqual(clone.kind(), _KIND) def test_to_protobuf_empty(self): query = self._makeOne() @@ -260,8 +252,7 @@ def test_kind_setter_wo_existing(self): self.assertFalse(after is query) self.assertTrue(isinstance(after, self._getTargetClass())) self.assertTrue(after.dataset() is dataset) - kq_pb, = list(after.kind()) - self.assertEqual(kq_pb.name, _KIND) + self.assertEqual(after.kind(), _KIND) def test_kind_setter_w_existing(self): from gcloud.datastore.dataset import Dataset @@ -274,9 +265,7 @@ def test_kind_setter_w_existing(self): self.assertFalse(after is query) self.assertTrue(isinstance(after, self._getTargetClass())) self.assertTrue(after.dataset() is dataset) - kq_pb1, kq_pb2 = list(after.kind()) - self.assertEqual(kq_pb1.name, _KIND_BEFORE) - self.assertEqual(kq_pb2.name, _KIND_AFTER) + self.assertEqual(after.kind(), [_KIND_BEFORE, _KIND_AFTER]) def test_limit_setter_wo_existing(self): from gcloud.datastore.dataset import Dataset @@ -290,8 +279,7 @@ def test_limit_setter_wo_existing(self): self.assertTrue(isinstance(after, self._getTargetClass())) self.assertTrue(after.dataset() is dataset) self.assertEqual(after.limit(), _LIMIT) - kq_pb, = list(after.kind()) - self.assertEqual(kq_pb.name, _KIND) + self.assertEqual(after.kind(), _KIND) def test_dataset_setter(self): from gcloud.datastore.dataset import Dataset @@ -303,39 +291,15 @@ def test_dataset_setter(self): self.assertFalse(after is query) self.assertTrue(isinstance(after, self._getTargetClass())) self.assertTrue(after.dataset() is dataset) - kq_pb, = list(query.kind()) - self.assertEqual(kq_pb.name, _KIND) + self.assertEqual(query.kind(), _KIND) - def test_fetch_default_limit(self): - from gcloud.datastore.datastore_v1_pb2 import Entity - _DATASET = 'DATASET' - _KIND = 'KIND' - _ID = 123 - entity_pb = Entity() - path_element = entity_pb.key.path_element.add() - path_element.kind = _KIND - path_element.id = _ID - prop = entity_pb.property.add() - prop.name = 'foo' - prop.value.string_value = u'Foo' - connection = _Connection(entity_pb) - dataset = _Dataset(_DATASET, connection) - query = self._makeOne(_KIND, dataset) - entities = query.fetch() - self.assertEqual(len(entities), 1) - self.assertEqual(entities[0].key().path(), - [{'kind': _KIND, 'id': _ID}]) - expected_called_with = { - 'dataset_id': _DATASET, - 'query_pb': query.to_protobuf(), - 'namespace': None, - } - self.assertEqual(connection._called_with, expected_called_with) - - def test_fetch_explicit_limit(self): + def _fetch_page_helper(self, cursor=b'\x00', limit=None, more_results=True, + _more_pb=None, use_fetch=False): import base64 from gcloud.datastore.datastore_v1_pb2 import Entity - _CURSOR = 'CURSOR' + _CURSOR_FOR_USER = (None if cursor is None + else base64.b64encode(cursor)) + _MORE_RESULTS = more_results _DATASET = 'DATASET' _KIND = 'KIND' _ID = 123 @@ -347,60 +311,58 @@ def test_fetch_explicit_limit(self): prop = entity_pb.property.add() prop.name = 'foo' prop.value.string_value = u'Foo' - connection = _Connection(entity_pb) - connection._cursor = _CURSOR + if _more_pb is None: + connection = _Connection(entity_pb) + else: + connection = _Connection(entity_pb, _more=_more_pb) + connection._cursor = cursor dataset = _Dataset(_DATASET, connection) + query = self._makeOne(_KIND, dataset, _NAMESPACE) - limited = query.limit(13) - entities = query.fetch(13) - self.assertEqual(query.cursor(), base64.b64encode(_CURSOR)) - self.assertEqual(query.more_results(), connection._more) + if use_fetch: + entities = query.fetch(limit) + else: + entities, cursor, more_results = query.fetch_page(limit) + self.assertEqual(cursor, _CURSOR_FOR_USER) + self.assertEqual(more_results, _MORE_RESULTS) + self.assertEqual(len(entities), 1) self.assertEqual(entities[0].key().path(), [{'kind': _KIND, 'id': _ID}]) + limited_query = query + if limit is not None: + limited_query = query.limit(limit) expected_called_with = { 'dataset_id': _DATASET, - 'query_pb': limited.to_protobuf(), + 'query_pb': limited_query.to_protobuf(), 'namespace': _NAMESPACE, } self.assertEqual(connection._called_with, expected_called_with) - def test_more_results_not_fetched(self): - _DATASET = 'DATASET' - _KIND = 'KIND' - connection = _Connection() - dataset = _Dataset(_DATASET, connection) - query = self._makeOne(_KIND, dataset) - self.assertRaises(RuntimeError, query.more_results) + def test_fetch_page_default_limit(self): + self._fetch_page_helper() - def test_more_results_fetched(self): - _MORE_RESULTS = 2 - _DATASET = 'DATASET' - _KIND = 'KIND' - connection = _Connection() - dataset = _Dataset(_DATASET, connection) - query = self._makeOne(_KIND, dataset) - query._more_results = _MORE_RESULTS - self.assertEqual(query.more_results(), _MORE_RESULTS) + def test_fetch_defaults(self): + self._fetch_page_helper(use_fetch=True) - def test_cursor_not_fetched(self): - _DATASET = 'DATASET' - _KIND = 'KIND' - connection = _Connection() - dataset = _Dataset(_DATASET, connection) - query = self._makeOne(_KIND, dataset) - self.assertRaises(RuntimeError, query.cursor) + def test_fetch_page_explicit_limit(self): + self._fetch_page_helper(cursor='CURSOR', limit=13) - def test_cursor_fetched(self): - import base64 - _CURSOR = 'CURSOR' - _DATASET = 'DATASET' - _KIND = 'KIND' - connection = _Connection() - dataset = _Dataset(_DATASET, connection) - query = self._makeOne(_KIND, dataset) - query._cursor = _CURSOR - self.assertEqual(query.cursor(), base64.b64encode(_CURSOR)) + def test_fetch_page_no_more_results(self): + from gcloud.datastore import datastore_v1_pb2 as datastore_pb + no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS + self._fetch_page_helper(cursor='CURSOR', limit=13, more_results=False, + _more_pb=no_more) + + def test_fetch_page_more_results_ill_formed(self): + from gcloud.datastore import datastore_v1_pb2 as datastore_pb + not_finished = datastore_pb.QueryResultBatch.NOT_FINISHED + # Try a valid enum but not allowed. + self.assertRaises(ValueError, self._fetch_page_helper, + _more_pb=not_finished) + # Try an invalid enum but not allowed. + self.assertRaises(ValueError, self._fetch_page_helper, + _more_pb=object()) def test_with_cursor_neither(self): _DATASET = 'DATASET' @@ -423,7 +385,9 @@ def test_with_cursor_w_start(self): self.assertFalse(after is query) q_pb = after.to_protobuf() self.assertEqual(q_pb.start_cursor, _CURSOR) + self.assertEqual(after.start_cursor, _CURSOR_B64) self.assertEqual(q_pb.end_cursor, '') + self.assertEqual(after.end_cursor, None) def test_with_cursor_w_end(self): import base64 @@ -438,7 +402,9 @@ def test_with_cursor_w_end(self): self.assertFalse(after is query) q_pb = after.to_protobuf() self.assertEqual(q_pb.start_cursor, '') + self.assertEqual(after.start_cursor, None) self.assertEqual(q_pb.end_cursor, _CURSOR) + self.assertEqual(after.end_cursor, _CURSOR_B64) def test_with_cursor_w_both(self): import base64 @@ -455,7 +421,9 @@ def test_with_cursor_w_both(self): self.assertFalse(after is query) q_pb = after.to_protobuf() self.assertEqual(q_pb.start_cursor, _START) + self.assertEqual(after.start_cursor, _START_B64) self.assertEqual(q_pb.end_cursor, _END) + self.assertEqual(after.end_cursor, _END_B64) def test_order_empty(self): _KIND = 'KIND' @@ -598,13 +566,17 @@ def connection(self): class _Connection(object): + _called_with = None - _cursor = '' - _more = 2 + _cursor = b'\x00' _skipped = 0 - def __init__(self, *result): + def __init__(self, *result, **kwargs): + from gcloud.datastore import datastore_v1_pb2 as datastore_pb + self._result = list(result) + more_default = datastore_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT + self._more = kwargs.get('_more', more_default) def run_query(self, **kw): self._called_with = kw diff --git a/regression/clear_datastore.py b/regression/clear_datastore.py index b7a1ee84ad742..ba3efedd55da2 100644 --- a/regression/clear_datastore.py +++ b/regression/clear_datastore.py @@ -32,16 +32,15 @@ TRANSACTION_MAX_GROUPS = 5 -def fetch_keys(dataset, kind, fetch_max=FETCH_MAX, query=None): +def fetch_keys(dataset, kind, fetch_max=FETCH_MAX, query=None, cursor=None): if query is None: query = dataset.query(kind=kind).limit( fetch_max).projection(['__key__']) - # Make new query with start cursor if a previously set cursor - # exists. - if query._cursor is not None: - query = query.with_cursor(query.cursor()) + # Make new query with start cursor. Will be ignored if None. + query = query.with_cursor(cursor) - return query, query.fetch() + entities, cursor, _ = query.fetch_page() + return query, entities, cursor def get_ancestors(entities): @@ -67,10 +66,11 @@ def remove_kind(dataset, kind): with dataset.transaction(): results = [] - query, curr_results = fetch_keys(dataset, kind) + query, curr_results, cursor = fetch_keys(dataset, kind) results.extend(curr_results) while curr_results: - query, curr_results = fetch_keys(dataset, kind, query=query) + query, curr_results, cursor = fetch_keys( + dataset, kind, query=query, cursor=cursor) results.extend(curr_results) if not results: diff --git a/regression/datastore.py b/regression/datastore.py index 811e5ac6b24ee..c72ce550cfafc 100644 --- a/regression/datastore.py +++ b/regression/datastore.py @@ -181,15 +181,12 @@ def _base_query(self): def test_limit_queries(self): limit = 5 query = self._base_query().limit(limit) - # Verify there is not cursor before fetch(). - self.assertRaises(RuntimeError, query.cursor) # Fetch characters. - character_entities = query.fetch() + character_entities, cursor, _ = query.fetch_page() self.assertEqual(len(character_entities), limit) # Check cursor after fetch. - cursor = query.cursor() self.assertTrue(cursor is not None) # Fetch next batch of characters. @@ -286,18 +283,15 @@ def test_query_paginate_with_offset(self): offset = 2 limit = 3 page_query = query.offset(offset).limit(limit).order('appearances') - # Make sure no query set before fetch. - self.assertRaises(RuntimeError, page_query.cursor) # Fetch characters. - entities = page_query.fetch() + entities, cursor, _ = page_query.fetch_page() self.assertEqual(len(entities), limit) self.assertEqual(entities[0]['name'], 'Robb') self.assertEqual(entities[1]['name'], 'Bran') self.assertEqual(entities[2]['name'], 'Catelyn') # Use cursor to begin next query. - cursor = page_query.cursor() next_query = page_query.with_cursor(cursor).offset(0) self.assertEqual(next_query.limit(), limit) # Fetch next set of characters. @@ -312,15 +306,12 @@ def test_query_paginate_with_start_cursor(self): offset = 2 limit = 2 page_query = query.offset(offset).limit(limit).order('appearances') - # Make sure no query set before fetch. - self.assertRaises(RuntimeError, page_query.cursor) # Fetch characters. - entities = page_query.fetch() + entities, cursor, _ = page_query.fetch_page() self.assertEqual(len(entities), limit) # Use cursor to create a fresh query. - cursor = page_query.cursor() fresh_query = self._base_query() fresh_query = fresh_query.order('appearances').with_cursor(cursor)