Skip to content

Commit

Permalink
Extend Result's API
Browse files Browse the repository at this point in the history
 * Introduce `Result.fetch(n)`
 * Revert `Result.single()` to be lenient again when not exactly one record is
   left in the stream.
   Partially reverts neo4j#646
 * Add `strict` parameter to `Result.single()` to enable strict checking of the
   number of records in the stream.
  • Loading branch information
robsdedude committed Mar 8, 2022
1 parent 2af7588 commit 5c70481
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 51 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
- Creation of a driver with `bolt[+s[sc]]://` scheme has been deprecated and
will raise an error in the Future. The routing context was and will be
silently ignored until then.
- `Result.single` now raises `ResultNotSingleError` if not exactly one result is
available.
- Bookmarks
- `Session.last_bookmark` was deprecated. Its behaviour is partially incorrect
and cannot be fixed without breaking its signature.
Expand Down
5 changes: 5 additions & 0 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ A :class:`neo4j.Result` is attached to an active connection, through a :class:`n

.. automethod:: single

.. automethod:: fetch

.. automethod:: peek

.. automethod:: graph
Expand Down Expand Up @@ -1368,6 +1370,9 @@ Connectivity Errors
.. autoclass:: neo4j.exceptions.ResultConsumedError
:show-inheritance:
.. autoclass:: neo4j.exceptions.ResultNotSingleError
:show-inheritance:
Internal Driver Errors
Expand Down
2 changes: 2 additions & 0 deletions docs/source/async_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ A :class:`neo4j.AsyncResult` is attached to an active connection, through a :cla

.. automethod:: single

.. automethod:: fetch

.. automethod:: peek

.. automethod:: graph
Expand Down
92 changes: 72 additions & 20 deletions neo4j/_async/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


from collections import deque
from warnings import warn

from ..._async_compat.util import AsyncUtil
from ...data import DataDehydrator
Expand Down Expand Up @@ -248,11 +249,11 @@ async def _buffer(self, n=None):
record_buffer.append(record)
if n is not None and len(record_buffer) >= n:
break
self._exhausted = False
if n is None:
self._record_buffer = record_buffer
else:
self._record_buffer.extend(record_buffer)
self._exhausted = not self._record_buffer

async def _buffer_all(self):
"""Sets the Result object in an detached state by fetching all records
Expand Down Expand Up @@ -286,12 +287,20 @@ def keys(self):
"""
return self._keys

async def _exhaust(self):
# Exhaust the result, ditching all remaining records.
if not self._exhausted:
self._discarding = True
self._record_buffer.clear()
async for _ in self:
pass

async def _tx_end(self):
# Handle closure of the associated transaction.
#
# This will consume the result and mark it at out of scope.
# Subsequent calls to `next` will raise a ResultConsumedError.
await self.consume()
await self._exhaust()
self._out_of_scope = True

async def consume(self):
Expand Down Expand Up @@ -330,42 +339,85 @@ async def get_two_tx(tx):
:returns: The :class:`neo4j.ResultSummary` for this result
"""
if self._exhausted is False:
self._discarding = True
async for _ in self:
pass
if self._exhausted:
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
else:
await self._exhaust()

summary = self._obtain_summary()
self._consumed = True
return summary

async def single(self):
"""Obtain the next and only remaining record from this result if available else return None.
async def single(self, strict=False):
"""Obtain the next and only remaining record or None.
Calling this method always exhausts the result.
A warning is generated if more than one record is available but
the first of these is still returned.
:returns: the next :class:`neo4j.AsyncRecord`.
:param strict:
If :const:`True`, raise a :class:`neo4j.ResultNotSingleError`
instead of returning None if there is more than one record or
warning if there are more than 1 record.
:const:`False` by default.
:type strict: bool
:raises ResultNotSingleError: if not exactly one record is available.
:raises ResultConsumedError: if the transaction from which this result was
obtained has been closed.
:returns: the next :class:`neo4j.Record` or :const:`None` if none remain
:warns: if more than one record is available
:raises ResultNotSingleError:
If ``strict=True`` and not exactly one record is available.
:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed.
.. versionchanged:: 5.0
Added ``strict`` parameter.
"""
await self._buffer(2)
if not self._record_buffer:
buffer = self._record_buffer
self._record_buffer = deque()
await self._exhaust()
if not buffer:
if not strict:
return None
raise ResultNotSingleError(
self,
"No records found. "
"Make sure your query returns exactly one record."
)
elif len(self._record_buffer) > 1:
raise ResultNotSingleError(
self,
"More than one record found. "
"Make sure your query returns exactly one record."
)
return self._record_buffer.popleft()
elif len(buffer) > 1:
res = buffer.popleft()
if not strict:
warn("Expected a result with a single record, "
"but found multiple.")
return res
else:
raise ResultNotSingleError(
self,
"More than one record found. "
"Make sure your query returns exactly one record."
)
return buffer.popleft()

async def fetch(self, n):
"""Obtain up to n records from this result.
:param n: the maximum number of records to fetch.
:type n: int
:returns: list of :class:`neo4j.AsyncRecord`
.. versionadded:: 5.0
"""
await self._buffer(n)
return [
self._record_buffer.popleft()
for _ in range(min(n, len(self._record_buffer)))
]

async def peek(self):
"""Obtain the next record from this result without consuming it.
Expand Down
92 changes: 72 additions & 20 deletions neo4j/_sync/work/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@


from collections import deque
from warnings import warn

from ..._async_compat.util import Util
from ...data import DataDehydrator
Expand Down Expand Up @@ -248,11 +249,11 @@ def _buffer(self, n=None):
record_buffer.append(record)
if n is not None and len(record_buffer) >= n:
break
self._exhausted = False
if n is None:
self._record_buffer = record_buffer
else:
self._record_buffer.extend(record_buffer)
self._exhausted = not self._record_buffer

def _buffer_all(self):
"""Sets the Result object in an detached state by fetching all records
Expand Down Expand Up @@ -286,12 +287,20 @@ def keys(self):
"""
return self._keys

def _exhaust(self):
# Exhaust the result, ditching all remaining records.
if not self._exhausted:
self._discarding = True
self._record_buffer.clear()
for _ in self:
pass

def _tx_end(self):
# Handle closure of the associated transaction.
#
# This will consume the result and mark it at out of scope.
# Subsequent calls to `next` will raise a ResultConsumedError.
self.consume()
self._exhaust()
self._out_of_scope = True

def consume(self):
Expand Down Expand Up @@ -330,42 +339,85 @@ def get_two_tx(tx):
:returns: The :class:`neo4j.ResultSummary` for this result
"""
if self._exhausted is False:
self._discarding = True
for _ in self:
pass
if self._exhausted:
if self._out_of_scope:
raise ResultConsumedError(self, _RESULT_OUT_OF_SCOPE_ERROR)
if self._consumed:
raise ResultConsumedError(self, _RESULT_CONSUMED_ERROR)
else:
self._exhaust()

summary = self._obtain_summary()
self._consumed = True
return summary

def single(self):
"""Obtain the next and only remaining record from this result if available else return None.
def single(self, strict=False):
"""Obtain the next and only remaining record or None.
Calling this method always exhausts the result.
A warning is generated if more than one record is available but
the first of these is still returned.
:returns: the next :class:`neo4j.Record`.
:param strict:
If :const:`True`, raise a :class:`neo4j.ResultNotSingleError`
instead of returning None if there is more than one record or
warning if there are more than 1 record.
:const:`False` by default.
:type strict: bool
:raises ResultNotSingleError: if not exactly one record is available.
:raises ResultConsumedError: if the transaction from which this result was
obtained has been closed.
:returns: the next :class:`neo4j.Record` or :const:`None` if none remain
:warns: if more than one record is available
:raises ResultNotSingleError:
If ``strict=True`` and not exactly one record is available.
:raises ResultConsumedError: if the transaction from which this result
was obtained has been closed.
.. versionchanged:: 5.0
Added ``strict`` parameter.
"""
self._buffer(2)
if not self._record_buffer:
buffer = self._record_buffer
self._record_buffer = deque()
self._exhaust()
if not buffer:
if not strict:
return None
raise ResultNotSingleError(
self,
"No records found. "
"Make sure your query returns exactly one record."
)
elif len(self._record_buffer) > 1:
raise ResultNotSingleError(
self,
"More than one record found. "
"Make sure your query returns exactly one record."
)
return self._record_buffer.popleft()
elif len(buffer) > 1:
res = buffer.popleft()
if not strict:
warn("Expected a result with a single record, "
"but found multiple.")
return res
else:
raise ResultNotSingleError(
self,
"More than one record found. "
"Make sure your query returns exactly one record."
)
return buffer.popleft()

def fetch(self, n):
"""Obtain up to n records from this result.
:param n: the maximum number of records to fetch.
:type n: int
:returns: list of :class:`neo4j.Record`
.. versionadded:: 5.0
"""
self._buffer(n)
return [
self._record_buffer.popleft()
for _ in range(min(n, len(self._record_buffer)))
]

def peek(self):
"""Obtain the next record from this result without consuming it.
Expand Down
2 changes: 1 addition & 1 deletion neo4j/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class ResultConsumedError(ResultError):


class ResultNotSingleError(ResultError):
"""Raised when result.single() detects not exactly one record in result."""
"""Raised when a result should have exactly one record but does not."""


class ServiceUnavailable(DriverError):
Expand Down
2 changes: 1 addition & 1 deletion testkitbackend/_async/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ async def ResultNext(backend, data):
async def ResultSingle(backend, data):
result = backend.results[data["resultId"]]
await backend.send_response("Record", totestkit.record(
await result.single()
await result.single(strict=True)
))


Expand Down
2 changes: 1 addition & 1 deletion testkitbackend/_sync/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def ResultNext(backend, data):
def ResultSingle(backend, data):
result = backend.results[data["resultId"]]
backend.send_response("Record", totestkit.record(
result.single()
result.single(strict=True)
))


Expand Down
Loading

0 comments on commit 5c70481

Please sign in to comment.