
feat(extensions/nanoarrow_device): Implement asynchronous buffer copying #509

Merged · 43 commits · Jun 27, 2024

Conversation

paleolimbot (Member)

This PR implements asynchronous buffer copying when copying CUDA buffers. Before this, we had basically been issuing cuMemcpyDtoH()/cuMemcpyHtoD() many times in a row, with a synchronize up front and a synchronize at the end. This was probably not great for performance. Additionally, when copying String/Binary/Large String/Large Binary arrays from CUDA to the CPU, we were issuing very tiny copies of the offsets buffer and synchronizing with the CPU to get the number of bytes to copy for the data buffer.

After this PR, when copying from CPU to CUDA, we will be able to return before the copy is necessarily completed by setting the output sync_event.

When copying from CUDA to CPU, the copy is done in one pass if there are no string/binary arrays, or two passes if there are. When copying string/binary arrays, the implementation walks the entire tree of arrays and issues asynchronous copies of the last offset value. Then the stream is synchronized with the CPU, and a second set of asynchronous copies is issued for the buffers whose size we now know.
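As a rough illustration of the two-pass strategy above, here is a hypothetical plain-C sketch: memcpy() stands in for the asynchronous CUDA copies, and the synchronize step is a no-op because a host memcpy is already synchronous. None of these names are nanoarrow's actual API.

```c
#include <stdint.h>
#include <string.h>

/* Pass 1: copy the offsets buffer and the *last* offset value. The last
 * offset tells the CPU how many data bytes must be copied in pass 2.
 * In the real implementation these would be cuMemcpyDtoHAsync() calls,
 * followed by a cuStreamSynchronize() before reading last_offset. */
static int64_t pass1_resolve_data_size(const int32_t* src_offsets, int64_t n_values,
                                       int32_t* dst_offsets) {
  memcpy(dst_offsets, src_offsets,
         (size_t)(n_values + 1) * sizeof(int32_t)); /* "async" offsets copy */
  int32_t last_offset;
  memcpy(&last_offset, src_offsets + n_values,
         sizeof(int32_t)); /* tiny "async" copy of the last offset */
  /* ...stream synchronize would go here before reading last_offset... */
  return (int64_t)last_offset;
}

/* Pass 2: copy the data buffer now that its size is known on the CPU. */
static void pass2_copy_data(const char* src_data, int64_t size_bytes,
                            char* dst_data) {
  memcpy(dst_data, src_data, (size_t)size_bytes); /* "async" data copy */
}
```

Walking the whole tree in pass 1 means only a single synchronize is needed per copy, instead of one synchronize per string/binary array.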

I don't have enough experience with CUDA async programming to know whether this approach could be simplified (e.g., I do this in two streams, but it might be that one stream is sufficient, since perhaps all of the device -> host copies are getting queued against each other regardless of which stream they're on).

This will be easier to test (e.g., with bigger, non-trivial data) when it is wired up to Python.

TODO:

  • Implement sync_event integration (both for source and destination)
  • Test more than just a few string arrays

Closes #245.

@paleolimbot paleolimbot marked this pull request as ready for review June 7, 2024 19:50
@jorisvandenbossche (Member) left a comment

I can't review the CUDA async details, but I took a look to see if I could follow the general logic, and added a few questions.

Comment on lines 197 to 200
/// \brief Copy an ArrowDeviceArray to a device
ArrowErrorCode (*array_copy)(struct ArrowDeviceArrayView* src,
struct ArrowDevice* device_dst,
struct ArrowDeviceArray* dst);
Member

The assumption here is that the ArrowDevice instance calling this method is the source device (i.e., equal to src->device)? Or is it meant for copying both to and from the device?

paleolimbot (Member Author)

The ArrowDeviceArrayViewCopy() function does the dispatch here: it tries src->device->array_copy(), then dst->array_copy(), and then falls back to an implementation based on plain buffer copying if both of those return ENOTSUP. Basically, either the source or the destination has to know how to do the copy (and the CPU device returns ENOTSUP for everything except CPU-to-CPU).
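A minimal sketch of that dispatch order, using simplified stand-in types (not nanoarrow's real ArrowDevice struct):

```c
#include <errno.h>
#include <stddef.h>

typedef int ArrowErrorCode;
struct Device;
typedef ArrowErrorCode (*copy_fn)(struct Device* src, struct Device* dst);

struct Device {
  copy_fn array_copy; /* may be NULL or return ENOTSUP */
};

/* A device that only knows how to decline (like the CPU device for
 * anything other than CPU-to-CPU). */
static ArrowErrorCode cpu_array_copy(struct Device* src, struct Device* dst) {
  (void)src;
  (void)dst;
  return ENOTSUP;
}

/* Generic fallback built purely from buffer-level copies. */
static ArrowErrorCode fallback_buffer_copy(struct Device* src, struct Device* dst) {
  (void)src;
  (void)dst;
  return 0;
}

/* Try the source device, then the destination device, then the fallback. */
static ArrowErrorCode device_array_copy(struct Device* src, struct Device* dst) {
  ArrowErrorCode code = ENOTSUP;
  if (src->array_copy != NULL) code = src->array_copy(src, dst);
  if (code == ENOTSUP && dst->array_copy != NULL) code = dst->array_copy(src, dst);
  if (code == ENOTSUP) code = fallback_buffer_copy(src, dst);
  return code;
}
```

The key design point is that ENOTSUP is a "try the next candidate" signal rather than a hard error.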

src/nanoarrow/nanoarrow_device_cuda.c (outdated; resolved)
dst_view.data.data = &(array_view->buffer_views[2].size_bytes);
dst_view.size_bytes = sizeof(int32_t);

// Note that this strategy (copying four bytes into an int64_t)
Member

Where are you copying it into an int64_t? (The dst_view above is also set to be the size of an int32.)

Member

Ah, because essentially you are copying this into array_view->buffer_views[2].size_bytes, and size_bytes is an int64?

paleolimbot (Member Author)

I added a comment that hopefully clarifies this!
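The trick discussed in this thread can be sketched in plain C: the device writes the 4-byte int32 last offset into the low bytes of a zero-initialized int64_t size_bytes field, which yields the offset value when read as an int64 on a little-endian machine (little-endianness is an assumption of this sketch; the names are illustrative).

```c
#include <stdint.h>
#include <string.h>

/* Copy a 4-byte int32 into an 8-byte int64 slot. Because the slot starts
 * zeroed and the low bytes come first on a little-endian machine, the
 * int64_t afterwards holds exactly the int32 value. */
static int64_t copy_last_offset_into_size(int32_t last_offset) {
  int64_t size_bytes = 0; /* must start zeroed for the trick to work */
  memcpy(&size_bytes, &last_offset, sizeof(int32_t)); /* 4-byte copy */
  return size_bytes;
}
```

This is why the copy destination can be &buffer_views[2].size_bytes even though only sizeof(int32_t) bytes are copied.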

Comment on lines 515 to 518
// Don't initialize a buffer that has already been initialized
if (buffer_dst->size_bytes > 0) {
continue;
}
Member

This ensures a buffer (that was already copied) doesn't get copied a second time when calling ArrowDeviceCudaArrayViewCopyBuffers a second time?

paleolimbot (Member Author)

Yes, it's so that we can just call this function again (instead of keeping track of a sorted list of flattened buffer indexes or something to know exactly which buffers to initialize a copy for on the second pass). I added a comment that hopefully clarifies this!
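The "just call it again" idiom can be sketched like this (hypothetical names): the size_bytes > 0 check makes the second pass a no-op for every buffer that was already copied in the first pass, so no separate bookkeeping of buffer indexes is needed.

```c
#include <stdint.h>

struct Buffer {
  int64_t size_bytes; /* 0 until the buffer has been initialized/copied */
};

/* Count (or, in the real code, issue copies for) only the buffers that
 * were not handled on an earlier pass. Calling this twice is harmless:
 * the second call skips everything the first call completed. */
static int count_buffers_needing_copy(struct Buffer* buffers, int n) {
  int needed = 0;
  for (int i = 0; i < n; i++) {
    /* Don't initialize a buffer that has already been initialized */
    if (buffers[i].size_bytes > 0) continue;
    needed++;
  }
  return needed;
}
```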

Comment on lines 163 to 168
static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
                                                    struct ArrowBuffer* buffer,
                                                    int64_t size_bytes) {
  return ArrowDeviceCudaAllocateBufferAsync(device, buffer, size_bytes,
                                            NANOARROW_CUDA_DEFAULT_STREAM);
}
Contributor

I don't think this is safe to do. CUDA streams can be created that are non-blocking with respect to the default stream, which could lead to races if someone assumes that ArrowDeviceCudaAllocateBuffer is synchronous and safe to use from any stream.

Additionally, the default stream can be controlled at compile time, where it can be set to a per-thread stream, which would make this extra problematic.

-      cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
-                   (size_t)src.size_bytes),
+      cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
+                        (size_t)src.size_bytes, hstream),
       "cuMemcpytoD", error);
Contributor

May want to update to make this say Async as well?

Suggested change
-      "cuMemcpytoD", error);
+      "cuMemcpyDtoD", error);

Comment on lines 322 to 328
 static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
                                                 struct ArrowBufferView src,
                                                 struct ArrowDevice* device_dst,
-                                                struct ArrowBuffer* dst) {
+                                                struct ArrowBufferView dst) {
   return ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, dst,
                                         NANOARROW_CUDA_DEFAULT_STREAM);
 }
Contributor

Same issue here: if this is assumed to be synchronous, it can cause races for other non-blocking streams.

Comment on lines 365 to 371
static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
                                                struct ArrowBufferView src,
                                                struct ArrowDevice* device_dst,
                                                struct ArrowBuffer* dst) {
  return ArrowDeviceCudaBufferInitAsync(device_src, src, device_dst, dst,
                                        NANOARROW_CUDA_DEFAULT_STREAM);
}
Contributor

+1

@@ -392,6 +417,251 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct ArrowDevice* device_src,
return ENOTSUP;
}

static ArrowErrorCode ArrowDeviceCudaResolveBufferSizesAsync(
struct ArrowDevice* device, struct ArrowArrayView* array_view, CUstream hstream) {
Contributor

array_view is host memory here, correct? If so, and the allocated memory is CUDA pinned host memory, then the copies will run truly asynchronously here and return before the memory is actually set to the resolved sizes, but nothing will be blocking the CPU. Is that the user's responsibility in this situation?

Comment on lines 570 to 572
if (hstream != NANOARROW_CUDA_DEFAULT_STREAM) {
  NANOARROW_CUDA_ASSERT_OK(cuStreamDestroy(hstream));
}
Contributor

In addition to the default stream there's also the legacy default stream and per thread default stream (see CU_STREAM_LEGACY and CU_STREAM_PER_THREAD here)

Comment on lines 585 to 590
CUstream hstream = NANOARROW_CUDA_DEFAULT_STREAM;
NANOARROW_RETURN_NOT_OK(
    ArrowDeviceCudaCreateStream(device_src, device_dst, &hstream, error));

code = ArrowDeviceCudaArrayViewCopyBuffers(device_src, &src->array_view, device_dst,
                                           dst, hstream, &unknown_buffer_size_count);
Contributor

If src has a sync event, then we need to wait on that event in hstream; otherwise we have a possible race here.

Contributor

In hindsight, I think this is an issue across most of the functions in this PR: we always need to check the sync event in source arrays and wait on it in the stream used by the function.

// If buffers with unknown size were encountered, we need to copy all of their
// sizes to the CPU, synchronize, then copy all of the buffers once the CPU
// knows how many bytes from each buffer need to be copied.
code = ArrowDeviceCudaCreateStream(device_src, device_dst, &hstream_data, error);
Contributor

+1 here, need to wait on the sync event possibly provided by src on hstream_data

Comment on lines 645 to 660
if (hstream != NANOARROW_CUDA_DEFAULT_STREAM) {
  err = cuStreamSynchronize(hstream);
  ArrowDeviceCudaDestroyStream(hstream);
  if (err != CUDA_SUCCESS) {
    ArrowDeviceCudaDestroyStream(hstream_data);
    NANOARROW_CUDA_RETURN_NOT_OK(err, "cuStreamSynchronize(hstream)", error);
  }
}

if (hstream_data != NANOARROW_CUDA_DEFAULT_STREAM) {
  err = cuStreamSynchronize(hstream_data);
  ArrowDeviceCudaDestroyStream(hstream_data);
  if (err != CUDA_SUCCESS) {
    NANOARROW_CUDA_RETURN_NOT_OK(err, "cuStreamSynchronize(hstream_data)", error);
  }
}
Contributor

If we're not populating the sync event, we probably want to synchronize the stream regardless of whether it's the default stream or not.

Comment on lines 678 to 679
// cuCtxSynchronize() is probably necessary there (since the implementation
// calls raw memcpys)
Contributor

You should only need to synchronize the relevant stream. cuCtxSynchronize() synchronizes the entire device, I believe.

@kkraus14 (Contributor)

@paleolimbot A couple of higher level thoughts I have here:

  • Creating and destroying CUDA streams / events is unfortunately a bit expensive. See here for a benchmark on CUDA event creation and destruction. I've seen CUDA stream / event pools implemented in libraries / frameworks for this reason.
    • We may want to make the constructors / destructors pluggable in some way for someone to provide their own pool implementation? I'm not clear if this is already accomplished here.
  • There could be numerous options as far as buffer copying strategies that could be employed here, i.e., copying different buffers on different streams, using a copy kernel, some mix, etc. I don't think it's feasible to capture/implement all of these different options here.
    • Should we allow folks to provide their own implementations for copies? Again, I'm not clear if this is already accomplished here.

@paleolimbot (Member Author)

Thank you for the review!

There are still many TODOs, but it seems like forcing the user to provide a stream to use the helpers makes it easier for nanoarrow to get things right (and simplifies the resource cleanup considerably!).

Comment on lines 176 to 186
-ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBuffer* dst) {
-  int result = device_dst->buffer_init(device_src, src, device_dst, dst);
+ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src,
Member

Are we sure we want to remove the non-async versions?

paleolimbot (Member Author)

They're still there! (They're just static inline now and pass NULL as the sync event.) It's not a perfect system, since streams don't really apply to any other implementation, but basically users of the CUDA version will be required to use the async versions (maybe we can do something smarter in the future).
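The wrapper pattern being described might look roughly like this (hypothetical names, not the actual nanoarrow functions): the async entry point takes an explicit stream argument, and the old synchronous name survives as a static inline wrapper that passes NULL.

```c
#include <stddef.h>

typedef int ArrowErrorCode;

/* The real entry point: takes an explicit stream. A NULL stream means
 * "no stream/sync event applies" (e.g., a CPU-only device). */
static ArrowErrorCode ExampleBufferInitAsync(int* dst, int value, void* stream) {
  (void)stream; /* a CUDA implementation would enqueue work on this stream */
  *dst = value;
  return 0;
}

/* The old synchronous name, kept as a static inline wrapper. */
static inline ArrowErrorCode ExampleBufferInit(int* dst, int value) {
  return ExampleBufferInitAsync(dst, value, NULL);
}
```

Existing callers keep compiling unchanged, while CUDA users are steered toward the Async variants.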

-ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
-                                        struct ArrowDevice* device_dst,
-                                        struct ArrowDeviceArray* dst) {
+static ArrowErrorCode ArrowDeviceArrayViewCopyDefault(struct ArrowDeviceArrayView* src,
Member

Why is this one specifically getting static added?

paleolimbot (Member Author)

This is currently only referred to from within the file (as a fallback if neither src nor dst provides an implementation). I'm still workshopping this (with the stream now available to pass through everywhere, we might be able to make this implementation-agnostic again).

Comment on lines 561 to 567
int code;
int64_t unknown_buffer_size_count = 0;

NANOARROW_RETURN_NOT_OK_WITH_ERROR(
    ArrowDeviceCudaArrayViewCopyBuffers(device_src, &src->array_view, device_dst, dst,
                                        stream, &unknown_buffer_size_count),
    error);
Member

You probably need to call cuStreamWaitEvent if there's a source event before you pass it to these functions

Comment on lines 596 to 598

NANOARROW_CUDA_RETURN_NOT_OK(cuStreamSynchronize(*stream), "cuStreamSynchronize",
error);
Member

As long as you call cuStreamWaitEvent you don't have to manually synchronize the stream, you can leave that to the caller/consumer. We don't need the CPU to synchronize and wait, we just need the stream to wait for the event before it gets there.

Contributor

If we're not populating a sync event, then if we call cuStreamWaitEvent with the given stream and someone tries to touch dst from a different stream, we end up with a race. We either need to synchronize the stream or populate the sync event.

paleolimbot (Member Author)

I think the sync event is now handled (i.e., we always call cuStreamWaitEvent() when consuming, we always cuEventRecord() when copying to the GPU, and we cuStreamSynchronize() when copying back to the CPU).

@paleolimbot paleolimbot marked this pull request as draft June 20, 2024 19:33
@@ -122,7 +123,7 @@ static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
   switch (device->device_type) {
     case ARROW_DEVICE_CUDA: {
       CUdeviceptr dptr = 0;
-      err = cuMemAlloc(&dptr, (size_t)size_bytes);
+      err = cuMemAllocAsync(&dptr, (size_t)size_bytes, *stream);
Contributor

It's probably worth checking if the finding in #534 is relevant here. cuMemAlloc() is documented not to like 0 input for the byte_size parameter, but cuMemAllocAsync() has no such limitation as far as I can see.

paleolimbot (Member Author)

Thanks! I merged that and rebased for the extra quality-of-life improvements in the tests 🙂

Comment on lines 178 to 183
// TODO: Right now we have to let these events through and ignore them
// because we don't have a good way to separate the source and destination
// streams.
// if (sync_event != NULL || stream != NULL) {
// return EINVAL;
// }
Member

The purpose of using a sync event is so that we don't need to care about the source and destination streams. We just need to tell the destination stream to wait for the event (if it exists) before passing it to the memcpy calls. If there's no sync_event, then we don't need to be concerned about the source stream at all
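The protocol described here can be mocked in plain C: mock_event_record()/mock_stream_wait_event() stand in for cuEventRecord()/cuStreamWaitEvent(), with a stream modeled as a count of enqueued operations and an event as a snapshot of that count. Real CUDA enforces the ordering asynchronously on the device; this mock only models the bookkeeping, and all names are illustrative.

```c
#include <stdint.h>

/* A "stream" is just a count of operations enqueued so far, plus the
 * point it must not run ahead of. */
typedef struct {
  int64_t enqueued;
  int64_t wait_barrier;
} MockStream;

/* An "event" captures a stream's progress at the moment it is recorded. */
typedef struct {
  int64_t captured;
} MockEvent;

/* cuEventRecord(): snapshot the producer stream's current position. */
static void mock_event_record(MockEvent* ev, const MockStream* producer) {
  ev->captured = producer->enqueued;
}

/* cuStreamWaitEvent(): work enqueued on the consumer after this point may
 * only run once the producer has passed the recorded position. No source
 * stream handle is needed -- the event alone carries the dependency. */
static void mock_stream_wait_event(MockStream* consumer, const MockEvent* ev) {
  if (ev->captured > consumer->wait_barrier) {
    consumer->wait_barrier = ev->captured;
  }
}
```

This is why the destination never needs to know the source stream: if a sync_event exists, waiting on it is sufficient; if not, there is nothing to wait for.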

paleolimbot (Member Author)

I think I fixed this one! This "init" is done by the producer, who in this case has a stream that needs to be propagated to the consumer as an event. That implementation (create an event and have the stream record it) lives in the "array init".

This implementation is for the CPU device, so if a stream or event gets passed here, something has gone wrong (that commented code has been updated). In this case, the issue was that the array copier takes a stream argument, but it wasn't clear whether this was for the source or the destination, so it was getting passed to both.

This should now have been updated such that the stream is synchronized before the call to this function (and a NULL stream and sync event is passed here).

Member

That sounds like a good solution and handling to me, though, as far as the stream argument is concerned: the stream should always be considered to be "the stream that any operations should be put on", i.e., the destination. By using the whole sync-event handling that we do, we bypass the need to logically separate a source stream and a destination stream: either there is a sync event and the stream should be given a cuStreamWaitEvent on it, or there is no sync event and thus no need to sync. The stream is always the stream to be used for any memcpy operations. Just mentioning this for future reference, to hopefully clear it up and make it easier to understand.

Obviously @kkraus14 can correct me if I'm wrong here... lol

@paleolimbot (Member Author) — Jun 24, 2024

I think we're all on the same page! The stream argument in ArrowDeviceArrayInit() is "the stream that any operations were put on", which is handy for the likely case where a producer just built a bunch of buffers using a stream and is ready to hand an ArrowDeviceArray off to a consumer.

There is plenty of flexibility to refactor/rename/document exactly how/where this happens (e.g., perhaps the ArrowDevice abstraction is just not needed, since the idioms for Metal mostly don't apply here). For now, I needed to implement the stream -> event logic somewhere, and this was a hopefully not unreasonable place to put it.

@zeroshade (Member)

from @kkraus14's comment:

There could be numerous options as far as buffer copying strategies that could be employed here, i.e., copying different buffers on different streams, using a copy kernel, some mix, etc. I don't think it's feasible to capture/implement all of these different options here.

  • Should we allow folks to provide their own implementations for copies? Again, I'm not clear if this is already accomplished here.

I think we should definitely allow folks to provide their own implementations for copies. @paleolimbot how difficult is it for a user to inject their own implementations of the ArrowDevice object or callbacks to use for the copying (instead of using the defaults we are providing in this PR)?

@paleolimbot (Member Author)

how difficult is it for a user to inject their own implementations of the ArrowDevice object or callbacks to use for the copying

It would probably take another PR or two to get to that point. I think this PR is on the right track, though, in that it implements the asynchronous array copy purely in terms of an asynchronous buffer allocation and an asynchronous buffer copy. The follow-up PRs might:

  • Better separate the allocation and copying steps in ArrowBufferInit()
  • Let one or more methods accept a custom buffer copier/allocator
  • Better support splitting up children of a device array (if somebody had more than one stream available, one might suggest copying children in parallel, but right now you might have to do some fudging to get all the pieces in the right place).

For something like cudf that has well-abstracted tools for doing these kinds of things, I might suggest building the buffers separately (e.g., how it currently exports a column to an ArrowArray using the ArrowBufferDeallocator) and using ArrowDeviceArrayInit() to do the final export (as opposed to using this generic array copier). That said, I think the generic array copier is still important (if we want people to build device things, they have to test them/print them somehow).

static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* device,
                                                         struct ArrowBuffer* buffer,
                                                         int64_t size_bytes,
                                                         CUstream* stream) {
Member

CUstream is actually a typedef'd pointer, so you can probably pass it by value instead of via pointer
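The point about typedef'd pointers can be illustrated with a mock handle type; MockCUstream stands in here for CUstream, which the CUDA driver API defines as typedef struct CUstream_st* CUstream. Passing the typedef by value already passes the underlying handle, so an extra pointer level is only needed when the callee must overwrite the caller's handle (e.g., a creation function).

```c
struct mock_stream_st {
  int id;
};

/* Mirrors how CUstream is declared: a typedef'd pointer to an opaque struct. */
typedef struct mock_stream_st* MockCUstream;

/* Taking the handle by value: the callee still sees the same underlying
 * stream object, since the handle *is* a pointer. */
static int stream_id(MockCUstream stream) { return stream->id; }
```

A MockCUstream* parameter would only be warranted for an out-parameter, as in cuStreamCreate(&stream, ...).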

-                                                    void* sync_event) {
+static ArrowErrorCode ArrowDeviceCudaArrayInitInternal(
+    struct ArrowDevice* device, struct ArrowDeviceArray* device_array,
+    struct ArrowArray* array, CUevent* cu_event) {
Member

Same thing as with the stream: you can probably just pass this by value instead of by pointer, since you never need the actual address of this and CUevent is a typedef'd pointer anyway.

paleolimbot (Member Author)

I updated all the internal functions to take a CUevent/CUstream instead of a CUevent*/CUstream*!

Comment on lines 236 to 237
// If the stream was passed, we must have an event to work with
CUevent cu_event_tmp = NULL;
Member

I don't think this is necessarily true. If there's no event then that just means that the producer said we don't need to worry about synchronizing.

paleolimbot (Member Author)

This is a self-imposed truth (I added the ability for ArrowDeviceArray() to accept a void* stream, and made this the behaviour). A user is also free to provide neither an event nor a stream, in which case we don't have to worry about synchronizing.

I added a comment here and will add a test, too.

return NANOARROW_OK;
}

CUstream* get() { return &hstream_; }
Member

We should probably just return CUstream not a pointer to it

paleolimbot (Member Author)

I just tried this, but because the user-facing device-agnostic functions have to take a void*, it's slightly more convenient to have this return the CUstream*. (For consistency with how the CUevent is exposed in the device array (as a CUevent*, aka struct something**), I'm assuming that void* stream for CUDA is a CUstream*.)

Member

Your assumption is correct, that makes sense.

@@ -292,7 +297,7 @@ static int ArrowDeviceMetalCopyRequiredCpuToMetal(MTL::Device* mtl_device,
 }
 
 static ArrowErrorCode ArrowDeviceMetalSynchronize(struct ArrowDevice* device,
-                                                  void* sync_event,
+                                                  void* sync_event, void* stream,
Member

should we return a different status than NANOARROW_OK since it's not implemented?

paleolimbot (Member Author)

Definitely!

@paleolimbot paleolimbot merged commit 008a3bb into apache:main Jun 27, 2024
32 of 33 checks passed
@paleolimbot paleolimbot deleted the c-device-buffer-copy branch June 27, 2024 17:18
@paleolimbot paleolimbot added this to the nanoarrow 0.6.0 milestone Sep 17, 2024

Successfully merging this pull request may close these issues.

Device extension should implement async buffer copying for CUDA
5 participants