
feat(extensions/nanoarrow_device): Implement asynchronous buffer copying #509

Merged
43 commits merged on Jun 27, 2024

Changes from 38 commits

Commits (43)
518538d
pipe stream parameter through
paleolimbot Jun 7, 2024
fa864be
with actual passing stream through
paleolimbot Jun 7, 2024
f5d4f0a
preliminary implementation
paleolimbot Jun 7, 2024
872f3a9
add overridable method
paleolimbot Jun 7, 2024
56b937c
wire it up
paleolimbot Jun 7, 2024
4721d26
working
paleolimbot Jun 7, 2024
febfc9a
working!
paleolimbot Jun 7, 2024
54327cb
some notes
paleolimbot Jun 7, 2024
9729a5e
fix typo
paleolimbot Jun 8, 2024
c62d848
add clarifying comments
paleolimbot Jun 19, 2024
7643aa7
start adding stream option everywhere
paleolimbot Jun 19, 2024
be04b3f
maybe stream in buffer init
paleolimbot Jun 19, 2024
6712937
one more reference
paleolimbot Jun 19, 2024
95938d8
stream passed through array copy
paleolimbot Jun 19, 2024
dc660df
not working for host maybe yet
paleolimbot Jun 19, 2024
3d8f76f
pre-commit
paleolimbot Jun 20, 2024
103e127
add sync event to the view
paleolimbot Jun 20, 2024
5170fd8
remove unused function
paleolimbot Jun 20, 2024
32bfa20
fix copy to/from cuda host
paleolimbot Jun 20, 2024
19a5793
fix async symbols
paleolimbot Jun 20, 2024
d108575
let the synchronize callback handle stream sync also
paleolimbot Jun 20, 2024
b634c62
fix test
paleolimbot Jun 20, 2024
22f7196
thought finished
paleolimbot Jun 20, 2024
35daae1
fix buffer walker
paleolimbot Jun 20, 2024
0e1aa5a
back to unified async copy
paleolimbot Jun 20, 2024
3aedee6
remove outdated impl
paleolimbot Jun 20, 2024
2b8671a
remove copy impl
paleolimbot Jun 20, 2024
4a93294
fix metal build
paleolimbot Jun 20, 2024
73a7f41
fix typos
paleolimbot Jun 20, 2024
835c574
maybe add stream to array init
paleolimbot Jun 20, 2024
7e5fb8f
fix build
paleolimbot Jun 20, 2024
4d2a5e8
maybe with working stream recorder
paleolimbot Jun 21, 2024
45e0ad8
get copy working
paleolimbot Jun 21, 2024
b48d4bb
comments
paleolimbot Jun 21, 2024
72cf059
handles stream sync on copy back to CPU case
paleolimbot Jun 21, 2024
c6520ab
fix merge
paleolimbot Jun 25, 2024
1bb3b46
add documentation
paleolimbot Jun 26, 2024
5bf4139
use non-default stream
paleolimbot Jun 26, 2024
4aa114d
some docs + fix metal
paleolimbot Jun 26, 2024
9f8e69b
fix a few pointer/handle representations
paleolimbot Jun 26, 2024
45d07cc
better event creation
paleolimbot Jun 26, 2024
05d053f
test stream-not-provided behaviour
paleolimbot Jun 26, 2024
20fc135
test init behaviour
paleolimbot Jun 26, 2024
191 changes: 140 additions & 51 deletions src/nanoarrow/device/cuda.c
@@ -100,9 +100,10 @@ static void ArrowDeviceCudaDeallocator(struct ArrowBufferAllocator* allocator,
ArrowFree(allocator_private);
}

static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
struct ArrowBuffer* buffer,
int64_t size_bytes) {
static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* device,
struct ArrowBuffer* buffer,
int64_t size_bytes,
CUstream* stream) {
Member: CUstream is actually a typedef'd pointer, so you can probably pass it by value instead of via pointer.
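For illustration, a minimal sketch of the by-value form suggested here (not the PR's code; AllocateBufferAsyncByValue is a hypothetical name and error handling is reduced to returning the raw CUresult):

```c
#include <cuda.h>
#include <stdint.h>

// CUstream is typedef'd as a pointer (struct CUstream_st*), so the handle
// itself can be passed by value; call sites then pass `stream` rather than
// `&stream`.
static CUresult AllocateBufferAsyncByValue(CUdeviceptr* out, int64_t size_bytes,
                                           CUstream stream) {
  if (size_bytes > 0) {
    // cuMemAllocAsync requires a non-zero allocation size
    return cuMemAllocAsync(out, (size_t)size_bytes, stream);
  }
  *out = 0;
  return CUDA_SUCCESS;
}
```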

struct ArrowDeviceCudaPrivate* private_data =
(struct ArrowDeviceCudaPrivate*)device->private_data;

@@ -124,11 +125,14 @@ static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device,
switch (device->device_type) {
case ARROW_DEVICE_CUDA: {
CUdeviceptr dptr = 0;
if (size_bytes > 0) { // cuMemalloc requires non-zero size_bytes
err = cuMemAlloc(&dptr, (size_t)size_bytes);

// cuMemalloc requires non-zero size_bytes
if (size_bytes > 0) {
err = cuMemAllocAsync(&dptr, (size_t)size_bytes, *stream);
} else {
err = CUDA_SUCCESS;
}

ptr = (void*)dptr;
op = "cuMemAlloc";
break;
@@ -181,10 +185,9 @@ static void ArrowDeviceCudaArrayRelease(struct ArrowArray* array) {
array->release = NULL;
}

static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device,
struct ArrowDeviceArray* device_array,
struct ArrowArray* array,
void* sync_event) {
static ArrowErrorCode ArrowDeviceCudaArrayInitInternal(
struct ArrowDevice* device, struct ArrowDeviceArray* device_array,
struct ArrowArray* array, CUevent* cu_event) {
Member: Same thing as with the stream, you can probably just pass this by value instead of pointer since you never need the actual address of this and CUevent is a typedef'd pointer anyway.

Member Author: I updated all the internal functions to take a CUevent/CUstream instead of a CUevent*/CUstream*!
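As a sketch of that shape (illustrative names, not the PR's exact code): the public callback keeps the opaque void* parameters required by the device interface, dereferences them once, and the internal helper receives the handles by value:

```c
#include <cuda.h>
#include <stddef.h>

// Internal helper: CUevent/CUstream by value, since both are typedef'd pointers.
static int ArrayInitInternalSketch(CUevent cu_event, CUstream cu_stream) {
  (void)cu_event;   // e.g. record cu_event on cu_stream and stash it in
  (void)cu_stream;  // the array's private data
  return 0;
}

// Public callback shape: opaque void* in, dereferenced exactly once.
static int ArrayInitAsyncSketch(void* sync_event, void* stream) {
  CUevent cu_event = (sync_event != NULL) ? *(CUevent*)sync_event : NULL;
  CUstream cu_stream = (stream != NULL) ? *(CUstream*)stream : NULL;
  return ArrayInitInternalSketch(cu_event, cu_stream);
}
```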

struct ArrowDeviceCudaPrivate* device_private =
(struct ArrowDeviceCudaPrivate*)device->private_data;
// One can create an event with cuEventCreate(&cu_event, CU_EVENT_DEFAULT);
@@ -206,9 +209,9 @@ static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device,
device_array->device_id = device->device_id;
device_array->device_type = device->device_type;

if (sync_event != NULL) {
private_data->cu_event = *((CUevent*)sync_event);
device_array->sync_event = sync_event;
if (cu_event != NULL) {
private_data->cu_event = *cu_event;
device_array->sync_event = &private_data->cu_event;
} else {
private_data->cu_event = NULL;
device_array->sync_event = NULL;
@@ -217,15 +220,63 @@ static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device,
return NANOARROW_OK;
}

// TODO: All these buffer copiers would benefit from cudaMemcpyAsync but there is
// no good way to incorporate that just yet
static ArrowErrorCode ArrowDeviceCudaArrayInitAsync(struct ArrowDevice* device,
struct ArrowDeviceArray* device_array,
struct ArrowArray* array,
void* sync_event, void* stream) {
struct ArrowDeviceCudaPrivate* private_data =
(struct ArrowDeviceCudaPrivate*)device->private_data;

static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* device_src,
struct ArrowBufferView src,
struct ArrowDevice* device_dst,
struct ArrowBufferView dst,
int* n_pop_context,
struct ArrowError* error) {
NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context),
"cuCtxPushCurrent", NULL);
CUcontext unused; // needed for cuCtxPopCurrent()

CUevent* cu_event = (CUevent*)sync_event;

// If the stream was passed, we must have an event to work with
CUevent cu_event_tmp = NULL;
Member: I don't think this is necessarily true. If there's no event then that just means that the producer said we don't need to worry about synchronizing.

Member Author: This is a self-imposed truth (I added the ability for ArrowDeviceArray() to accept a void* stream, and made this the behaviour). A user is also free to not provide an event OR a stream, in which case we don't have to worry about synchronizing.

I added a comment here and will add a test, too.
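A producer-side sketch of that contract, assuming the device extension header is available as nanoarrow_device.h and that the array_init callback has the five-argument signature wired up at the bottom of this diff (the ExportProducedOnStream name is hypothetical):

```c
#include <cuda.h>
#include "nanoarrow_device.h"  // assumed header name for the device extension

// Export an array whose buffers were produced on `stream` without supplying
// an event: the CUDA device creates one and records it on the stream so that
// later consumers of device_array->sync_event can synchronize.
static ArrowErrorCode ExportProducedOnStream(struct ArrowDevice* device,
                                             struct ArrowDeviceArray* device_array,
                                             struct ArrowArray* array,
                                             CUstream stream) {
  // Passing NULL for both sync_event and stream would instead mean that no
  // synchronization is required at all.
  return device->array_init(device, device_array, array, /*sync_event=*/NULL,
                            /*stream=*/(void*)&stream);
}
```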

CUresult err;

if (stream != NULL && cu_event == NULL) {
// TODO: Should this use the disable timing flag as well?
err = cuEventCreate(&cu_event_tmp, CU_EVENT_DEFAULT);
if (err != CUDA_SUCCESS) {
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL);
}

cu_event = &cu_event_tmp;
}

if (stream != NULL) {
err = cuEventRecord(*cu_event, *((CUstream*)stream));
if (err != CUDA_SUCCESS) {
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
if (cu_event_tmp != NULL) {
NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp));
}

NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL);
}
}

int result = ArrowDeviceCudaArrayInitInternal(device, device_array, array, cu_event);
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
if (result != NANOARROW_OK) {
if (cu_event_tmp != NULL) {
NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp));
}

return result;
}

return NANOARROW_OK;
}

static ArrowErrorCode ArrowDeviceCudaBufferCopyAsyncInternal(
struct ArrowDevice* device_src, struct ArrowBufferView src,
struct ArrowDevice* device_dst, struct ArrowBufferView dst, int* n_pop_context,
struct ArrowError* error, CUstream hstream) {
// Note: the device_src/sync event must be synchronized before calling these methods,
// even though the cuMemcpyXXX() functions may do this automatically in some cases.

@@ -238,7 +289,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
(*n_pop_context)++;

NANOARROW_CUDA_RETURN_NOT_OK(
cuMemcpyHtoD((CUdeviceptr)dst.data.data, src.data.data, (size_t)src.size_bytes),
cuMemcpyHtoDAsync((CUdeviceptr)dst.data.data, src.data.data,
(size_t)src.size_bytes, hstream),
"cuMemcpyHtoD", error);

} else if (device_src->device_type == ARROW_DEVICE_CUDA &&
@@ -252,9 +304,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
(*n_pop_context)++;

NANOARROW_CUDA_RETURN_NOT_OK(
cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
(size_t)src.size_bytes),
"cuMemcpytoD", error);
cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data,
(size_t)src.size_bytes, hstream),
"cuMemcpyDtoDAsync", error);

} else if (device_src->device_type == ARROW_DEVICE_CUDA &&
device_dst->device_type == ARROW_DEVICE_CUDA) {
@@ -264,10 +316,10 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
(struct ArrowDeviceCudaPrivate*)device_dst->private_data;

NANOARROW_CUDA_RETURN_NOT_OK(
cuMemcpyPeer((CUdeviceptr)dst.data.data, dst_private->cu_context,
(CUdeviceptr)src.data.data, src_private->cu_context,
(size_t)src.size_bytes),
"cuMemcpyPeer", error);
cuMemcpyPeerAsync((CUdeviceptr)dst.data.data, dst_private->cu_context,
(CUdeviceptr)src.data.data, src_private->cu_context,
(size_t)src.size_bytes, hstream),
"cuMemcpyPeerAsync", error);

} else if (device_src->device_type == ARROW_DEVICE_CUDA &&
device_dst->device_type == ARROW_DEVICE_CPU) {
@@ -278,9 +330,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
"cuCtxPushCurrent", error);
(*n_pop_context)++;
NANOARROW_CUDA_RETURN_NOT_OK(
cuMemcpyDtoH((void*)dst.data.data, (CUdeviceptr)src.data.data,
(size_t)src.size_bytes),
"cuMemcpyDtoH", error);
cuMemcpyDtoHAsync((void*)dst.data.data, (CUdeviceptr)src.data.data,
(size_t)src.size_bytes, hstream),
"cuMemcpyDtoHAsync", error);

} else if (device_src->device_type == ARROW_DEVICE_CPU &&
device_dst->device_type == ARROW_DEVICE_CUDA_HOST) {
@@ -301,15 +353,22 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi
return NANOARROW_OK;
}

static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
struct ArrowBufferView src,
struct ArrowDevice* device_dst,
struct ArrowBufferView dst) {
static ArrowErrorCode ArrowDeviceCudaBufferCopyAsync(struct ArrowDevice* device_src,
struct ArrowBufferView src,
struct ArrowDevice* device_dst,
struct ArrowBufferView dst,
void* stream) {
if (stream == NULL) {
return EINVAL;
}

CUstream hstream = *((CUstream*)stream);

int n_pop_context = 0;
struct ArrowError error;

int result = ArrowDeviceCudaBufferCopyInternal(device_src, src, device_dst, dst,
&n_pop_context, &error);
int result = ArrowDeviceCudaBufferCopyAsyncInternal(device_src, src, device_dst, dst,
&n_pop_context, &error, hstream);
for (int i = 0; i < n_pop_context; i++) {
CUcontext unused;
NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused));
@@ -318,17 +377,24 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src,
return result;
}

static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
struct ArrowBufferView src,
struct ArrowDevice* device_dst,
struct ArrowBuffer* dst) {
static ArrowErrorCode ArrowDeviceCudaBufferInitAsync(struct ArrowDevice* device_src,
struct ArrowBufferView src,
struct ArrowDevice* device_dst,
struct ArrowBuffer* dst,
void* stream) {
if (stream == NULL) {
return EINVAL;
}

CUstream hstream = *((CUstream*)stream);

struct ArrowBuffer tmp;

switch (device_dst->device_type) {
case ARROW_DEVICE_CUDA:
case ARROW_DEVICE_CUDA_HOST:
NANOARROW_RETURN_NOT_OK(
ArrowDeviceCudaAllocateBuffer(device_dst, &tmp, src.size_bytes));
ArrowDeviceCudaAllocateBufferAsync(device_dst, &tmp, src.size_bytes, &hstream));
break;
case ARROW_DEVICE_CPU:
ArrowBufferInit(&tmp);
@@ -341,7 +407,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
struct ArrowBufferView tmp_view;
tmp_view.data.data = tmp.data;
tmp_view.size_bytes = tmp.size_bytes;
int result = ArrowDeviceCudaBufferCopy(device_src, src, device_dst, tmp_view);
int result =
ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, tmp_view, &hstream);
if (result != NANOARROW_OK) {
ArrowBufferReset(&tmp);
return result;
@@ -352,7 +419,7 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src,
}

static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device,
void* sync_event,
void* sync_event, void* stream,
struct ArrowError* error) {
if (sync_event == NULL) {
return NANOARROW_OK;
@@ -363,11 +430,33 @@ static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device,
return ENOTSUP;
}

// Sync functions require a context to be set
struct ArrowDeviceCudaPrivate* private_data =
(struct ArrowDeviceCudaPrivate*)device->private_data;

NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context),
"cuCtxPushCurrent", NULL);
CUcontext unused; // needed for cuCtxPopCurrent()

// Memory for cuda_event is owned by the ArrowArray member of the ArrowDeviceArray
CUevent* cuda_event = (CUevent*)sync_event;
NANOARROW_CUDA_RETURN_NOT_OK(cuEventSynchronize(*cuda_event), "cuEventSynchronize",
error);
CUevent* cu_event = (CUevent*)sync_event;
CUstream* cu_stream = (CUstream*)stream;
CUresult err;
const char* op = "";

if (cu_stream == NULL && cu_event != NULL) {
err = cuEventSynchronize(*cu_event);
op = "cuEventSynchronize";
} else if (cu_stream != NULL && cu_event == NULL) {
err = cuStreamSynchronize(*cu_stream);
op = "cuStreamSynchronize";
} else if (cu_stream != NULL && cu_event != NULL) {
err = cuStreamWaitEvent(*cu_stream, *cu_event, CU_EVENT_WAIT_DEFAULT);
op = "cuStreamWaitEvent";
}

NANOARROW_ASSERT_OK(cuCtxPopCurrent(&unused));
NANOARROW_CUDA_RETURN_NOT_OK(err, op, error);
return NANOARROW_OK;
}

@@ -384,7 +473,7 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct ArrowDevice* device_src,
// We do have to wait on the sync event, though, because this has to be NULL
// for a CPU device array.
NANOARROW_RETURN_NOT_OK(
ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL));
ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL, NULL));
ArrowDeviceArrayMove(src, dst);
dst->device_type = device_dst->device_type;
dst->device_id = device_dst->device_id;
@@ -436,11 +525,11 @@ static ArrowErrorCode ArrowDeviceCudaInitDevice(struct ArrowDevice* device,

device->device_type = device_type;
device->device_id = device_id;
device->array_init = &ArrowDeviceCudaArrayInit;
device->array_init = &ArrowDeviceCudaArrayInitAsync;
device->array_move = &ArrowDeviceCudaArrayMove;
device->buffer_init = &ArrowDeviceCudaBufferInit;
device->buffer_init = &ArrowDeviceCudaBufferInitAsync;
device->buffer_move = NULL;
device->buffer_copy = &ArrowDeviceCudaBufferCopy;
device->buffer_copy = &ArrowDeviceCudaBufferCopyAsync;
device->synchronize_event = &ArrowDeviceCudaSynchronize;
device->release = &ArrowDeviceCudaRelease;
