Skip to content

Commit

Permalink
Tracking status of Transaction as well as success / failure.
Browse files Browse the repository at this point in the history
Fixes #496.

NOTE: Some of these changes may belong on Batch, but the concept
of "tombstone"-ing is unique to a Transaction (i.e. once started,
can only be committed once and the transaction ID can never be
used again).
  • Loading branch information
dhermes committed Feb 6, 2015
1 parent 204389a commit 52f3fa8
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 11 deletions.
58 changes: 58 additions & 0 deletions gcloud/datastore/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def test_ctor(self):
self.assertEqual(xact.dataset_id, _DATASET)
self.assertEqual(xact.connection, connection)
self.assertEqual(xact.id, None)
self.assertEqual(xact._status, None)
self.assertTrue(xact._commit_success is False)
self.assertTrue(isinstance(xact.mutation, Mutation))
self.assertEqual(len(xact._auto_id_entities), 0)

Expand All @@ -64,6 +66,8 @@ def test_ctor_with_env(self):
self.assertEqual(xact.id, None)
self.assertEqual(xact.dataset_id, DATASET_ID)
self.assertEqual(xact.connection, CONNECTION)
self.assertEqual(xact._status, None)
self.assertTrue(xact._commit_success is False)

def test_current(self):
from gcloud.datastore.test_api import _NoCommitBatch
Expand All @@ -90,6 +94,47 @@ def test_current(self):
self.assertTrue(xact1.current() is None)
self.assertTrue(xact2.current() is None)

def test_succeeded_fresh_transaction(self):
_DATASET = 'DATASET'
connection = _Connection()
xact = self._makeOne(dataset_id=_DATASET, connection=connection)
self.assertEqual(xact._status, None)

success = marker = object()
with self.assertRaises(ValueError):
success = xact.succeeded
self.assertTrue(success is marker)

def test_succeeded_in_progress(self):
_DATASET = 'DATASET'
connection = _Connection()
xact = self._makeOne(dataset_id=_DATASET, connection=connection)
xact.begin()
self.assertEqual(xact._status, self._getTargetClass()._IN_PROGRESS)

success = marker = object()
with self.assertRaises(ValueError):
success = xact.succeeded
self.assertTrue(success is marker)

def test_succeeded_on_success(self):
_DATASET = 'DATASET'
connection = _Connection()
xact = self._makeOne(dataset_id=_DATASET, connection=connection)
xact.begin()
xact.commit()
self.assertEqual(xact._status, self._getTargetClass()._FINISHED)
self.assertTrue(xact.succeeded is True)

def test_succeeded_on_failure(self):
_DATASET = 'DATASET'
connection = _Connection()
xact = self._makeOne(dataset_id=_DATASET, connection=connection)
xact.begin()
xact.rollback()
self.assertEqual(xact._status, self._getTargetClass()._FINISHED)
self.assertTrue(xact.succeeded is False)

def test_begin(self):
_DATASET = 'DATASET'
connection = _Connection(234)
Expand All @@ -98,6 +143,19 @@ def test_begin(self):
self.assertEqual(xact.id, 234)
self.assertEqual(connection._begun, _DATASET)

def test_begin_tombstoned(self):
_DATASET = 'DATASET'
connection = _Connection(234)
xact = self._makeOne(dataset_id=_DATASET, connection=connection)
xact.begin()
self.assertEqual(xact.id, 234)
self.assertEqual(connection._begun, _DATASET)

xact.rollback()
self.assertEqual(xact.id, None)

self.assertRaises(ValueError, xact.begin)

def test_rollback(self):
_DATASET = 'DATASET'
connection = _Connection(234)
Expand Down
73 changes: 62 additions & 11 deletions gcloud/datastore/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ class Transaction(Batch):
>>> datastore.set_defaults()
>>> with Transaction() as xact:
... datastore.put(entity1)
... datastore.put(entity2)
>>> with Transaction():
... datastore.put([entity1, entity2])
Because it derives from :class:`Batch`, :class`Transaction` also provides
:meth:`put` and :meth:`delete` methods::
Expand All @@ -46,7 +45,7 @@ class Transaction(Batch):
By default, the transaction is rolled back if the transaction block
exits with an error::
>>> with Transaction() as txn:
>>> with Transaction():
... do_some_work()
... raise SomeException() # rolls back
Expand All @@ -71,16 +70,34 @@ class Transaction(Batch):
... entity = Entity(key=Key('Thing'))
... datastore.put([entity])
... assert entity.key.is_partial # There is no ID on this key.
...
>>> assert not entity.key.is_partial # There *is* an ID.
After completion, you can determine if a commit succeeded or failed.
For example, trying to delete a key that doesn't exist::
>>> with Transaction() as xact:
... xact.delete(key)
...
>>> xact.succeeded
False
or successfully storing two entities:
>>> with Transaction() as xact:
... datastore.put([entity1, entity2])
...
>>> xact.succeeded
True
If you don't want to use the context manager you can initialize a
transaction manually::
>>> transaction = Transaction()
>>> transaction.begin()
>>> entity = Entity(key=Key('Thing'))
>>> transaction.put([entity])
>>> transaction.put(entity)
>>> if error:
... transaction.rollback()
Expand All @@ -97,9 +114,17 @@ class Transaction(Batch):
are not set.
"""

_IN_PROGRESS = 1
"""Enum value for _IN_PROGRESS status of transaction."""

_FINISHED = 2
"""Enum value for _FINISHED status of transaction."""

def __init__(self, dataset_id=None, connection=None):
super(Transaction, self).__init__(dataset_id, connection)
self._id = None
self._status = None
self._commit_success = False

@property
def id(self):
Expand All @@ -123,13 +148,32 @@ def current():
if isinstance(top, Transaction):
return top

@property
def succeeded(self):
"""Determines if transaction has succeeded or failed.
:rtype: boolean
:returns: Boolean indicating successful commit.
:raises: :class:`ValueError` if the transaction is still in progress.
"""
if self._status != self._FINISHED:
raise ValueError('Transaction not yet finished. '
'Success not known.')

return self._commit_success

def begin(self):
"""Begins a transaction.
This method is called automatically when entering a with
statement, however it can be called explicitly if you don't want
to use a context manager.
:raises: :class:`ValueError` if the transaction has already begun.
"""
if self._status is not None:
raise ValueError('Transaction already started previously.')
self._status = self._IN_PROGRESS
self._id = self.connection.begin_transaction(self._dataset_id)

def rollback(self):
Expand All @@ -140,8 +184,12 @@ def rollback(self):
- Sets the current connection's transaction reference to None.
- Sets the current transaction's ID to None.
"""
self.connection.rollback(self._dataset_id, self._id)
self._id = None
try:
self.connection.rollback(self._dataset_id, self._id)
finally:
self._status = self._FINISHED
# Clear our own ID in case this gets accidentally reused.
self._id = None

def commit(self):
"""Commits the transaction.
Expand All @@ -154,7 +202,10 @@ def commit(self):
- Sets the current transaction's ID to None.
"""
super(Transaction, self).commit()

# Clear our own ID in case this gets accidentally reused.
self._id = None
try:
super(Transaction, self).commit()
finally:
self._commit_success = True
self._status = self._FINISHED
# Clear our own ID in case this gets accidentally reused.
self._id = None

0 comments on commit 52f3fa8

Please sign in to comment.