Skip to content

Commit

Permalink
Introduce (Async)ManagedTransaction (#658)
Browse files Browse the repository at this point in the history
Transaction functions (a.k.a. managed transactions):
The first argument of transaction functions is now a `(Async)ManagedTransaction`
object. It behaves exactly like a regular `(Async)Transaction` object, except it
does not offer the `commit`, `rollback`, `close`, and `closed` methods.
Those methods would have caused a hard to interpreted error previously. Hence,
they have been removed.
  • Loading branch information
robsdedude authored Feb 11, 2022
1 parent 06f0767 commit e74c6f1
Show file tree
Hide file tree
Showing 52 changed files with 898 additions and 709 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
They are now ignored and will be removed in a future release.
- The undocumented return value has been removed. If you need information
about the remote server, use `driver.get_server_info()` instead.
- Transaction functions (a.k.a. managed transactions):
The first argument of transaction functions is now a `ManagedTransaction`
object. It behaves exactly like a regular `Transaction` object, except it
does not offer the `commit`, `rollback`, `close`, and `closed` methods.
Those methods would have caused a hard to interpreted error previously. Hence,
they have been removed.


## Version 4.4
Expand Down
1 change: 1 addition & 0 deletions bin/make-unasync
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def apply_unasync(files):
"_async": "_sync",
"mark_async_test": "mark_sync_test",
"assert_awaited_once": "assert_called_once",
"assert_awaited_once_with": "assert_called_once_with",
}
additional_testkit_backend_replacements = {}
rules = [
Expand Down
17 changes: 12 additions & 5 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -422,16 +422,18 @@ Will result in:
***********************
Sessions & Transactions
***********************
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.Session` and the :class:`neo4j.Transaction`.
All database activity is co-ordinated through two mechanisms:
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
(:class:`neo4j.Transaction`, :class:`neo4j.ManagedTransaction`).

A :class:`neo4j.Session` is a logical container for any number of causally-related transactional units of work.
A **session** is a logical container for any number of causally-related transactional units of work.
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
Sessions provide the top level of containment for database activity.
Session creation is a lightweight operation and *sessions are not thread safe*.

Connections are drawn from the :class:`neo4j.Driver` connection pool as required.

A :class:`neo4j.Transaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.


.. _session-construction-ref:
Expand Down Expand Up @@ -724,7 +726,6 @@ Example:
node_id = create_person_node(tx)
set_person_name(tx, node_id, name)
tx.commit()
tx.close()
def create_person_node(tx):
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
Expand Down Expand Up @@ -753,6 +754,12 @@ This function is called one or more times, within a configurable time limit, unt
Results should be fully consumed within the function and only aggregate or status values should be returned.
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a :class:`neo4j.ManagedTransaction` object as its first parameter.

.. autoclass:: neo4j.ManagedTransaction

.. automethod:: run

Example:

.. code-block:: python
Expand Down Expand Up @@ -811,7 +818,7 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.


Graph
Expand Down
19 changes: 13 additions & 6 deletions docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,18 @@ Will result in:
*********************************
AsyncSessions & AsyncTransactions
*********************************
All database activity is co-ordinated through two mechanisms: the :class:`neo4j.AsyncSession` and the :class:`neo4j.AsyncTransaction`.
All database activity is co-ordinated through two mechanisms:
**sessions** (:class:`neo4j.AsyncSession`) and **transactions**
(:class:`neo4j.AsyncTransaction`, :class:`neo4j.AsyncManagedTransaction`).

A :class:`neo4j.AsyncSession` is a logical container for any number of causally-related transactional units of work.
A **session** is a logical container for any number of causally-related transactional units of work.
Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required.
Sessions provide the top level of containment for database activity.
Session creation is a lightweight operation and *sessions cannot be shared between coroutines*.
Session creation is a lightweight operation and *sessions are not thread safe*.

Connections are drawn from the :class:`neo4j.AsyncDriver` connection pool as required.

A :class:`neo4j.AsyncTransaction` is a unit of work that is either committed in its entirety or is rolled back on failure.
A **transaction** is a unit of work that is either committed in its entirety or is rolled back on failure.


.. _async-session-construction-ref:
Expand Down Expand Up @@ -417,7 +419,6 @@ Example:
node_id = await create_person_node(tx)
await set_person_name(tx, node_id, name)
await tx.commit()
await tx.close()
async def create_person_node(tx):
query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
Expand Down Expand Up @@ -447,6 +448,12 @@ This function is called one or more times, within a configurable time limit, unt
Results should be fully consumed within the function and only aggregate or status values should be returned.
Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a :class:`neo4j.AsyncManagedTransaction` object as its first parameter.

.. autoclass:: neo4j.AsyncManagedTransaction

.. automethod:: run

Example:

.. code-block:: python
Expand Down Expand Up @@ -505,4 +512,4 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla

.. automethod:: closed

See https://neo4j.com/docs/driver-manual/current/cypher-workflow/#driver-type-mapping for more about type mapping.
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
4 changes: 4 additions & 0 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"AsyncDriver",
"AsyncGraphDatabase",
"AsyncNeo4jDriver",
"AsyncManagedTransaction",
"AsyncResult",
"AsyncSession",
"AsyncTransaction",
Expand All @@ -42,6 +43,7 @@
"IPv4Address",
"IPv6Address",
"kerberos_auth",
"ManagedTransaction",
"Neo4jDriver",
"PoolConfig",
"Query",
Expand Down Expand Up @@ -72,6 +74,7 @@
AsyncNeo4jDriver,
)
from ._async.work import (
AsyncManagedTransaction,
AsyncResult,
AsyncSession,
AsyncTransaction,
Expand All @@ -83,6 +86,7 @@
Neo4jDriver,
)
from ._sync.work import (
ManagedTransaction,
Result,
Session,
Transaction,
Expand Down
6 changes: 5 additions & 1 deletion neo4j/_async/work/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
from .session import (
AsyncResult,
AsyncSession,
AsyncTransaction,
AsyncWorkspace,
)
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
)


__all__ = [
"AsyncResult",
"AsyncSession",
"AsyncTransaction",
"AsyncManagedTransaction",
"AsyncWorkspace",
]
23 changes: 14 additions & 9 deletions neo4j/_async/work/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# limitations under the License.


import asyncio
from logging import getLogger
from random import random
from time import perf_counter
Expand Down Expand Up @@ -44,7 +43,10 @@
)
from ...work import Query
from .result import AsyncResult
from .transaction import AsyncTransaction
from .transaction import (
AsyncManagedTransaction,
AsyncTransaction,
)
from .workspace import AsyncWorkspace


Expand Down Expand Up @@ -157,8 +159,9 @@ async def close(self):
self._state_failed = True

if self._transaction:
if self._transaction.closed() is False:
await self._transaction.rollback() # roll back the transaction if it is not closed
if self._transaction._closed() is False:
# roll back the transaction if it is not closed
await self._transaction._rollback()
self._transaction = None

try:
Expand Down Expand Up @@ -306,7 +309,7 @@ async def last_bookmarks(self):
if self._auto_result:
await self._auto_result.consume()

if self._transaction and self._transaction._closed:
if self._transaction and self._transaction._closed():
self._collect_bookmark(self._transaction._bookmark)
self._transaction = None

Expand All @@ -323,10 +326,10 @@ async def _transaction_error_handler(self, _):
self._transaction = None
await self._disconnect()

async def _open_transaction(self, *, access_mode, metadata=None,
async def _open_transaction(self, *, tx_cls, access_mode, metadata=None,
timeout=None):
await self._connect(access_mode=access_mode)
self._transaction = AsyncTransaction(
self._transaction = tx_cls(
self._connection, self._config.fetch_size,
self._transaction_closed_handler,
self._transaction_error_handler
Expand Down Expand Up @@ -372,6 +375,7 @@ async def begin_transaction(self, metadata=None, timeout=None):
raise TransactionError("Explicit transaction already open")

await self._open_transaction(
tx_cls=AsyncTransaction,
access_mode=self._config.default_access_mode, metadata=metadata,
timeout=timeout
)
Expand All @@ -396,17 +400,18 @@ async def _run_transaction(
while True:
try:
await self._open_transaction(
tx_cls=AsyncManagedTransaction,
access_mode=access_mode, metadata=metadata,
timeout=timeout
)
tx = self._transaction
try:
result = await transaction_function(tx, *args, **kwargs)
except Exception:
await tx.close()
await tx._close()
raise
else:
await tx.commit()
await tx._commit()
except IncompleteCommit:
raise
except (ServiceUnavailable, SessionExpired) as error:
Expand Down
Loading

0 comments on commit e74c6f1

Please sign in to comment.