feat(extensions/nanoarrow_device): Implement asynchronous buffer copying #509
Conversation
Force-pushed from 955fa68 to 1bf2f71
I can't review the CUDA async details, but I took a look to see if I could follow the general logic and added a few questions.
src/nanoarrow/nanoarrow_device.h
Outdated
```c
/// \brief Copy an ArrowDeviceArray to a device
ArrowErrorCode (*array_copy)(struct ArrowDeviceArrayView* src,
                             struct ArrowDevice* device_dst,
                             struct ArrowDeviceArray* dst);
```
The assumption here is that the `ArrowDevice` instance calling this method is the source device (i.e., equal to `src->device`)? Or is it meant for copying either to or from the device?
The `ArrowDeviceArrayViewCopy()` function does the dispatch here... it tries `src->device->array_copy()`, then `dst->array_copy()`, and then falls back to an implementation based on just buffer copying if both of those return `ENOTSUP`. Basically, either the source or the destination has to know how to do the copy (and the CPU device returns `ENOTSUP` for everything except cpu-to-cpu).
```c
dst_view.data.data = &(array_view->buffer_views[2].size_bytes);
dst_view.size_bytes = sizeof(int32_t);

// Note that this strategy (copying four bytes into an int64_t)
```
Where are you copying it into an int64_t? (The `dst_view` above is also set to be the size of an int32.)
Ah, because essentially you are copying this into `array_view->buffer_views[2].size_bytes`, and `size_bytes` is an int64?
I added a comment that hopefully clarifies this!
```c
// Don't initialize a buffer that has already been initialized
if (buffer_dst->size_bytes > 0) {
  continue;
}
```
This ensures a buffer (that was already copied) doesn't get copied a second time when calling `ArrowDeviceCudaArrayViewCopyBuffers` a second time?
Yes, it's so that we can just call this function again (instead of keeping track of a sorted list of flattened buffer indexes or something to know exactly which buffers to initialize a copy for on the second pass). I added a comment that hopefully clarifies this!
```c
static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
                                                    struct ArrowBuffer* buffer,
                                                    int64_t size_bytes) {
  return ArrowDeviceCudaAllocateBufferAsync(device, buffer, size_bytes,
                                            NANOARROW_CUDA_DEFAULT_STREAM);
}
```
I don't think this is safe to do. CUDA streams can be created that are non-blocking with respect to the default stream, which could lead to races if someone assumes that `ArrowDeviceCudaAllocateBuffer` is synchronous and safe to use from any stream.
Additionally, the default stream can be controlled at compile time, where it can be set to a per-thread stream, which would make this extra problematic.
```diff
-        cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
-                     (size_t)src.size_bytes),
+        cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
+                          (size_t)src.size_bytes, hstream),
         "cuMemcpytoD", error);
```
May want to update to make this say `Async` as well?

```diff
-        "cuMemcpytoD", error);
+        "cuMemcpyDtoD", error);
```
```diff
 static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
                                                 struct ArrowBufferView src,
                                                 struct ArrowDevice* device_dst,
-                                                struct ArrowBuffer* dst) {
+                                                struct ArrowBufferView dst) {
   return ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, dst,
                                         NANOARROW_CUDA_DEFAULT_STREAM);
 }
```
Same issue here: if this is assumed to be synchronous, then it can cause races for other non-blocking streams.
```c
static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
                                                struct ArrowBufferView src,
                                                struct ArrowDevice* device_dst,
                                                struct ArrowBuffer* dst) {
  return ArrowDeviceCudaBufferInitAsync(device_src, src, device_dst, dst,
                                        NANOARROW_CUDA_DEFAULT_STREAM);
}
```
+1
```diff
@@ -392,6 +417,251 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct ArrowDevice* device_src,
   return ENOTSUP;
 }

+static ArrowErrorCode ArrowDeviceCudaResolveBufferSizesAsync(
+    struct ArrowDevice* device, struct ArrowArrayView* array_view, CUstream hstream) {
```
`array_view` is host memory here, correct? If so, and if the allocated memory is CUDA pinned host memory, then the copies will run truly asynchronously here and return before the memory is actually set to the resolved sizes, but nothing will block the CPU. Is that the user's responsibility in this situation?
```c
if (hstream != NANOARROW_CUDA_DEFAULT_STREAM) {
  NANOARROW_CUDA_ASSERT_OK(cuStreamDestroy(hstream));
}
```
In addition to the default stream, there's also the legacy default stream and the per-thread default stream (see `CU_STREAM_LEGACY` and `CU_STREAM_PER_THREAD` here).
```c
CUstream hstream = NANOARROW_CUDA_DEFAULT_STREAM;
NANOARROW_RETURN_NOT_OK(
    ArrowDeviceCudaCreateStream(device_src, device_dst, &hstream, error));

code = ArrowDeviceCudaArrayViewCopyBuffers(device_src, &src->array_view, device_dst,
                                           dst, hstream, &unknown_buffer_size_count);
```
If `src` has a sync event then we need to wait on that event in `hstream`, otherwise we have a possible race here.
In hindsight, I think this is an issue across most of the functions in this PR: we always need to check the sync event on source arrays and wait on it in the stream used by the function.
```c
// If buffers with unknown size were encountered, we need to copy all of their
// sizes to the CPU, synchronize, then copy all of the buffers once the CPU
// knows how many bytes from each buffer need to be copied.
code = ArrowDeviceCudaCreateStream(device_src, device_dst, &hstream_data, error);
```
+1 here, need to wait on the sync event possibly provided by `src` on `hstream_data`.
```c
if (hstream != NANOARROW_CUDA_DEFAULT_STREAM) {
  err = cuStreamSynchronize(hstream);
  ArrowDeviceCudaDestroyStream(hstream);
  if (err != CUDA_SUCCESS) {
    ArrowDeviceCudaDestroyStream(hstream_data);
    NANOARROW_CUDA_RETURN_NOT_OK(err, "cuStreamSynchronize(hstream)", error);
  }
}

if (hstream_data != NANOARROW_CUDA_DEFAULT_STREAM) {
  err = cuStreamSynchronize(hstream_data);
  ArrowDeviceCudaDestroyStream(hstream_data);
  if (err != CUDA_SUCCESS) {
    NANOARROW_CUDA_RETURN_NOT_OK(err, "cuStreamSynchronize(hstream_data)", error);
  }
}
```
If we're not populating the sync event we probably want to synchronize the stream regardless of whether it's the default stream or not
```c
// cuCtxSynchronize() is probably necessary there (since the implementation
// calls raw memcpys)
```
Should only need to synchronize the relevant stream still. `cuCtxSynchronize()` synchronizes the entire device, I believe.
> @paleolimbot A couple of higher level thoughts I have here:

Thank you for the review! There are still many TODOs, but it seems like forcing the user to provide a stream to use the helpers makes it easier for nanoarrow to get things right (and simplifies the resource cleanup considerably!).
src/nanoarrow/nanoarrow_device.c
Outdated
```c
ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
                                     struct ArrowBufferView src,
                                     struct ArrowDevice* device_dst,
                                     struct ArrowBuffer* dst) {
  int result = device_dst->buffer_init(device_src, src, device_dst, dst);
ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src,
```
Are we sure we want to remove the non-async versions?
They're still there! (They're just `static inline` now and pass `NULL` as the sync event.) It's not a perfect system, since streams don't really apply to any other implementation, but basically users of the CUDA version will be required to use the async versions (maybe we can do something smarter in the future).
src/nanoarrow/nanoarrow_device.c
Outdated
```c
ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
                                        struct ArrowDevice* device_dst,
                                        struct ArrowDeviceArray* dst) {
static ArrowErrorCode ArrowDeviceArrayViewCopyDefault(struct ArrowDeviceArrayView* src,
```
Why is this one specifically getting `static` added?
This is currently only referred to from within the file (as a fallback if neither src nor dst provides an implementation). I'm still workshopping this (with the `stream` now available to pass through everywhere, we might be able to make this implementation-agnostic again).
```c
int code;
int64_t unknown_buffer_size_count = 0;

NANOARROW_RETURN_NOT_OK_WITH_ERROR(
    ArrowDeviceCudaArrayViewCopyBuffers(device_src, &src->array_view, device_dst, dst,
                                        stream, &unknown_buffer_size_count),
    error);
```
You probably need to call `cuStreamWaitEvent` if there's a source event before you pass it to these functions.
```c
NANOARROW_CUDA_RETURN_NOT_OK(cuStreamSynchronize(*stream), "cuStreamSynchronize",
                             error);
```
As long as you call `cuStreamWaitEvent` you don't have to manually synchronize the stream; you can leave that to the caller/consumer. We don't need the CPU to synchronize and wait, we just need the stream to wait for the event before it gets there.
If we're not populating a sync event, then if we call `cuStreamWaitEvent` with the given stream and someone tries to touch `dst` from a different stream, we end up with a race. We either need to synchronize the stream or populate the sync event.
I think the sync event is now handled (i.e., we always call `cuStreamWaitEvent()` when consuming, we always `cuEventRecord()` when copying to the GPU, and we `cuStreamSynchronize()` when copying back to the CPU).
```diff
@@ -122,7 +123,7 @@ static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
   switch (device->device_type) {
     case ARROW_DEVICE_CUDA: {
       CUdeviceptr dptr = 0;
-      err = cuMemAlloc(&dptr, (size_t)size_bytes);
+      err = cuMemAllocAsync(&dptr, (size_t)size_bytes, *stream);
```
It's probably worth checking whether the finding in #534 is relevant here. `cuMemAlloc()` is documented not to like `0` input for the `byte_size` parameter, but `cuMemAllocAsync()` has no such limitation as far as I can see.
Thanks! I merged that and rebased for the extra quality-of-life improvements in the tests 🙂
Force-pushed from e4f3e5d to 6996a7c
src/nanoarrow/nanoarrow_device.c
Outdated
```c
// TODO: Right now we have to let these events through and ignore them
// because we don't have a good way to separate the source and destination
// streams.
// if (sync_event != NULL || stream != NULL) {
//   return EINVAL;
// }
```
The purpose of using a sync event is so that we don't need to care about the source and destination streams. We just need to tell the destination stream to wait for the event (if it exists) before passing it to the memcpy calls. If there's no sync_event, then we don't need to be concerned about the source stream at all
I think I fixed this one! This "init" is done by the producer, who in this case has a stream that needs to be propagated to the consumer as an event. That implementation (create an event and have the stream record it) lives in the "array init".
This implementation is for the CPU device, so if a stream or event gets passed here, something has gone wrong (that commented code has been updated). In this case, the issue was that the array copier takes a `stream` argument, but it wasn't clear whether this was for the source or destination, so it was getting passed to both.
This should now have been updated such that the stream is synchronized before the call to this function (and a NULL stream and sync event are passed here).
That sounds like a good solution and handling to me, as far as taking the stream argument is concerned. The stream should always be considered "the stream that any operations should be put on", i.e., the destination. By using the whole sync-event handling that we do, we bypass the need to logically separate a source stream and a destination stream: either we need to sync (there should be a sync event, and the stream should be given a `cuStreamWaitEvent` on it), or there is no sync event and thus no need to sync. The stream is always the stream to be used for any memcpy operations. Just mentioning this for future reference to hopefully clear it up and make it easier to understand.
Obviously @kkraus14 can correct me if I'm wrong here... lol
I think we're all on the same page! The `stream` argument in `ArrowDeviceArrayInit()` is "the stream that any operations were put on", which is handy for the likely case where a producer just built a bunch of buffers using a stream and is ready to hand an `ArrowDeviceArray` off to a consumer.
There is plenty of flexibility to refactor/rename/document exactly how and where this happens (e.g., perhaps the `ArrowDevice` abstraction is just not needed, since the idioms for Metal mostly don't apply here). For now, I needed to implement the `stream` -> `event` logic somewhere, and this was a hopefully not unreasonable place to put it.
From @kkraus14's comment:

> I think we should definitely allow folks to provide their own implementations for copies. @paleolimbot how difficult is it for a user to inject their own implementations of the ArrowDevice object or callbacks to use for the copying (instead of using the defaults we are providing in this PR)?

It would probably take another PR or two to get to that point. I think this PR is on the right track, though, in that it implements the asynchronous array copy purely in terms of an asynchronous buffer allocation and an asynchronous buffer copy. The two PRs might be:

For something like cudf that has well-abstracted tools for doing these kinds of things, I might suggest building the buffers separately (e.g., how it currently exports a column to an
Force-pushed from e49246b to 4f7fe85
Force-pushed from 4f7fe85 to 5bf4139
src/nanoarrow/device/cuda.c
Outdated
```c
static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* device,
                                                         struct ArrowBuffer* buffer,
                                                         int64_t size_bytes,
                                                         CUstream* stream) {
```
`CUstream` is actually a typedef'd pointer, so you can probably pass it by value instead of via pointer.
src/nanoarrow/device/cuda.c
Outdated
```c
                                        void* sync_event) {
static ArrowErrorCode ArrowDeviceCudaArrayInitInternal(
    struct ArrowDevice* device, struct ArrowDeviceArray* device_array,
    struct ArrowArray* array, CUevent* cu_event) {
```
Same thing as with the stream: you can probably just pass this by value instead of by pointer, since you never need the actual address of this and `CUevent` is a typedef'd pointer anyway.
I updated all the internal functions to take a `CUevent`/`CUstream` instead of a `CUevent*`/`CUstream*`!
src/nanoarrow/device/cuda.c
Outdated
```c
// If the stream was passed, we must have an event to work with
CUevent cu_event_tmp = NULL;
```
I don't think this is necessarily true. If there's no event then that just means that the producer said we don't need to worry about synchronizing.
This is a self-imposed truth (I added the ability for `ArrowDeviceArray()` to accept a `void* stream`, and made this the behaviour). A user is also free to provide neither an event nor a stream, in which case we don't have to worry about synchronizing.
I added a comment here and will add a test, too.
```cpp
  return NANOARROW_OK;
}

CUstream* get() { return &hstream_; }
```
We should probably just return `CUstream`, not a pointer to it.
I just tried this, but because the user-facing device-agnostic functions have to take a `void*`, it's slightly more convenient to have this return the `CUstream*`. (For consistency with how the `CUevent` is exposed in the device array (as a `CUevent*`, aka `struct something**`), I'm assuming that `void* stream` for CUDA is a `CUstream*`.)
Your assumption is correct, that makes sense.
```diff
@@ -292,7 +297,7 @@ static int ArrowDeviceMetalCopyRequiredCpuToMetal(MTL::Device* mtl_device,
 }

 static ArrowErrorCode ArrowDeviceMetalSynchronize(struct ArrowDevice* device,
-                                                  void* sync_event,
+                                                  void* sync_event, void* stream,
```
Should we return a different status than `NANOARROW_OK` since it's not implemented?
Definitely!
This PR implements asynchronous buffer copying when copying CUDA buffers. Before this, we had basically been issuing `cuMemcpyDtoH()`/`cuMemcpyHtoD()` many times in a row, with a synchronize up front and a synchronize at the end. This was probably not great for performance. Additionally, for copying String/Binary/Large String/Large Binary arrays from CUDA to the CPU, we were issuing very tiny copies on the offsets buffer and synchronizing with the CPU to get the number of bytes to copy for the data buffer.

After this PR, when copying from CPU to CUDA, we will be able to return before the copy is necessarily completed by setting the output `sync_event`. When copying from CUDA to CPU, the copy is done in one pass if there are no string/binary arrays, or two passes if there are. When copying string/binary arrays, the implementation walks the entire tree of arrays and issues asynchronous copies for the last offset value. Then, the stream is synchronized with the CPU, and a second set of asynchronous copies is issued for the buffers whose sizes we now know.

I don't have enough experience with CUDA async programming to know whether this approach could be simplified (e.g., I do this in two streams, but it might be that one stream is sufficient, since perhaps all of the device -> host copies are getting queued against each other regardless of what stream they're on).

This will be easier to test when it is wired up to Python (e.g., with bigger, non-trivial data).

TODO:

- `sync_event` integration (both for source and destination)

Closes #245.