Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #272: add 'allocate_ids' RPC to connection #299

Merged
merged 4 commits into from
Oct 28, 2014
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 112 additions & 86 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,48 +153,72 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def begin_transaction(self, dataset_id, serializable=False):
"""Begin a transaction.
#

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

# Protobuf RPCs for DatastoreService
#
def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.

This comment was marked as spam.


:type dataset_id: string
:param dataset_id: The dataset over which to execute the transaction.
"""
This method deals only with protobufs
(:class:`gcloud.datastore.datastore_v1_pb2.Key`
and
:class:`gcloud.datastore.datastore_v1_pb2.Entity`)
and is used under the hood for methods like
:func:`gcloud.datastore.dataset.Dataset.get_entity`:

if self.transaction():
raise ValueError('Cannot start a transaction with another already '
'in progress.')
>>> from gcloud import datastore
>>> from gcloud.datastore.key import Key
>>> connection = datastore.get_connection(email, key_path)
>>> dataset = connection.dataset('dataset-id')
>>> key = Key(dataset=dataset).kind('MyKind').id(1234)

request = datastore_pb.BeginTransactionRequest()
Using the :class:`gcloud.datastore.dataset.Dataset` helper:

if serializable:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SERIALIZABLE)
else:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SNAPSHOT)
>>> dataset.get_entity(key)
<Entity object>

response = self._rpc(dataset_id, 'beginTransaction', request,
datastore_pb.BeginTransactionResponse)
Using the ``connection`` class directly:

return response.transaction
>>> connection.lookup('dataset-id', key.to_protobuf())
<Entity protobuf>

def rollback_transaction(self, dataset_id):
"""Rollback the connection's existing transaction.
:type dataset_id: string
:param dataset_id: The dataset to look up the keys.

Raises a ``ValueError``
if the connection isn't currently in a transaction.
:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
(or a single Key)

This comment was marked as spam.

:param key_pbs: The key (or keys) to retrieve from the datastore.

:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
"""
if not self.transaction() or not self.transaction().id():
raise ValueError('No transaction to rollback.')
lookup_request = datastore_pb.LookupRequest()

request = datastore_pb.RollbackRequest()
request.transaction = self.transaction().id()
# Nothing to do with this response, so just execute the method.
self._rpc(dataset_id, 'rollback', request,
datastore_pb.RollbackResponse)
single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
key_pbs = [key_pbs]

for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results = [result.entity for result in lookup_response.found]

if single_key:
if results:
return results[0]
else:
return None

return results

def run_query(self, dataset_id, query_pb, namespace=None):
"""Run a query on the Cloud Datastore.
Expand Down Expand Up @@ -250,69 +274,30 @@ def run_query(self, dataset_id, query_pb, namespace=None):
response.batch.skipped_results,
)

def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.

This method deals only with protobufs
(:class:`gcloud.datastore.datastore_v1_pb2.Key`
and
:class:`gcloud.datastore.datastore_v1_pb2.Entity`)
and is used under the hood for methods like
:func:`gcloud.datastore.dataset.Dataset.get_entity`:

>>> from gcloud import datastore
>>> from gcloud.datastore.key import Key
>>> connection = datastore.get_connection(email, key_path)
>>> dataset = connection.dataset('dataset-id')
>>> key = Key(dataset=dataset).kind('MyKind').id(1234)

Using the :class:`gcloud.datastore.dataset.Dataset` helper:

>>> dataset.get_entity(key)
<Entity object>

Using the ``connection`` class directly:

>>> connection.lookup('dataset-id', key.to_protobuf())
<Entity protobuf>
def begin_transaction(self, dataset_id, serializable=False):
"""Begin a transaction.

:type dataset_id: string
:param dataset_id: The dataset to look up the keys.

:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
(or a single Key)
:param key_pbs: The key (or keys) to retrieve from the datastore.

:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:param dataset_id: The dataset over which to execute the transaction.
"""
lookup_request = datastore_pb.LookupRequest()

single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
key_pbs = [key_pbs]

for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)
if self.transaction():
raise ValueError('Cannot start a transaction with another already '
'in progress.')

lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)
request = datastore_pb.BeginTransactionRequest()

results = [result.entity for result in lookup_response.found]
if serializable:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SERIALIZABLE)
else:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SNAPSHOT)

if single_key:
if results:
return results[0]
else:
return None
response = self._rpc(dataset_id, 'beginTransaction', request,
datastore_pb.BeginTransactionResponse)

return results
return response.transaction

def commit(self, dataset_id, mutation_pb):
"""Commit dataset mutations in context of current transation (if any).
Expand All @@ -339,6 +324,47 @@ def commit(self, dataset_id, mutation_pb):
datastore_pb.CommitResponse)
return response.mutation_result

def rollback(self, dataset_id):
"""Rollback the connection's existing transaction.

Raises a ``ValueError``
if the connection isn't currently in a transaction.

:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.
"""
if not self.transaction() or not self.transaction().id():
raise ValueError('No transaction to rollback.')

request = datastore_pb.RollbackRequest()
request.transaction = self.transaction().id()
# Nothing to do with this response, so just execute the method.
self._rpc(dataset_id, 'rollback', request,
datastore_pb.RollbackResponse)

def allocate_ids(self, dataset_id, key_pbs):
"""Obtain backend-generated IDs for a set of keys.

:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.

:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
:param key_pbs: The keys for which the backend should allocate IDs.

:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
:returns: An equal number of keys, with IDs filled in by the backend.
"""
request = datastore_pb.AllocateIdsRequest()
for key_pb in key_pbs:
request.key.add().CopyFrom(key_pb)
# Nothing to do with this response, so just execute the method.
response = self._rpc(dataset_id, 'allocateIds', request,
datastore_pb.AllocateIdsResponse)
return list(response.key)

#
# Entity-related helper methods.
#
def save_entity(self, dataset_id, key_pb, properties):
"""Save an entity to the Cloud Datastore with the provided properties.

Expand Down
Loading