Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce driver.execute_query #833

Merged
merged 17 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 166 additions & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,152 @@ Closing a driver will immediately shut down all connections in the pool.
query, use :meth:`neo4j.Driver.verify_connectivity`.

.. autoclass:: neo4j.Driver()
:members: session, encrypted, close, verify_connectivity, get_server_info
:members: session, query_bookmark_manager, encrypted, close,
verify_connectivity, get_server_info

.. method:: execute_query(query, parameters=None,routing=neo4j.RoutingControl.WRITERS, database=None, impersonated_user=None, bookmark_manager=self.query_bookmark_manager, result_transformer=Result.to_eager_result, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a :raises: field? Or does it raise too many different exceptions?

Copy link
Member Author

@robsdedude robsdedude Oct 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are plenty of errors this can raise. Here are the one's I can think of on the spot

  • Neo4jError (or any sub-class) - anything the server complains about
  • SessionExpired, ServiceUnavailable (or any sub-class) - lost/couldn't establish connectivity (after the retries this API includes)
  • ConfigurationError TypeError ValueError (maybe even IndexError?) - invalid configuration/parameters or trying to use a feature that's too new for the server you're connected to
  • BoltError (or any sub-class; internal error class you shouldn't ever encounter) - the server violated the protocol

There might be more that I can't think of right now. My conclusion:

  • All of there errors are fatal (there is pretty much no benefit in giving them different treatment). The user might as well just catch any exception, maybe log it or whatever they feel doing, and then give up (that could mean crash, surface some error to the end-user, whatever is suitable). Plus maybe fixing their code afterwards.
  • I can hardly give any guarantees what errors this method can raise as it pretty much exercises the whole stack of the driver. It's highly likely that I'll forget some corner of the driver and, even if not, that this piece of documentation will unsync sooner or later as it's so far up the abstraction stack.

So much for why I didn't include it. If you think it'd be helpful nonetheless, we can make it happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user might as well just catch any exception

👍


Execute a query inside a retired transaction and return all results.
robsdedude marked this conversation as resolved.
Show resolved Hide resolved

This method is a handy wrapper for lower-level driver APIs like
sessions, transactions, and transaction functions. It is intended
for simple use cases where there is no need for managing all possible
options.

The method is roughly equivalent to::

def execute_query(
query, parameters, routing, database, impersonated_user,
bookmark_manager, result_transformer,
):
def work(tx):
result = tx.run(query, parameters)
return some_transformer(result)

with driver.session(
database=database,
impersonated_user=impersonated_user,
bookmark_manager=bookmark_manager,
) as session:
if routing == RoutingControl.WRITERS:
return session.execute_write(work)
elif routing == RoutingControl.READERS:
return session.execute_read(work)

Usage example::

from typing import List

import neo4j

def example(driver: neo4j.Driver) -> List[str]:
"""Get the name of all 42 year-olds."""
records, summary, keys = driver.execute_query(
"MATCH (p:Person {age: $age}) RETURN p.name",
{"age": 42},
routing=neo4j.RoutingControl.READERS, # or just "r"
database="neo4j",
)
assert keys == ["p.name"] # not needed, just for illustration
log.debug("some meta data: %s", summary)
return [str(record["p.name"]) for record in records]
# or: return [str(record[0]) for record in records]
# or even: return list(map(lambda r: str(r[0]), records))

Another example::

import neo4j

def example(driver: neo4j.Driver) -> int:
"""Call all young people "My dear" and get their count."""
record = driver.execute_query(
"MATCH (p:Person) WHERE n.age <= 15 "
"SET p.nickname = 'My dear' "
"RETURN count(*)",
routing=neo4j.RoutingControl.WRITERS, # or just "w"
database="neo4j",
result_transformer=neo4j.Result.single,
)
count = record[0]
assert isinstance(count, int)
return count

:param query: cypher query to execute
:type query: typing.Optional[str]
:param parameters: parameters to use in the query
:type parameters: typing.Optional[typing.Dict[str, typing.Any]]
:param routing:
whether to route the query to a reader (follower/read replica) or
a writer (leader) in the cluster. Default is to route to a writer.
:type routing: neo4j.RoutingControl
:param database:
database to execute the query against.

None (default) uses the database configured on the server side.

.. Note::
It is recommended to always specify the database explicitly
when possible. This allows the driver to work more efficiently,
as it will not have to resolve the default database first.

See also the Session config :ref:`database-ref`.
:type database: typing.Optional[str]
:param impersonated_user:
Name of the user to impersonate.

This means that all query will be executed in the security context
of the impersonated user. For this, the user for which the
:class:`Driver` has been created needs to have the appropriate
permissions.

See also the Session config
:type impersonated_user: typing.Optional[str]
:param result_transformer:
A function that gets passed the :class:`neo4j.Result` object
resulting from the query and converts it to a different type. The
result of the transformer function is returned by this method.

.. warning::

The transformer function must **not** return the
:class:`neo4j.Result` itself.

Example transformer that checks that exactly one record is in the
result stream, then returns the record and the result summary::

from typing import Tuple

import neo4j

def transformer(
result: neo4j.Result
) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
record = result.single(strict=True)
summary = result.consume()
return record, summary

:type result_transformer:
typing.Callable[[neo4j.Result], typing.Union[T]]
:param bookmark_manager:
Specify a bookmark manager to use.

If present, the bookmark manager is used to keep the query causally
consistent with all work executed using the same bookmark manager.

Defaults to the driver's :attr:`.query_bookmark_manager`.

Pass :const:`None` to disable causal consistency.
:type bookmark_manager:
typing.Union[neo4j.BookmarkManager, neo4j.BookmarkManager,
None]
:param kwargs: additional keyword parameters.
These take precedence over parameters passed as ``parameters``.
:type kwargs: typing.Any

:returns: the result of the ``result_transformer``
:rtype: T

.. versionadded:: 5.2
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ before merging: make sure to adjust all of these!



.. _driver-configuration-ref:
Expand Down Expand Up @@ -921,11 +1066,22 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n

.. automethod:: to_df

.. automethod:: to_eager_result

.. automethod:: closed

See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.


***********
EagerResult
***********

.. autoclass:: neo4j.EagerResult
:show-inheritance:
:members:


Graph
=====

Expand Down Expand Up @@ -1265,6 +1421,15 @@ BookmarkManager
:members:


*************************
Constants, Enums, Helpers
*************************

.. autoclass:: neo4j.RoutingControl
:show-inheritance:
:members:


.. _errors-ref:

******
Expand Down
150 changes: 149 additions & 1 deletion docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,153 @@ Closing a driver will immediately shut down all connections in the pool.
query, use :meth:`neo4j.AsyncDriver.verify_connectivity`.

.. autoclass:: neo4j.AsyncDriver()
:members: session, encrypted, close, verify_connectivity, get_server_info
:members: session, query_bookmark_manager, encrypted, close,
verify_connectivity, get_server_info

.. method:: execute_query(query, parameters=None,routing=neo4j.RoutingControl.WRITERS, database=None, impersonated_user=None, bookmark_manager=self.query_bookmark_manager, result_transformer=AsyncResult.to_eager_result, **kwargs)
:async:

Execute a query inside a retired transaction and return all results.

This method is a handy wrapper for lower-level driver APIs like
sessions, transactions, and transaction functions. It is intended
for simple use cases where there is no need for managing all possible
options.

The method is roughly equivalent to::

async def execute_query(
query, parameters, routing, database, impersonated_user,
bookmark_manager, result_transformer,
):
async def work(tx):
result = await tx.run(query, parameters)
return await some_transformer(result)

async with driver.session(
database=database,
impersonated_user=impersonated_user,
bookmark_manager=bookmark_manager,
) as session:
if routing == RoutingControl.WRITERS:
return await session.execute_write(work)
elif routing == RoutingControl.READERS:
return await session.execute_read(work)

Usage example::

from typing import List

import neo4j

async def example(driver: neo4j.AsyncDriver) -> List[str]:
"""Get the name of all 42 year-olds."""
records, summary, keys = await driver.execute_query(
"MATCH (p:Person {age: $age}) RETURN p.name",
{"age": 42},
routing=neo4j.RoutingControl.READERS, # or just "r"
database="neo4j",
)
assert keys == ["p.name"] # not needed, just for illustration
log.debug("some meta data: %s", summary)
return [str(record["p.name"]) for record in records]
# or: return [str(record[0]) for record in records]
# or even: return list(map(lambda r: str(r[0]), records))

Another example::

import neo4j

async def example(driver: neo4j.AsyncDriver) -> int:
"""Call all young people "My dear" and get their count."""
record = await driver.execute_query(
"MATCH (p:Person) WHERE n.age <= 15 "
"SET p.nickname = 'My dear' "
"RETURN count(*)",
routing=neo4j.RoutingControl.WRITERS, # or just "w"
database="neo4j",
result_transformer=neo4j.AsyncResult.single,
)
count = record[0]
assert isinstance(count, int)
return count

:param query: cypher query to execute
:type query: typing.Optional[str]
:param parameters: parameters to use in the query
:type parameters: typing.Optional[typing.Dict[str, typing.Any]]
:param routing:
whether to route the query to a reader (follower/read replica) or
a writer (leader) in the cluster. Default is to route to a writer.
:type routing: neo4j.RoutingControl
:param database:
database to execute the query against.

None (default) uses the database configured on the server side.

.. Note::
It is recommended to always specify the database explicitly
when possible. This allows the driver to work more efficiently,
as it will not have to resolve the default database first.

See also the Session config :ref:`database-ref`.
:type database: typing.Optional[str]
:param impersonated_user:
Name of the user to impersonate.

This means that all query will be executed in the security context
of the impersonated user. For this, the user for which the
:class:`Driver` has been created needs to have the appropriate
permissions.

See also the Session config
:type impersonated_user: typing.Optional[str]
:param result_transformer:
A function that gets passed the :class:`neo4j.AsyncResult` object
resulting from the query and converts it to a different type. The
result of the transformer function is returned by this method.

.. warning::

The transformer function must **not** return the
:class:`neo4j.AsyncResult` itself.

Example transformer that checks that exactly one record is in the
result stream, then returns the record and the result summary::

from typing import Tuple

import neo4j

async def transformer(
result: neo4j.AsyncResult
) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
record = await result.single(strict=True)
summary = await result.consume()
return record, summary

:type result_transformer:
typing.Callable[[neo4j.AsyncResult], typing.Awaitable[T]]
:param bookmark_manager:
Specify a bookmark manager to use.

If present, the bookmark manager is used to keep the query causally
consistent with all work executed using the same bookmark manager.

Defaults to the driver's :attr:`.query_bookmark_manager`.

Pass :const:`None` to disable causal consistency.
:type bookmark_manager:
typing.Union[neo4j.AsyncBookmarkManager, neo4j.BookmarkManager,
None]
:param kwargs: additional keyword parameters.
These take precedence over parameters passed as ``parameters``.
:type kwargs: typing.Any

:returns: the result of the ``result_transformer``
:rtype: T

.. versionadded:: 5.2


.. _async-driver-configuration-ref:
Expand Down Expand Up @@ -593,6 +739,8 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla

.. automethod:: to_df

.. automethod:: to_eager_result

.. automethod:: closed

See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
Expand Down
4 changes: 4 additions & 0 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from logging import getLogger as _getLogger

from ._api import RoutingControl
from ._async.driver import (
AsyncBoltDriver,
AsyncDriver,
Expand Down Expand Up @@ -57,6 +58,7 @@
Session,
Transaction,
)
from ._work import EagerResult
from .addressing import (
Address,
IPv4Address,
Expand Down Expand Up @@ -112,6 +114,7 @@
"custom_auth",
"DEFAULT_DATABASE",
"Driver",
"EagerResult",
"ExperimentalWarning",
"get_user_agent",
"GraphDatabase",
Expand All @@ -127,6 +130,7 @@
"Record",
"Result",
"ResultSummary",
"RoutingControl",
"ServerInfo",
"Session",
"SessionConfig",
Expand Down
Loading