feat(python/adbc_driver_manager): experiment with using PyCapsules #702
Conversation
Small showcase:

```python
import adbc_driver_sqlite.dbapi
import pyarrow as pa

conn = adbc_driver_sqlite.dbapi.connect()
cursor = conn.cursor()

# using those private methods for now, to get a handle object
# (instead of already a pyarrow object)
cursor._prepare_execute("SELECT 1 as a, 2.0 as b, 'Hello, world!' as c")
handle, _ = cursor._stmt.execute_query()

# manually getting the capsule and passing it to pyarrow for now
capsule = handle._to_capsule()
pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()
# pyarrow.Table
# a: int64
# b: double
# c: string
# ----
# a: [[1]]
# b: [[2]]
# c: [["Hello, world!"]]

# trying to import it a second time raises an error
pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()
# ...
# ArrowInvalid: Cannot import released ArrowArrayStream

# when the capsule object gets deleted/collected, the release callback is not called
# because the stream was already consumed
del capsule

# but when the stream was not consumed, the capsule deleter will call the release callback
cursor._prepare_execute("SELECT 1 as a, 2.0 as b, 'Hello, world!' as c")
handle, _ = cursor._stmt.execute_query()
capsule = handle._to_capsule()
del capsule
# calling the release
```

Some design questions about this for the ADBC driver manager side:
The slight concern there is that you then wouldn't get any errors until you actually go to fetch data (this can be confusing; gRPC does this and it trips me up still), but this is probably OK.

Or, oh, if it's just delaying the Python-side initialization, that makes perfect sense.

Is your idea to create separate capsules for every object, or just one capsule to expose the API? We do the latter in pandas to better share functions across extensions:

Not sure of all the merits of a capsule per object versus one main capsule to define your API, but figured it was worth sharing.

Another question I have here is regarding the lifetime of the various objects: @paleolimbot mentions in apache/arrow-nanoarrow#194 that "the ADBC spec requires that the AdbcStatement must outlive the stream" (I can't directly find something about this in the spec/docs, but it certainly makes sense). I can't directly get it to fail or crash by trying to consume the capsule after the connection/cursor went out of scope with a toy example, but should we try to protect against this (instead of only documenting that the consumer should keep the object alive)? Is there already some code right now to handle this? Dewey mentioned "the Python bindings do some reference counting to ensure that you can't release (say) a connection while there's still an open statement", but that's only at the level of ADBC objects (connections, statements) and not for the resulting streams?
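A toy illustration of that lifetime question, reusing the private helpers from the showcase above (illustrative only; whether this actually crashes depends on the driver):

```python
cursor._prepare_execute("SELECT 1 as a")
handle, _ = cursor._stmt.execute_query()
capsule = handle._to_capsule()
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)

cursor.close()
conn.close()  # the statement no longer outlives the stream...
reader.read_all()  # ...so this may touch freed driver state unless something protects it
```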
It may not matter for the general case, but in this particular instance the stream has a very specific lifetime (it must be contained within that of a statement, if I understand correctly). Does calling `__arrow_c_stream__()` and exporting a capsule imply that the lifetime is entirely handled by the caller? Or is that still implementation-defined?
```cython
stream = <CArrowArrayStream*>cpython.PyCapsule_GetPointer(
    stream_capsule, 'arrowarraystream'
)
```
Is there any situation in which `stream` can be NULL? (In R this happens if somebody tries the equivalent of pickling and unpickling, but I presume that would error at the pickling stage here?)
Yes, it can be. You can see this in the CPython documentation:
https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_GetPointer

I think we just need to immediately return if that is NULL.
That is pretty tricky though. PyCapsule_GetPointer will set the global Python error, but I'm not sure how you'd know to check for that after this is executed; so this could potentially be a pitfall of segfaults or leaked pointers.
You don't need to check for the error; Cython will do it for you thanks to its `PyCapsule_GetPointer` declaration here:
https://github.com/cython/cython/blob/d73164b56544def09b65d250d72b227a38944bb1/Cython/Includes/cpython/pycapsule.pxd#L50
As for having a NULL stream pointer in a C ArrowArrayStream capsule, this should probably be disallowed by the spec.
It seems like it is not possible anyway ( https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_New ) and would only ever occur on error (perhaps if there was a capsule name mismatch). The fact that this can happen in R is a peculiarity of R's save/load...it seems unlikely in the destructor here but perhaps worth checking to avoid a crash.
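A minimal sketch of that defensive check in the capsule destructor (illustrative only, reusing the `pycapsule_stream_deleter` / `CArrowArrayStream` names from this PR; as noted above, Cython's own `except? NULL` declaration may already turn the NULL case into a Python error):

```cython
cdef void pycapsule_stream_deleter(object stream_capsule):
    cdef CArrowArrayStream* stream = <CArrowArrayStream*>cpython.PyCapsule_GetPointer(
        stream_capsule, 'arrowarraystream'
    )
    if stream == NULL:
        # invalid capsule or name mismatch: nothing we can safely release here
        return
    if stream.release != NULL:
        stream.release(stream)
```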
Yeah, that's a bit of a different kind of use case for capsules. The idea here is not to expose some module's C API, but rather the individual objects (result sets). For context, see also the descriptions in apache/arrow#34031 and apache/arrow#35531. The end goal would be to be able to do something like
@jorisvandenbossche I didn't see your comment before writing that! I (experimentally) handle that in the R bindings by returning a stream wrapper and moving the statement to a pointer at a new address (i.e., invalidating any previous R objects that had been pointing to that). There is a function
We should probably take the chance to clarify this. For here: we could wrap the stream in a new stream that delegates to the original, and keeps a strong reference to the Python object?
```cython
@@ -498,6 +511,11 @@ cdef class ArrowArrayStreamHandle:
        """The address of the ArrowArrayStream."""
        return <uintptr_t> &self.stream

    def _to_capsule(self):
        return cpython.PyCapsule_New(
            &self.stream, 'arrowarraystream', pycapsule_stream_deleter
```
This will not keep `self` alive, and therefore the capsule pointer may become unreachable.

I think the capsule needs to own a malloc'ed ArrowArrayStream pointer. @lidavidm Thoughts?
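A rough sketch of that "capsule owns a malloc'ed stream" idea (illustrative only; `export_stream_capsule` is a hypothetical helper, and the `CArrowArrayStream` typedef and `pycapsule_stream_deleter` are assumed from this module):

```cython
from libc.stdlib cimport malloc
cimport cpython

cdef object export_stream_capsule(CArrowArrayStream* source):
    # Move the source stream into heap memory owned by the capsule, so the capsule
    # no longer depends on the Python object that held the original struct.
    cdef CArrowArrayStream* stream = <CArrowArrayStream*>malloc(sizeof(CArrowArrayStream))
    stream[0] = source[0]
    source.release = NULL  # mark the original as moved
    # the matching destructor would call stream.release (if still set) and then free(stream)
    return cpython.PyCapsule_New(stream, 'arrowarraystream', pycapsule_stream_deleter)
```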
Also, once we have a standard capsule format, I'm not sure `ArrowArrayStreamHandle` is still useful.
> Also, once we have a standard capsule format, I'm not sure `ArrowArrayStreamHandle` is still useful.

Sure, it was a stopgap.
> I think the capsule needs to own a malloc'ed ArrowArrayStream pointer.

Either that, or it should keep the Python object itself alive (which may be desirable since it'll also keep associated ADBC state alive).
Keeping the Python object alive from the capsule would probably be harder?
Keep in mind any producer of the capsule format will need to reimplement its own destructor function (we can publish sample code though).
I'm not saying all producers have to, just here, in this library.
Isn't it a bug if the ADBC-produced capsule needs to keep a Python object alive, though? The release callback should take care of the lifetime of any required state.
I was thinking of just stuffing it behind the stream above, yes. #702 (comment)
I suppose then, yes, it would be a malloc'd stream, not the stream pointer here. Sorry for getting mixed up.
I wonder if you'd want the stream to take full ownership of the statement (if it was exported via
Related to the lifetime management inline discussion (#702 (comment)): I think this should be possible relatively easily using the capsule's context, something like:

```diff
# adding this to the destructor
@@ -469,9 +469,13 @@ cdef void pycapsule_stream_deleter(object stream_capsule):
     if stream.release != NULL:
         stream.release(stream)

+    parent = cpython.PyCapsule_GetContext(stream_capsule)
+    cpython.Py_DECREF(<object>parent)

# adding this where creating the capsule
@@ -512,9 +516,12 @@ cdef class ArrowArrayStreamHandle:
         return <uintptr_t> &self.stream

     def _to_capsule(self):
-        return cpython.PyCapsule_New(
+        capsule = cpython.PyCapsule_New(
             &self.stream, 'arrowarraystream', pycapsule_stream_deleter
         )
+        cpython.PyCapsule_SetContext(capsule, <void *>self)
+        cpython.Py_INCREF(self)
+        return capsule
```

However, I am not sure how much this solves (apart from not needing to malloc the stream struct). Assuming that a typical consumer of a capsule object will move the stream, they would still need to keep the capsule Python object alive for the purpose of ensuring the statement/connection outlives the stream (assuming that the Python object referenced by the capsule would keep those alive).
@lidavidm what do you mean by "wrapping" here exactly? Creating a new, actual

The former.

In case it saves anybody any typing, R's version of the "wrap + keep strong reference" approach is here: https://github.com/apache/arrow-nanoarrow/blob/main/r/src/array_stream.c#L144-L227
The last commit I pushed (7d62ef9) contains an implementation of "wrap the stream in a new stream keeping a reference to the Python object" (as far as I understood it, largely based on Dewey's example from nanoarrow-r). What it can do is avoid a segfault from trying to consume the stream outside of the connection/cursor context (or after closing the cursor and connection):

```python
import adbc_driver_sqlite.dbapi
import adbc_driver_manager

with adbc_driver_sqlite.dbapi.connect("test_db.sqlite") as conn:
    with conn.cursor() as cursor:
        # this still needs to be integrated in the DBAPI, eg as `capsule = cursor.fetch_arrow_stream()`
        cursor._prepare_execute("SELECT * FROM test_arrow")
        capsule, _ = cursor._stmt.execute_query()
        # without the next line (i.e. using the "raw" stream), this snippet segfaults
        capsule = adbc_driver_manager._lib.export_array_stream(capsule, cursor._stmt)

import pyarrow as pa
table = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()
```

Without the "export array stream" wrapper, this example segfaults (because it tries to get the schema and next batch of a stream that references a closed transaction). With the stream exported as a wrapped stream, the example raises a proper Python exception about trying to consume the stream from a closed transaction. Now, that exception only happens because I added a check for a closed transaction in the wrapped ArrowArrayStream methods (without this check, the wrapped stream doesn't seem to serve much purpose, because it is not as if it can keep the transaction "open" by adding a reference to it).
+1

Well, of course things shouldn't crash even if you try the wrong operation (barring things we can't guard against well, like using an uninitialized object).

Yes, but currently both the sqlite and postgresql drivers crash when consuming a stream from a closed transaction (I can take a look at fixing that, though). If we say it's the driver's responsibility to error for this, is there still any value in the "wrapped array stream" that keeps a reference to the Python AdbcStatement object? I assume that the original ArrowArrayStream references the C transaction/connection (or the TupleReader in the postgres driver), and there is not necessarily a need to explicitly track the Python object? Or is that still needed to also keep the C object alive? (From a quick experiment letting the postgres-produced stream raise if the statement was released, this doesn't seem needed.)

I think it's fine to just replace the current pointer with a PyCapsule for now without worrying about these problems (I'd rather we have the right type now than hold this up on fixing a bunch of other things).

I would prefer that consumers explicitly hold the connection open, but it would certainly be more convenient for the PyCapsule/stream to do this implicitly.
Perhaps I missed it, but I think the question is not whether the stream should close the connection/statement, but whether the stream should hold a strong reference to the Python object that keeps the memory for the

Joris noted earlier that you could also use the capsule's "context" ( https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_SetContext ) instead of wrapping the stream (the documentation is a bit vague, but presumably that does the same reference increment/decrement that Joris' wrapper is doing here). I don't think you want to do that, because as soon as the stream is moved to a different memory location (i.e.,
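An illustrative snippet (not from the thread) of why tying lifetime to the capsule or its context is fragile once a consumer moves the stream:

```python
import pyarrow as pa

# assuming `capsule` wraps an ArrowArrayStream, as in the snippets above
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)  # the stream is moved out
del capsule  # whatever the capsule (or its context) kept alive can be collected now...
table = reader.read_all()  # ...even though the moved stream is still being consumed
```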
```cython
cdef struct ArrowArrayStreamWrapper:
    cpython.PyObject* parent_statement
    CArrowArrayStream* parent_array_stream
```
I think you may want this struct to own the memory here rather than just a pointer (i.e., `CArrowArrayStream parent_array_stream`).
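A minimal sketch of that suggestion, applied to the struct shown above (field names assumed from the fragment):

```cython
cdef struct ArrowArrayStreamWrapper:
    cpython.PyObject* parent_statement
    # owned by value: the original stream is moved into this field, so the wrapper
    # controls its storage instead of pointing at memory owned elsewhere
    CArrowArrayStream parent_array_stream
```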
```cython
if (<AdbcStatement>data.parent_statement).closed:
    data.error_set = True
```
I am not sure that you need `error_set`: in theory, the array stream that you are wrapping should be performing that check before doing something that might crash (although I get that right now it might not be)?
I don't think this is actually possible (or desirable): the stream capsule itself cannot keep the connection/statement open, because it is the exiting from the

Based on my current understanding, you don't actually need the Python object for that, since the stream's private data holds the actual C AdbcConnection/Statement, and it's the driver (which itself is not implemented in Python) that already ensures the stream keeps that alive? (Not super familiar with the driver implementations, though, so not sure about this. See also the second paragraph in #702 (comment).)
I'm pretty sure it just holds a pointer to the

I think we can close this now?

Superseded by #1346
Experiment for using PyCapsules to communicate Arrow C Interface pointers instead of raw pointers (ints), see apache/arrow#34031