
feat(python/adbc_driver_manager): experiment with using PyCapsules #702

Conversation

jorisvandenbossche
Member

Experiment for using PyCapsules to communicate Arrow C Interface pointers instead of raw pointers (ints), see apache/arrow#34031

@jorisvandenbossche marked this pull request as draft May 24, 2023 13:20
@jorisvandenbossche
Member Author

Small showcase:

import pyarrow as pa

import adbc_driver_sqlite.dbapi

conn = adbc_driver_sqlite.dbapi.connect()
cursor = conn.cursor()

# using these private methods for now, to get a handle object
# (instead of a pyarrow object directly)
cursor._prepare_execute("SELECT 1 as a, 2.0 as b, 'Hello, world!' as c")
handle, _ = cursor._stmt.execute_query()

# manually getting the capsule and passing it to pyarrow for now
capsule = handle._to_capsule()
pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()
# pyarrow.Table
# a: int64
# b: double
# c: string
# ----
# a: [[1]]
# b: [[2]]
# c: [["Hello, world!"]]

# trying to import it a second time raises an error
pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()
# ...
# ArrowInvalid: Cannot import released ArrowArrayStream

# when the capsule object gets deleted/collected -> release callback is not called
# because it was already consumed
del capsule

# but when the stream was not consumed, the capsule deleter will call the release callback
cursor._prepare_execute("SELECT 1 as a, 2.0 as b, 'Hello, world!' as c")
handle, _ = cursor._stmt.execute_query()
capsule = handle._to_capsule()
del capsule
# this calls the release callback

Some design questions about this for the ADBC driver manager side:

  • Currently the DBAPI methods like execute(..) already initialize the pyarrow RecordBatchReader. We might want to delay that creation until fetch_arrow_table() actually gets called? (or one of the other fetch variants that end up consuming the RecordBatchReader as well)
    We could then, for example, have a fetch_arrow_stream() method that gives you some custom object with the appropriate protocol method like __arrow_c_stream__ (instead of the current dummy _to_capsule()); a rough sketch of such an object is below.
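Purely illustrative: the class name and its integration as fetch_arrow_stream() are hypothetical, and only the handle's _to_capsule() from this PR is assumed to exist:

class ArrowStreamResult:  # hypothetical name, not part of this PR
    """Wraps the statement's stream handle and exposes the capsule protocol."""

    def __init__(self, handle):
        # `handle` is the ArrowArrayStreamHandle returned by execute_query()
        self._handle = handle

    def __arrow_c_stream__(self, requested_schema=None):
        # hand out the 'arrowarraystream' PyCapsule to whatever consumes the protocol
        return self._handle._to_capsule()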

@lidavidm
Member

Currently the DBAPI methods like execute(..) already initialize the pyarrow RecordBatchReader. We might want to delay that creation until fetch_arrow_table() actually gets called? (or one of the other fetch variants that end up consuming the RecordBatchReader as well)

The slight concern there is that then you wouldn't get any errors until you actually go to fetch data (this can be confusing; gRPC does this and it still trips me up), but this is probably OK.

@lidavidm
Copy link
Member

Or, oh, if it's just delaying the Python-side initialization that makes perfect sense.

@WillAyd
Contributor

WillAyd commented May 24, 2023

Is your idea to create separate capsules for every object or just one capsule to expose the API? We do the latter in pandas to better share functions across extensions:

https://github.com/pandas-dev/pandas/blob/5a0230ba3fd306b02e74ade437c2f4a0089403da/pandas/_libs/pd_parser.c#L137

Not sure of all the merits of a capsule per object versus one main capsule to define your API, but figured it was worth sharing.

@jorisvandenbossche
Member Author

Another question I have here is regarding the lifetime of the various objects: @paleolimbot mentions in apache/arrow-nanoarrow#194 that "the ADBC spec requires that the AdbcStatement must outlive the stream" (although I can't directly find anything about this in the spec/docs, it certainly makes sense).

I can't directly get it to fail or crash by trying to consume the capsule after the connection/cursor went out of scope with a toy example, but should we try to protect against this (instead of only documenting that the consumer should keep the object alive)? A toy version of what I mean is shown below.
It seems that a PyCapsule has a "context" that can be set (https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_SetContext), which could maybe be used for this.

Is there already some code right now to handle this? Dewey mentioned that "the Python bindings do some reference counting to ensure that you can't release (say) a connection while there's still an open statement", but that's only at the level of ADBC objects (connections, statements) and not for the resulting streams?
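To make that concrete, a toy version of what I mean, based on the showcase above (whether importing the capsule at the end is safe is exactly the question):

import pyarrow as pa
import adbc_driver_sqlite.dbapi

with adbc_driver_sqlite.dbapi.connect() as conn:
    with conn.cursor() as cursor:
        cursor._prepare_execute("SELECT 1 as a")
        handle, _ = cursor._stmt.execute_query()
        capsule = handle._to_capsule()

# the statement and connection have been closed on exiting the with blocks;
# nothing currently stops a consumer from importing the capsule at this point
reader = pa.RecordBatchReader._import_from_c_capsule(capsule)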

@paleolimbot
Member

It may not matter for the general case, but in this particular instance the stream has a very specific lifetime (must be contained within that of a statement, if I understand correctly). Does calling __arrow_c_stream__() and exporting a capsule imply that the lifetime is entirely handled by the caller? Or is that still implementation-defined?

Comment on lines +468 to +470
stream = <CArrowArrayStream*>cpython.PyCapsule_GetPointer(
    stream_capsule, 'arrowarraystream'
)
Member

Is there any situation in which stream can be NULL? (In R this happens if somebody tries the equivalent of pickling and unpickling, but I presume that would error at the pickling stage here?)

Contributor

Yes it can be. You can see this in the CPython documentation:

https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_GetPointer

I think we just need to return immediately if that is NULL.

Contributor

That is pretty tricky though. PyCapsule_GetPointer will set the global Python error, but I'm not sure how you'd know to check for that after this is executed; so this could potentially be a pitfall of segfaults or leaked pointers.

Member

You don't need to check for error, Cython will do it for you thanks to its PyCapsule_GetPointer declaration here:
https://github.com/cython/cython/blob/d73164b56544def09b65d250d72b227a38944bb1/Cython/Includes/cpython/pycapsule.pxd#L50

Member

As for having a NULL stream pointer in a C ArrowArrayStream capsule, this should probably be disallowed by the spec.

Member

It seems like it is not possible anyway (https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_New) and would only ever occur on error (perhaps if there was a capsule name mismatch). The fact that this can happen in R is a peculiarity of R's save/load... it seems unlikely in the destructor here, but perhaps worth checking to avoid a crash.

@jorisvandenbossche
Member Author

Is your idea to create separate capsules for every object or just one capsule to expose the API? We do the latter in pandas to better share functions across extensions:

Yeah, that's a bit of a different use case for capsules. The idea here is not to expose some module's C API, but rather the individual objects (result sets). For context, see also the descriptions in apache/arrow#34031 and apache/arrow#35531.

The end goal would be to be able to do something like cursor.fetch_arrow_stream(), which returns an object with an __arrow_c_stream__ method returning a capsule that wraps a C stream struct pointer, so it can be consumed by any library that understands the C interface (duckdb, datafusion, etc.) without relying on pyarrow. A hypothetical usage sketch is below.
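Hypothetical usage once something like that exists (fetch_arrow_stream() is not implemented in this PR, and pyarrow below is just a stand-in for any consumer of the protocol):

import pyarrow as pa

# hypothetical API, does not exist yet in this PR
stream_obj = cursor.fetch_arrow_stream()

# a consumer only needs the protocol method, nothing ADBC-specific
capsule = stream_obj.__arrow_c_stream__()
table = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()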

@paleolimbot
Member

@jorisvandenbossche I didn't see your comment before writing that!

I (experimentally) handle that in the R bindings by returning a stream wrapper and moving the statement to a pointer at a new address (i.e., invalidating any previous R objects that had been pointing to that). There is a function nanoarrow_pointer_export() that very specifically exports a schema/array/stream with a self-contained lifecycle (such that it can be passed anywhere safely). Before that happens, dependencies are managed via references (presumably that's what the capsule's Context is doing but that doesn't seem to be documented?).

@lidavidm
Member

although I can't directly find anything about this in the spec/docs, it certainly makes sense

we should probably take the chance to clarify this

For here: we could wrap the stream in a new stream that delegates to the original, and keeps a strong reference to the Python object?

@@ -498,6 +511,11 @@ cdef class ArrowArrayStreamHandle:
        """The address of the ArrowArrayStream."""
        return <uintptr_t> &self.stream

    def _to_capsule(self):
        return cpython.PyCapsule_New(
            &self.stream, 'arrowarraystream', pycapsule_stream_deleter
Member

This will not keep self alive, and therefore the pointer stored in the capsule may become invalid.

I think the capsule needs to own a malloc'ed ArrowArrayStream pointer. @lidavidm Thoughts?

Member

Also, once we have a standard capsule format, I'm not sure ArrowArrayStreamHandle is still useful.

Member

Also, once we have a standard capsule format, I'm not sure ArrowArrayStreamHandle is still useful.

Sure, it was a stopgap.

I think the capsule needs to own a malloc'ed ArrowArrayStream pointer.

Either that or it should keep the Python object itself alive (which may be desirable since it'll also keep associated ADBC state alive)

@pitrou
Member
May 24, 2023

Keeping the Python object alive from the capsule would probably be harder?

Keep in mind any producer of the capsule format will need to reimplement its own destructor function (we can publish sample code though).

Member

I'm not saying all producers have to, just here, in this library.

Member

Isn't it a bug if the ADBC-produced capsule needs to keep a Python object alive, though? The release callback should take care of the lifetime of any required state.

Member

I was thinking of just stuffing it behind the stream above, yes. #702 (comment)

Member

I suppose then, yes, it would be a malloc'd stream, not the stream pointer here. Sorry for getting mixed up.

@paleolimbot
Member

For here: we could wrap the stream in a new stream that delegates to the original, and keeps a strong reference to the Python object?

I wonder if you'd want the stream to take full ownership of the statement (if it was exported via __arrow_c_stream__()). Leaving the destruction of the statement up to the garbage collector can be sort of a pain (I've gotten a lot of "connection busy" errors while trying to get the lifecycles right in R).

@jorisvandenbossche
Member Author

Related to the lifetime management inline discussion (#702 (comment)):

Keeping the Python object alive from the capsule would probably be harder?

I think this should be possible relatively easily using the capsule's context, something like:

# adding this to the destructor
@@ -469,9 +469,13 @@ cdef void pycapsule_stream_deleter(object stream_capsule):
 
     if stream.release != NULL:
         stream.release(stream)

+    parent = cpython.PyCapsule_GetContext(stream_capsule)
+    cpython.Py_DECREF(<object>parent)

 
# adding this where creating the capsule
@@ -512,9 +516,12 @@ cdef class ArrowArrayStreamHandle:
         return <uintptr_t> &self.stream
 
     def _to_capsule(self):
-        return cpython.PyCapsule_New(
+        capsule = cpython.PyCapsule_New(
             &self.stream, 'arrowarraystream', pycapsule_stream_deleter
         )
+        cpython.PyCapsule_SetContext(capsule, <void *>self)
+        cpython.Py_INCREF(self)
+        return capsule

However, I am not sure how much this solves (apart from not needing to malloc the stream struct). Assuming that a typical consumer of a capsule object will move the stream, they would still need to keep the capsule Python object alive to ensure the statement/connection outlives the stream (assuming that the Python object referenced by the capsule would keep those alive).
So either way, the consumer needs to keep a reference to some Python object (either the capsule, or the connection/statement whose method returned the capsule).

For here: we could wrap the stream in a new stream that delegates to the original, and keeps a strong reference to the Python object?

@lidavidm what do you mean by "wrapping" here exactly? Creating a new, actual ArrowArrayStream* struct (which would keep a strong reference to the Python object in its private_data?), or creating a Python wrapper object (although that would be similar to the existing ArrowArrayStreamHandle, so probably not this)?

@lidavidm
Member

The former.

@paleolimbot
Member

In case it saves anybody any typing, R's version of the "wrap + keep strong reference" is here: https://github.com/apache/arrow-nanoarrow/blob/main/r/src/array_stream.c#L144-L227 .

@jorisvandenbossche
Member Author

jorisvandenbossche commented Jun 8, 2023

The last commit I pushed (7d62ef9) contains an implementation of "wrap the stream in a new stream keeping a reference to the Python object" (largely based on Dewey's example from nanoarrow-r, as far as I understood it).

What it can do is avoid a segfault when trying to consume the stream outside of the connection/cursor context (or after closing the cursor and connection):

import adbc_driver_sqlite.dbapi
import adbc_driver_manager

with adbc_driver_sqlite.dbapi.connect("test_db.sqlite") as conn:
    with conn.cursor() as cursor:
        # this still needs to be integrated in the DBAPI, eg as `capsule = cursor.fetch_arrow_stream()`
        cursor._prepare_execute("SELECT * FROM test_arrow")
        capsule, _ = cursor._stmt.execute_query()
        # without the next line (i.e. using the "raw" stream), this snippet segfaults
        capsule = adbc_driver_manager._lib.export_array_stream(capsule, cursor._stmt)

import pyarrow as pa
table = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all()

Without the "export array stream" with the wrapper, this example segfaults (because it tries to get the schema and next batch of a stream that references a closed transaction). With exporting the stream as a wrapped stream, the example raises a proper python exception about trying to consume the stream from a closed transaction.

Now, that exception only happens because I added a check for a closed transaction in the wrapped ArrowArrayStream methods (without this check, the wrapped stream doesn't seem to serve much purpose, since it cannot keep the transaction "open" just by holding a reference to it).
But I do wonder whether that is not something the actual driver implementations could also do in their get_schema/get_next (I don't know the drivers' implementations well enough, though), so I don't have to do it here in the wrapper. The wrapper implementation I added now adds quite a bit of complexity (although it is nice that we can keep the Python snippet from segfaulting), so I'm wondering if we can't just say it is the driver's responsibility to return an error in case of a closed transaction.

@pitrou
Member

pitrou commented Jun 8, 2023

wondering if we can't just say it is the driver's responsibility to return an error in case of a closed transaction.

+1

@lidavidm
Member

lidavidm commented Jun 8, 2023

Well, of course things shouldn't crash even if you try the wrong operation (barring things we can't guard against well like using an uninitialized object)

@jorisvandenbossche
Member Author

Yes, but currently both the sqlite and postgresql drivers crash when consuming a stream from a closed transaction (I can take a look at fixing that, though).

If we say it's the driver's responsibility to error for this, is there still any value in the "wrapped array stream" that keeps a reference to the Python AdbcStatement object? I assume that the original ArrowArrayStream references the C transaction/connection (or the TupleReader in the postgres driver), and there is not necessarily a need to explicitly track the Python object? Or is that still needed to also keep the C object alive? (From a quick experiment letting the postgres-produced stream raise if the statement was released, this doesn't seem to be needed.)

@lidavidm
Member

lidavidm commented Jun 8, 2023

I think it's fine to just replace the current pointer with a PyCapsule for now without worrying about these problems (I'd rather we have the right type now than hold this up on fixing a bunch of other things)

@lidavidm
Member

lidavidm commented Jun 8, 2023

I would prefer that consumers explicitly hold open the connection, but it would certainly be more convenient for the PyCapsule/stream to do this implicitly.

@paleolimbot
Member

I would prefer that consumers explicitly hold open the connection, but it would certainly be more convenient for the PyCapsule/stream to do this implicitly.

Perhaps I missed it, but I think the question is not whether the stream should close the connection/statement, but whether the stream should hold a strong reference to the Python object that keeps the memory for the AdbcConnection/AdbcStatement from being freed. If it does not hold a strong reference to that object, the failure mode could be a crash because the check for "valid connection" (connection|statement->private_data != NULL) involves accessing memory that has (or could have been) freed. That crash might be difficult to trigger in practice if PyMem_Malloc() was used (which I think is the case here).

Joris noted earlier that you could also use the capsule's "context" (https://docs.python.org/3/c-api/capsule.html#c.PyCapsule_SetContext) instead of wrapping the stream (the documentation is a bit vague, but presumably that does the same reference increment/decrement that Joris' wrapper is doing here). I don't think you want to do that, because as soon as the stream is moved to a different memory location (e.g., RecordBatch._import_from_c()) that reference disappears.


cdef struct ArrowArrayStreamWrapper:
    cpython.PyObject* parent_statement
    CArrowArrayStream* parent_array_stream
Member

I think you may want this struct to own the memory here rather than just a pointer (i.e., CArrowArrayStream parent_array_stream).

Comment on lines +1226 to +1227
if (<AdbcStatement>data.parent_statement).closed:
    data.error_set = True
Member

I am not sure that you need error_set: in theory, the array stream that you are wrapping should be performing that check before doing something that might crash (although I get that right now it might not be)?

@jorisvandenbossche
Member Author

I would prefer that consumers explicitly hold open the connection, but it would certainly be more convenient for the PyCapsule/stream to do this implicitly.

I don't think this is actually possible (or desirable): the stream capsule itself cannot keep the connection/statement open, because it is exiting the with context that closes the connection/statement. Keeping a reference to the connection/statement object does not prevent it from being closed on context exit (unless we changed that, but I don't think we want to).

Perhaps I missed it, but I think the question is not whether the stream should close the connection/statement, but whether the stream should hold a strong reference to the Python object that keeps the memory for the AdbcConnection/AdbcStatement from being freed

Based on my current understanding, you don't actually need the Python object for that, since the stream's private data holds the actual C AdbcConnection/Statement, and it's the driver (which itself is not implemented in Python) that already ensures the stream keeps that alive? (not super familiar with the driver implementations, though, so not sure about this. See also second paragraph in #702 (comment))

@paleolimbot
Member

the stream's private data holds the actual C AdbcConnection/Statement

I'm pretty sure it just holds a pointer to the AdbcStatement (and even that is optional e.g., https://github.com/apache/arrow-adbc/blob/main/c/driver/postgresql/statement.h#L37-L70 ).

@lidavidm
Member

I think we can close this now?

@lidavidm
Member

Superseded by #1346

@lidavidm closed this Feb 29, 2024
@jorisvandenbossche deleted the poc-c-interface-protocol branch March 15, 2024 15:59