-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(extensions/nanoarrow_device): Implement asynchronous buffer copying #509
Changes from 38 commits
518538d
fa864be
f5d4f0a
872f3a9
56b937c
4721d26
febfc9a
54327cb
9729a5e
c62d848
7643aa7
be04b3f
6712937
95938d8
dc660df
3d8f76f
103e127
5170fd8
32bfa20
19a5793
d108575
b634c62
22f7196
35daae1
0e1aa5a
3aedee6
2b8671a
4a93294
73a7f41
835c574
7e5fb8f
4d2a5e8
45e0ad8
b48d4bb
72cf059
c6520ab
1bb3b46
5bf4139
4aa114d
9f8e69b
45d07cc
05d053f
20fc135
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,9 +100,10 @@ static void ArrowDeviceCudaDeallocator(struct ArrowBufferAllocator* allocator, | |
ArrowFree(allocator_private); | ||
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device, | ||
struct ArrowBuffer* buffer, | ||
int64_t size_bytes) { | ||
static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* device, | ||
struct ArrowBuffer* buffer, | ||
int64_t size_bytes, | ||
CUstream* stream) { | ||
struct ArrowDeviceCudaPrivate* private_data = | ||
(struct ArrowDeviceCudaPrivate*)device->private_data; | ||
|
||
|
@@ -124,11 +125,14 @@ static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device, | |
switch (device->device_type) { | ||
case ARROW_DEVICE_CUDA: { | ||
CUdeviceptr dptr = 0; | ||
if (size_bytes > 0) { // cuMemalloc requires non-zero size_bytes | ||
err = cuMemAlloc(&dptr, (size_t)size_bytes); | ||
|
||
// cuMemalloc requires non-zero size_bytes | ||
if (size_bytes > 0) { | ||
err = cuMemAllocAsync(&dptr, (size_t)size_bytes, *stream); | ||
} else { | ||
err = CUDA_SUCCESS; | ||
} | ||
|
||
ptr = (void*)dptr; | ||
op = "cuMemAlloc"; | ||
break; | ||
|
@@ -181,10 +185,9 @@ static void ArrowDeviceCudaArrayRelease(struct ArrowArray* array) { | |
array->release = NULL; | ||
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device, | ||
struct ArrowDeviceArray* device_array, | ||
struct ArrowArray* array, | ||
void* sync_event) { | ||
static ArrowErrorCode ArrowDeviceCudaArrayInitInternal( | ||
struct ArrowDevice* device, struct ArrowDeviceArray* device_array, | ||
struct ArrowArray* array, CUevent* cu_event) { | ||
Reviewer comment: Same thing as with the stream — you can probably just pass this by value instead of by pointer, since you never need the actual address of it. Author reply: I updated all the internal functions to take it by value. |
||
struct ArrowDeviceCudaPrivate* device_private = | ||
(struct ArrowDeviceCudaPrivate*)device->private_data; | ||
// One can create an event with cuEventCreate(&cu_event, CU_EVENT_DEFAULT); | ||
|
@@ -206,9 +209,9 @@ static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device, | |
device_array->device_id = device->device_id; | ||
device_array->device_type = device->device_type; | ||
|
||
if (sync_event != NULL) { | ||
private_data->cu_event = *((CUevent*)sync_event); | ||
device_array->sync_event = sync_event; | ||
if (cu_event != NULL) { | ||
private_data->cu_event = *cu_event; | ||
device_array->sync_event = &private_data->cu_event; | ||
} else { | ||
private_data->cu_event = NULL; | ||
device_array->sync_event = NULL; | ||
|
@@ -217,15 +220,63 @@ static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device, | |
return NANOARROW_OK; | ||
} | ||
|
||
// TODO: All these buffer copiers would benefit from cudaMemcpyAsync but there is | ||
// no good way to incorporate that just yet | ||
static ArrowErrorCode ArrowDeviceCudaArrayInitAsync(struct ArrowDevice* device, | ||
struct ArrowDeviceArray* device_array, | ||
struct ArrowArray* array, | ||
void* sync_event, void* stream) { | ||
struct ArrowDeviceCudaPrivate* private_data = | ||
(struct ArrowDeviceCudaPrivate*)device->private_data; | ||
|
||
static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* device_src, | ||
struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, | ||
struct ArrowBufferView dst, | ||
int* n_pop_context, | ||
struct ArrowError* error) { | ||
NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context), | ||
"cuCtxPushCurrent", NULL); | ||
CUcontext unused; // needed for cuCtxPopCurrent() | ||
|
||
CUevent* cu_event = (CUevent*)sync_event; | ||
|
||
// If the stream was passed, we must have an event to work with | ||
CUevent cu_event_tmp = NULL; | ||
Reviewer comment: I don't think this is necessarily true. If there's no event, then that just means the producer said we don't need to worry about synchronizing. Author reply: This is a self-imposed truth (I added the ability for it); I added a comment here and will add a test, too. |
||
CUresult err; | ||
|
||
if (stream != NULL && cu_event == NULL) { | ||
// TODO: Should this use the disable timing flag as well? | ||
err = cuEventCreate(&cu_event_tmp, CU_EVENT_DEFAULT); | ||
if (err != CUDA_SUCCESS) { | ||
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); | ||
NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL); | ||
} | ||
|
||
cu_event = &cu_event_tmp; | ||
} | ||
|
||
if (stream != NULL) { | ||
err = cuEventRecord(*cu_event, *((CUstream*)stream)); | ||
if (err != CUDA_SUCCESS) { | ||
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); | ||
if (cu_event_tmp != NULL) { | ||
NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp)); | ||
} | ||
|
||
NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL); | ||
} | ||
} | ||
|
||
int result = ArrowDeviceCudaArrayInitInternal(device, device_array, array, cu_event); | ||
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); | ||
if (result != NANOARROW_OK) { | ||
if (cu_event_tmp != NULL) { | ||
NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp)); | ||
} | ||
|
||
return result; | ||
} | ||
|
||
return NANOARROW_OK; | ||
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaBufferCopyAsyncInternal( | ||
struct ArrowDevice* device_src, struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, struct ArrowBufferView dst, int* n_pop_context, | ||
struct ArrowError* error, CUstream hstream) { | ||
// Note: the device_src/sync event must be synchronized before calling these methods, | ||
// even though the cuMemcpyXXX() functions may do this automatically in some cases. | ||
|
||
|
@@ -238,7 +289,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi | |
(*n_pop_context)++; | ||
|
||
NANOARROW_CUDA_RETURN_NOT_OK( | ||
cuMemcpyHtoD((CUdeviceptr)dst.data.data, src.data.data, (size_t)src.size_bytes), | ||
cuMemcpyHtoDAsync((CUdeviceptr)dst.data.data, src.data.data, | ||
(size_t)src.size_bytes, hstream), | ||
"cuMemcpyHtoD", error); | ||
|
||
} else if (device_src->device_type == ARROW_DEVICE_CUDA && | ||
|
@@ -252,9 +304,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi | |
(*n_pop_context)++; | ||
|
||
NANOARROW_CUDA_RETURN_NOT_OK( | ||
cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data, | ||
(size_t)src.size_bytes), | ||
"cuMemcpytoD", error); | ||
cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data, | ||
(size_t)src.size_bytes, hstream), | ||
"cuMemcpyDtoDAsync", error); | ||
|
||
} else if (device_src->device_type == ARROW_DEVICE_CUDA && | ||
device_dst->device_type == ARROW_DEVICE_CUDA) { | ||
|
@@ -264,10 +316,10 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi | |
(struct ArrowDeviceCudaPrivate*)device_dst->private_data; | ||
|
||
NANOARROW_CUDA_RETURN_NOT_OK( | ||
cuMemcpyPeer((CUdeviceptr)dst.data.data, dst_private->cu_context, | ||
(CUdeviceptr)src.data.data, src_private->cu_context, | ||
(size_t)src.size_bytes), | ||
"cuMemcpyPeer", error); | ||
cuMemcpyPeerAsync((CUdeviceptr)dst.data.data, dst_private->cu_context, | ||
(CUdeviceptr)src.data.data, src_private->cu_context, | ||
(size_t)src.size_bytes, hstream), | ||
"cuMemcpyPeerAsync", error); | ||
|
||
} else if (device_src->device_type == ARROW_DEVICE_CUDA && | ||
device_dst->device_type == ARROW_DEVICE_CPU) { | ||
|
@@ -278,9 +330,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi | |
"cuCtxPushCurrent", error); | ||
(*n_pop_context)++; | ||
NANOARROW_CUDA_RETURN_NOT_OK( | ||
cuMemcpyDtoH((void*)dst.data.data, (CUdeviceptr)src.data.data, | ||
(size_t)src.size_bytes), | ||
"cuMemcpyDtoH", error); | ||
cuMemcpyDtoHAsync((void*)dst.data.data, (CUdeviceptr)src.data.data, | ||
(size_t)src.size_bytes, hstream), | ||
"cuMemcpyDtoHAsync", error); | ||
|
||
} else if (device_src->device_type == ARROW_DEVICE_CPU && | ||
device_dst->device_type == ARROW_DEVICE_CUDA_HOST) { | ||
|
@@ -301,15 +353,22 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi | |
return NANOARROW_OK; | ||
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src, | ||
struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, | ||
struct ArrowBufferView dst) { | ||
static ArrowErrorCode ArrowDeviceCudaBufferCopyAsync(struct ArrowDevice* device_src, | ||
struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, | ||
struct ArrowBufferView dst, | ||
void* stream) { | ||
if (stream == NULL) { | ||
return EINVAL; | ||
} | ||
|
||
CUstream hstream = *((CUstream*)stream); | ||
|
||
int n_pop_context = 0; | ||
struct ArrowError error; | ||
|
||
int result = ArrowDeviceCudaBufferCopyInternal(device_src, src, device_dst, dst, | ||
&n_pop_context, &error); | ||
int result = ArrowDeviceCudaBufferCopyAsyncInternal(device_src, src, device_dst, dst, | ||
&n_pop_context, &error, hstream); | ||
for (int i = 0; i < n_pop_context; i++) { | ||
CUcontext unused; | ||
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); | ||
|
@@ -318,17 +377,24 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src, | |
return result; | ||
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src, | ||
struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, | ||
struct ArrowBuffer* dst) { | ||
static ArrowErrorCode ArrowDeviceCudaBufferInitAsync(struct ArrowDevice* device_src, | ||
struct ArrowBufferView src, | ||
struct ArrowDevice* device_dst, | ||
struct ArrowBuffer* dst, | ||
void* stream) { | ||
if (stream == NULL) { | ||
return EINVAL; | ||
} | ||
|
||
CUstream hstream = *((CUstream*)stream); | ||
|
||
struct ArrowBuffer tmp; | ||
|
||
switch (device_dst->device_type) { | ||
case ARROW_DEVICE_CUDA: | ||
case ARROW_DEVICE_CUDA_HOST: | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowDeviceCudaAllocateBuffer(device_dst, &tmp, src.size_bytes)); | ||
ArrowDeviceCudaAllocateBufferAsync(device_dst, &tmp, src.size_bytes, &hstream)); | ||
break; | ||
case ARROW_DEVICE_CPU: | ||
ArrowBufferInit(&tmp); | ||
|
@@ -341,7 +407,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src, | |
struct ArrowBufferView tmp_view; | ||
tmp_view.data.data = tmp.data; | ||
tmp_view.size_bytes = tmp.size_bytes; | ||
int result = ArrowDeviceCudaBufferCopy(device_src, src, device_dst, tmp_view); | ||
int result = | ||
ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, tmp_view, &hstream); | ||
if (result != NANOARROW_OK) { | ||
ArrowBufferReset(&tmp); | ||
return result; | ||
|
@@ -352,7 +419,7 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src, | |
} | ||
|
||
static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device, | ||
void* sync_event, | ||
void* sync_event, void* stream, | ||
struct ArrowError* error) { | ||
if (sync_event == NULL) { | ||
return NANOARROW_OK; | ||
|
@@ -363,11 +430,33 @@ static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device, | |
return ENOTSUP; | ||
} | ||
|
||
// Sync functions require a context to be set | ||
struct ArrowDeviceCudaPrivate* private_data = | ||
(struct ArrowDeviceCudaPrivate*)device->private_data; | ||
|
||
NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context), | ||
"cuCtxPushCurrent", NULL); | ||
CUcontext unused; // needed for cuCtxPopCurrent() | ||
|
||
// Memory for cuda_event is owned by the ArrowArray member of the ArrowDeviceArray | ||
CUevent* cuda_event = (CUevent*)sync_event; | ||
NANOARROW_CUDA_RETURN_NOT_OK(cuEventSynchronize(*cuda_event), "cuEventSynchronize", | ||
error); | ||
CUevent* cu_event = (CUevent*)sync_event; | ||
CUstream* cu_stream = (CUstream*)stream; | ||
CUresult err; | ||
const char* op = ""; | ||
|
||
if (cu_stream == NULL && cu_event != NULL) { | ||
err = cuEventSynchronize(*cu_event); | ||
op = "cuEventSynchronize"; | ||
} else if (cu_stream != NULL && cu_event == NULL) { | ||
err = cuStreamSynchronize(*cu_stream); | ||
op = "cuStreamSynchronize"; | ||
} else if (cu_stream != NULL && cu_event != NULL) { | ||
err = cuStreamWaitEvent(*cu_stream, *cu_event, CU_EVENT_WAIT_DEFAULT); | ||
op = "cuStreamWaitEvent"; | ||
} | ||
|
||
NANOARROW_ASSERT_OK(cuCtxPopCurrent(&unused)); | ||
NANOARROW_CUDA_RETURN_NOT_OK(err, op, error); | ||
return NANOARROW_OK; | ||
} | ||
|
||
|
@@ -384,7 +473,7 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct ArrowDevice* device_src, | |
// We do have to wait on the sync event, though, because this has to be NULL | ||
// for a CPU device array. | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL)); | ||
ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL, NULL)); | ||
ArrowDeviceArrayMove(src, dst); | ||
dst->device_type = device_dst->device_type; | ||
dst->device_id = device_dst->device_id; | ||
|
@@ -436,11 +525,11 @@ static ArrowErrorCode ArrowDeviceCudaInitDevice(struct ArrowDevice* device, | |
|
||
device->device_type = device_type; | ||
device->device_id = device_id; | ||
device->array_init = &ArrowDeviceCudaArrayInit; | ||
device->array_init = &ArrowDeviceCudaArrayInitAsync; | ||
device->array_move = &ArrowDeviceCudaArrayMove; | ||
device->buffer_init = &ArrowDeviceCudaBufferInit; | ||
device->buffer_init = &ArrowDeviceCudaBufferInitAsync; | ||
device->buffer_move = NULL; | ||
device->buffer_copy = &ArrowDeviceCudaBufferCopy; | ||
device->buffer_copy = &ArrowDeviceCudaBufferCopyAsync; | ||
device->synchronize_event = &ArrowDeviceCudaSynchronize; | ||
device->release = &ArrowDeviceCudaRelease; | ||
|
||
|
Reviewer comment: `CUstream` is actually a typedef'd pointer, so you can probably pass it by value instead of via pointer.